天天看點

深入解析 Flink 的算子鍊機制

“為什麼我的 Flink 作業 Web UI 中隻顯示出了一個框,并且 Records Sent 和Records Received 名額都是 0 ?是我的程式寫得有問題嗎?”

Flink 算子鍊簡介

筆者在 Flink 社群群裡經常能看到類似這樣的疑問。這種情況幾乎都不是程式有問題,而是因為 Flink 的 operator chain ——即算子鍊機制導緻的,即送出的作業的執行計劃中,所有算子的并發執行個體(即 sub-task )都因為滿足特定條件而串成了整體來執行,自然就觀察不到算子之間的資料流量了。

當然上述是一種特殊情況。我們更常見到的是隻有部分算子得到了算子鍊機制的優化,如官方文檔中出現過多次的下圖所示,注意 Source 和 map() 算子。

深入解析 Flink 的算子鍊機制

算子鍊機制的好處是顯而易見的:所有 chain 在一起的 sub-task 都會在同一個線程(即 TaskManager 的 slot)中執行,能夠減少不必要的資料交換、序列化和上下文切換,進而提高作業的執行效率。

深入解析 Flink 的算子鍊機制

鋪墊了這麼多,接下來就通過源碼簡單看看算子鍊産生的條件,以及它是如何在 Flink Runtime 中實作的。

邏輯計劃中的算子鍊

對 Flink Runtime 稍有了解的看官應該知道,Flink 作業的執行計劃會用三層圖結構來表示,即:

  • StreamGraph —— 原始邏輯執行計劃
  • JobGraph —— 優化的邏輯執行計劃(Web UI 中看到的就是這個)
  • ExecutionGraph —— 實體執行計劃

算子鍊是在優化邏輯計劃時加入的,也就是由 StreamGraph 生成 JobGraph 的過程中。那麼我們來到負責生成 JobGraph 的 o.a.f.streaming.api.graph.StreamingJobGraphGenerator 類,檢視其核心方法 createJobGraph() 的源碼。

private JobGraph createJobGraph() {
    // make sure that all vertices start immediately
    jobGraph.setScheduleMode(streamGraph.getScheduleMode());
    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }
    Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
    setChaining(hashes, legacyHashes, chainedOperatorHashes);

    setPhysicalEdges();
    // 略......

    return jobGraph;
}      

可見,該方法會先計算出 StreamGraph 中各個節點的哈希碼作為唯一辨別,并建立一個空的 Map 結構儲存即将被鍊在一起的算子的哈希碼,然後調用 setChaining() 方法,如下源碼所示。

private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
    }
}      

可見是逐個周遊 StreamGraph 中的 Source 節點,并調用 createChain() 方法。createChain() 是邏輯計劃層建立算子鍊的核心方法,完整源碼如下,有點長。

private List<StreamEdge> createChain(
        Integer startNodeId,
        Integer currentNodeId,
        Map<Integer, byte[]> hashes,
        List<Map<Integer, byte[]>> legacyHashes,
        int chainIndex,
        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
    if (!builtVertices.contains(startNodeId)) {
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(
                    createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
        }

        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
        }

        List<Tuple2<byte[], byte[]>> operatorHashes =
            chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

        byte[] primaryHashBytes = hashes.get(currentNodeId);
        OperatorID currentOperatorId = new OperatorID(primaryHashBytes);

        for (Map<Integer, byte[]> legacyHash : legacyHashes) {
            operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
        }

        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
        chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }
        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

        StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                : new StreamConfig(new Configuration());

        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

        if (currentNodeId.equals(startNodeId)) {
            config.setChainStart();
            config.setChainIndex(0);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNodeId, edge);
            }
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
        } else {
            chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(currentOperatorId);
        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;
    } else {
        return new ArrayList<>();
    }
}      

先解釋一下方法開頭建立的 3 個 List 結構:

  • transitiveOutEdges:目前算子鍊在 JobGraph 中的出邊清單,同時也是 createChain() 方法的最終傳回值;
  • chainableOutputs:目前能夠鍊在一起的 StreamGraph 邊清單;
  • nonChainableOutputs:目前不能夠鍊在一起的 StreamGraph 邊清單。

接下來,從 Source 開始周遊 StreamGraph 中目前節點的所有出邊,調用 isChainable() 方法判斷是否可以被鍊在一起(這個判斷邏輯稍後會講到)。可以連結的出邊被放入 chainableOutputs 清單,否則放入 nonChainableOutputs 清單。

對于 chainableOutputs 中的邊,就會以這些邊的直接下遊為起點,繼續遞歸調用createChain() 方法延展算子鍊。對于 nonChainableOutputs 中的邊,由于目前算子鍊的延展已經到頭,就會以這些“斷點”為起點,繼續遞歸調用 createChain() 方法試圖建立新的算子鍊。也就是說,邏輯計劃中整個建立算子鍊的過程都是遞歸的,亦即實際傳回時,是從 Sink 端開始傳回的。

然後要判斷目前節點是不是算子鍊的起始節點。如果是,則調用 createJobVertex()方法為算子鍊建立一個 JobVertex( 即 JobGraph 中的節點),也就形成了我們在Web UI 中看到的 JobGraph 效果:

深入解析 Flink 的算子鍊機制

最後,還需要将各個節點的算子鍊資料寫入各自的 StreamConfig 中,算子鍊的起始節點要額外儲存下 transitiveOutEdges。StreamConfig 在後文的實體執行階段會再次用到。

形成算子鍊的條件

來看看 isChainable() 方法的代碼。 由此可得,上下遊算子能夠 chain 在一起的條件還是非常苛刻的(老生常談了),列舉如下:

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
    StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();

    return downStreamVertex.getInEdges().size() == 1
            && outOperator != null
            && headOperator != null
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            && edge.getShuffleMode() != ShuffleMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled();
}      
  • 上下遊算子執行個體處于同一個 SlotSharingGroup 中(之後再提);
  • 下遊算子的連結政策(ChainingStrategy)為 ALWAYS ——既可以與上遊連結,也可以與下遊連結。我們常見的 map()、filter() 等都屬此類;
  • 上遊算子的連結政策為 HEAD 或 ALWAYS。HEAD 政策表示隻能與下遊連結,這在正常情況下是 Source 算子的專屬;
  • 兩個算子間的實體分區邏輯是 ForwardPartitioner ,可參見之前寫過的《聊聊Flink DataStream 的八種實體分區邏輯》;
  • 兩個算子間的 shuffle 方式不是批處理模式;
  • 上下遊算子執行個體的并行度相同;
  • 沒有禁用算子鍊。

禁用算子鍊

使用者可以在一個算子上調用 startNewChain() 方法強制開始一個新的算子鍊,或者調用 disableOperatorChaining() 方法指定它不參與算子鍊。代碼位于 SingleOutputStreamOperator 類中,都是通過改變算子的連結政策實作的。

@PublicEvolving
public SingleOutputStreamOperator<T> disableChaining() {
    return setChainingStrategy(ChainingStrategy.NEVER);
}

@PublicEvolving
public SingleOutputStreamOperator<T> startNewChain() {
    return setChainingStrategy(ChainingStrategy.HEAD);
}      

如果要在整個運作時環境中禁用算子鍊,調用 StreamExecutionEnvironment.disableOperatorChaining() 方法即可。

實體計劃中的算子鍊

在 JobGraph 轉換成 ExecutionGraph 并交由 TaskManager 執行之後,會生成排程執行的基本任務單元 ——StreamTask,負責執行具體的 StreamOperator 邏輯。在StreamTask.invoke() 方法中,初始化了狀态後端、checkpoint 存儲和定時器服務之後,可以發現:

operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();      

構造出了一個 OperatorChain 執行個體,這就是算子鍊在實際執行時的形态。解釋一下OperatorChain 中的幾個主要屬性。

private final StreamOperator<?>[] allOperators;
private final RecordWriterOutput<?>[] streamOutputs;
private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint;
private final OP headOperator;      
  • headOperator:算子鍊的第一個算子,對應 JobGraph 中的算子鍊起始節點;
  • allOperators:算子鍊中的所有算子,倒序排列,即 headOperator 位于該數組的末尾;
  • streamOutputs:算子鍊的輸出,可以有多個;
  • chainEntryPoint:算子鍊的“入口點”,它的含義将在後文說明。

由上可知,所有 StreamTask 都會建立 OperatorChain。如果一個算子無法進入算子鍊,也會形成一個隻有 headOperator 的單個算子的 OperatorChain。

OperatorChain 構造方法中的核心代碼如下。

for (int i = 0; i < outEdgesInOrder.size(); i++) {
    StreamEdge outEdge = outEdgesInOrder.get(i);
    RecordWriterOutput<?> streamOutput = createStreamOutput(
        recordWriters.get(i),
        outEdge,
        chainedConfigs.get(outEdge.getSourceId()),
        containingTask.getEnvironment());
    this.streamOutputs[i] = streamOutput;
    streamOutputMap.put(outEdge, streamOutput);
}

// we create the chain of operators and grab the collector that leads into the chain
List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
this.chainEntryPoint = createOutputCollector(
    containingTask,
    configuration,
    chainedConfigs,
    userCodeClassloader,
    streamOutputMap,
    allOps);

if (operatorFactory != null) {
    WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint();
    headOperator = operatorFactory.createStreamOperator(containingTask, configuration, output);
    headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());
} else {
    headOperator = null;
}

// add head operator to end of chain
allOps.add(headOperator);
this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);      

首先會周遊算子鍊整體的所有出邊,并調用 createStreamOutput() 方法建立對應的下遊輸出 RecordWriterOutput。然後就會調用 createOutputCollector() 方法建立實體的算子鍊,并傳回 chainEntryPoint,這個方法比較重要,部分代碼如下。

private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
        StreamTask<?, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperator<?>> allOperators) {
    List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);

    // create collectors for the network outputs
    for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
        @SuppressWarnings("unchecked")
        RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }

    // Create collectors for the chained outputs
    for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
        int outputId = outputEdge.getTargetId();
        StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
        WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator(
            containingTask,
            chainedOpConfig,
            chainedConfigs,
            userCodeClassloader,
            streamOutputs,
            allOperators,
            outputEdge.getOutputTag());
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }
    // 以下略......
}      

該方法從上一節提到的 StreamConfig 中分别取出出邊和連結邊的資料,并建立各自的 Output。出邊的 Output 就是将資料發往算子鍊之外下遊的 RecordWriterOutput,而連結邊的輸出要靠 createChainedOperator() 方法。

private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
        StreamTask<?, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperator<?>> allOperators,
        OutputTag<IN> outputTag) {
    // create the output that the operator writes to first. this may recursively create more operators
    WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(
        containingTask,
        operatorConfig,
        chainedConfigs,
        userCodeClassloader,
        streamOutputs,
        allOperators);

    // now create the operator and give it the output collector to write its output to
    StreamOperatorFactory<OUT> chainedOperatorFactory = operatorConfig.getStreamOperatorFactory(userCodeClassloader);
    OneInputStreamOperator<IN, OUT> chainedOperator = chainedOperatorFactory.createStreamOperator(
            containingTask, operatorConfig, chainedOperatorOutput);

    allOperators.add(chainedOperator);

    WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
    if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
        currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
    }
    else {
        TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
        currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
    }

    // wrap watermark gauges since registered metrics must be unique
    chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);
    chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);
    return currentOperatorOutput;
}      

我們一眼就可以看到,這個方法遞歸調用了上述 createOutputCollector() 方法,與邏輯計劃階段類似,通過不斷延伸 Output 來産生 chainedOperator(即算子鍊中除了headOperator 之外的算子),并逆序傳回,這也是 allOperators 數組中的算子順序為倒序的原因。

chainedOperator 産生之後,将它們通過 ChainingOutput 連接配接起來,形成如下圖所示的結構。

深入解析 Flink 的算子鍊機制
圖檔來自: http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/

最後來看看 ChainingOutput.collect() 方法是如何輸出資料流的。

@Override
public void collect(StreamRecord<T> record) {
    if (this.outputTag != null) {
        // we are only responsible for emitting to the main input
        return;
    }
    pushToOperator(record);
}

@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
        // we are only responsible for emitting to the side-output specified by our
        // OutputTag.
        return;
    }
    pushToOperator(record);
}

protected <X> void pushToOperator(StreamRecord<X> record) {
    try {
        // we know that the given outputTag matches our OutputTag so the record
        // must be of the type that our operator expects.
        @SuppressWarnings("unchecked")
        StreamRecord<T> castRecord = (StreamRecord<T>) record;
        numRecordsIn.inc();
        operator.setKeyContextElement1(castRecord);
        operator.processElement(castRecord);
    }
    catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}      

可見是通過調用連結算子的 processElement() 方法,直接将資料推給下遊處理了。也就是說,OperatorChain 完全可以看做一個由 headOperator 和 streamOutputs組成的單個算子,其内部的 chainedOperator 和 ChainingOutput 都像是被黑盒遮蔽,同時沒有引入任何 overhead。

打通了算子鍊在執行層的邏輯,看官應該會明白 chainEntryPoint 的含義了。由于它位于遞歸傳回的終點,是以它就是流入算子鍊的起始 Output,即上圖中指向 headOperator 的 RecordWriterOutput。

文章轉載自簡書,作者:LittleMagic。原文連結:

https://www.jianshu.com/p/799744e347c7
深入解析 Flink 的算子鍊機制

繼續閱讀