天天看點

Flink之廣播狀态模式

一.提供的API

為了展示所提供的API,我們将以一個示例開始,然後介紹其完整功能。作為正在運作的示例,我們将使用這樣的情況,其中有一系列不同顔色和形狀的對象,并且我們希望找到遵循某種模式的相同顔色的對象對,例如矩形後跟三角形。我們假設這組有趣的模式會随着時間而演變。

在此示例中,第一個流将包含Item帶有Color和Shape屬性的type元素。另一個流将包含Rules。

從流開始Items,我們隻需要鍵入它的Color,因為我們要對相同顔色的。這将確定相同顔色的元素最終出現在同一台實體計算機上。

// key the items by color
KeyedStream<Item, Color> colorPartitionedStream = itemStream
                        .keyBy(new KeySelector<Item, Color>(){...});
           

移至Rules,應将包含它們的流廣播到所有下遊任務,并且這些任務應将它們存儲在本地,以便它們可以針對所有傳入的Items進行評估。下面的代碼段将使用廣播規則流,并且使用提供的MapStateDescriptor,将建立存儲規則的廣播狀态。

// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
			"RulesBroadcastState",
			BasicTypeInfo.STRING_TYPE_INFO,
			TypeInformation.of(new TypeHint<Rule>() {}));
		
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
                        .broadcast(ruleStateDescriptor);
           

最後,根據Rules中的Item流中的傳入元素評估,我們需要:

  1. 連接配接兩個流。
  2. 指定比對檢測邏輯。

BroadcastStream可以通過調用connect()來将流(鍵控或非鍵控)與非廣播流(以鍵BroadcastStream為參數)進行連接配接。我們可以調用特殊類型CoProcessFunction的process()方法,這将傳回一個BroadcastConnectedStream。該函數将包含我們的比對邏輯。函數的确切類型取決于非廣播流的類型:

  1. 如果輸入了密碼,則該函數為KeyedBroadcastProcessFunction。
  2. 如果它是非鍵,則該函數為BroadcastProcessFunction。

鑒于我們的非廣播流已加密,以下代碼段包含上述調用:

注意:應該在非廣播流上調用connect,并以BroadcastStream作為參數。
           
DataStream<String> output = colorPartitionedStream
   .connect(ruleBroadcastStream)
    .process(
        
        // type arguments in our KeyedBroadcastProcessFunction represent: 
        //   1. the key of the keyed stream
        //   2. the type of elements in the non-broadcast side
        //   3. the type of elements in the broadcast side
        //   4. the type of the result, here a string
        
        new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
            // my matching logic
        }
    );
           

二.BroadcastProcessFunction和KeyedBroadcastProcessFunction

與CoProcessFunction的情況一樣,這些函數有兩種實作的處理方法;負責處理廣播流進入元件的processBroadcastElement()和用于非廣播的processElement()。這些方法的完整簽名如下所示:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
           
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
           

首先要注意的是,這兩個功能都需要實作方法processBroadcastElement()和processElement()用于處理廣播側元素和非廣播側元素。

兩種方法在提供上下文方面有所不同。非廣播方有ReadOnlyContext,而廣播方有Context。

這兩個上下文(ctx在下面的枚舉中):

  1. 允許通路廣播狀态: ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
  2. 允許查詢元素的時間戳:ctx.timestamp()
  3. 擷取目前的水印: ctx.currentWatermark()
  4. 獲得目前處理時間:ctx.currentProcessingTime()
  5. 将元素發射到側面輸出:ctx.output(OutputTag outputTag, X value)。

不同之處在于每個人對廣播狀态的通路類型。廣播方對此具有讀寫通路權限,而非廣播方具有隻讀通路權限(是以具有名稱)。原因是在Flink中沒有跨任務通信。是以,為確定我們操作的所有并行執行個體中,廣播狀态中的内容相同,我們僅向廣播端提供讀寫通路權限,廣播端在所有任務中看到的元素相同,是以我們需要對每個任務進行計算該端的傳入元素在所有任務中都相同。忽略此規則将破壞狀态的一緻性保證,進而導緻結果不一緻,并且常常難以調試結果。

注意:在所有并行執行個體中,processBroadcast()中實作的邏輯必須具有相同的确定性行為!
           

最後,由于KeyedBroadcastProcessFunction事實是在鍵控流上運作,是以它公開了某些功能,這些功能不适用于BroadcastProcessFunction。那是:

  1. 所述ReadOnlyContext的processElement()方法可以通路Flink的底層定時器服務,其允許注冊事件和處理時間的定時器。當計時器觸發時,onTimer()會使用調用, OnTimerContext公開了與ReadOnlyContextplus 相同的功能。詢問觸發的計時器是事件還是處理時間的能力,并且

    查詢與計時器關聯的鍵。

  2. 所述Context的processBroadcastElement()方法包含方法 applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function)。這允許一個注冊KeyedStateFunction将被施加到所有鍵的所有狀态與所提供的相關聯stateDescriptor。
注意:僅在KeyedBroadcastProcessFunction的processElement()處才可以注冊計時器。在processBroadcastElement()方法中是不可能的,因為沒有與廣播元素關聯的鍵。
           

回到我們的原始示例,我們KeyedBroadcastProcessFunction可能如下所示:

new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {

    // store partial matches, i.e. first elements of the pair waiting for their second element
    // we keep a list as we may have many first elements waiting
    private final MapStateDescriptor<String, List<Item>> mapStateDesc =
        new MapStateDescriptor<>(
            "items",
            BasicTypeInfo.STRING_TYPE_INFO,
            new ListTypeInfo<>(Item.class));

    // identical to our ruleStateDescriptor above
    private final MapStateDescriptor<String, Rule> ruleStateDescriptor = 
        new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));

    @Override
    public void processBroadcastElement(Rule value,
                                        Context ctx,
                                        Collector<String> out) throws Exception {
        ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
    }

    @Override
    public void processElement(Item value,
                               ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {

        final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
        final Shape shape = value.getShape();
    
        for (Map.Entry<String, Rule> entry :
                ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
            final String ruleName = entry.getKey();
            final Rule rule = entry.getValue();
    
            List<Item> stored = state.get(ruleName);
            if (stored == null) {
                stored = new ArrayList<>();
            }
    
            if (shape == rule.second && !stored.isEmpty()) {
                for (Item i : stored) {
                    out.collect("MATCH: " + i + " - " + value);
                }
                stored.clear();
            }
    
            // there is no else{} to cover if rule.first == rule.second
            if (shape.equals(rule.first)) {
                stored.add(value);
            }
    
            if (stored.isEmpty()) {
                state.remove(ruleName);
            } else {
                state.put(ruleName, stored);
            }
        }
    }
}
           

三.重要注意事項

描述了提供的API之後,本節重點介紹使用廣播狀态時要記住的重要事項。這些是:

  • 沒有跨任務通信:如前所述,這就是為什麼僅廣播方 KeyedBroadcastProcessFunction/BroadcastProcessFunction可以修改廣播狀态的内容的原因。此外,使用者必須確定所有任務對于每個傳入元素都以相同的方式修改廣播狀态的内容。否則,不同的任務可能具有不同的内容,進而導緻結果不一緻。
  • 廣播狀态中事件的順序在各個任務之間可能有所不同:盡管廣播流的元素保證了所有元素(最終)将進入所有下遊任務,但是元素對于每個任務的到達順序可能不同。是以,每個傳入元素的狀态更新必須不取決于傳入事件的順序。
  • 所有任務都會檢查其廣播狀态:盡管發生檢查點時,所有任務在其廣播狀态中具有相同的元素(檢查點屏障不會越過元素),但所有任務都将指向其廣播狀态,而不僅僅是其中一個。這是一項設計決策,要避免在還原過程中從同一檔案讀取所有任務(進而避免出現熱點),盡管這樣做的代價是将檢查點狀态的大小增加了p倍(并行度)。Flink保證在還原/縮放後不會重複,也不會丢失資料。在使用相同或更小的并行度進行恢複的情況下,每個任務都會讀取其檢查點狀态。擴充後,每個任務都會讀取自己的狀态,其餘任務(p_new-p_old)以循環方式讀取先前任務的檢查點。
  • 沒有RocksDB狀态後端:在運作時将廣播狀态保留在記憶體中,并且應該相應地進行記憶體配置。這适用于所有操作狀态。

繼續閱讀