天天看點

Flink 原理與實作:如何生成 JobGraph

轉載來源:http://wuchong.me/blog/2016/05/10/flink-internals-how-to-build-jobgraph/

繼前文Flink 原理與實作:架構和拓撲概覽中介紹了Flink的四層執行圖模型,本文将主要介紹 Flink 是如何将 StreamGraph 轉換成 JobGraph 的。根據使用者用Stream API編寫的程式,構造出一個代表拓撲結構的StreamGraph的。以 WordCount 為例,轉換圖如下圖所示:

Flink 原理與實作:如何生成 JobGraph

StreamGraph 和 JobGraph 都是在 Client 端生成的,也就是說我們可以在 IDE 中通過斷點調試觀察 StreamGraph 和 JobGraph 的生成過程。

JobGraph 的相關資料結構主要在 

org.apache.flink.runtime.jobgraph

 包中。構造 JobGraph 的代碼主要集中在 

StreamingJobGraphGenerator

 類中,入口函數是 

StreamingJobGraphGenerator.createJobGraph()

。我們首先來看下

StreamingJobGraphGenerator

的核心源碼:

public class StreamingJobGraphGenerator {

private StreamGraph streamGraph;

private JobGraph jobGraph;

// id -> JobVertex

private Map<Integer, JobVertex> jobVertices;

// 已經建構的JobVertex的id集合

private Collection<Integer> builtVertices;

// 實體邊集合(排除了chain内部的邊), 按建立順序排序

private List<StreamEdge> physicalEdgesInOrder;

// 儲存chain資訊,部署時用來建構 OperatorChain,startNodeId -> (currentNodeId -> StreamConfig)

private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;

// 所有節點的配置資訊,id -> StreamConfig

private Map<Integer, StreamConfig> vertexConfigs;

// 儲存每個節點的名字,id -> chainedName

private Map<Integer, String> chainedNames;

// 構造函數,入參隻有 StreamGraph

public StreamingJobGraphGenerator(StreamGraph streamGraph) {

this.streamGraph = streamGraph;

}

// 根據 StreamGraph,生成 JobGraph

public JobGraph createJobGraph() {

jobGraph = new JobGraph(streamGraph.getJobName());

// streaming 模式下,排程模式是所有節點(vertices)一起啟動

jobGraph.setScheduleMode(ScheduleMode.ALL);

// 初始化成員變量

init();

// 廣度優先周遊 StreamGraph 并且為每個SteamNode生成hash id,

// 保證如果送出的拓撲沒有改變,則每次生成的hash都是一樣的

Map<Integer, byte[]> hashes = traverseStreamGraphAndGenerateHashes();

// 最重要的函數,生成JobVertex,JobEdge等,并盡可能地将多個節點chain在一起

setChaining(hashes);

// 将每個JobVertex的入邊集合也序列化到該JobVertex的StreamConfig中

// (出邊集合已經在setChaining的時候寫入了)

setPhysicalEdges();

// 根據group name,為每個 JobVertex 指定所屬的 SlotSharingGroup

// 以及針對 Iteration的頭尾設定 CoLocationGroup

setSlotSharing();

// 配置checkpoint

configureCheckpointing();

// 配置重新開機政策(不重新開機,還是固定延遲重新開機)

configureRestartStrategy();

try {

// 将 StreamGraph 的 ExecutionConfig 序列化到 JobGraph 的配置中

InstantiationUtil.writeObjectToConfig(this.streamGraph.getExecutionConfig(), this.jobGraph.getJobConfiguration(), ExecutionConfig.CONFIG_KEY);

} catch (IOException e) {

throw new RuntimeException("Config object could not be written to Job Configuration: ", e);

}

return jobGraph;

}

...

}

StreamingJobGraphGenerator

的成員變量都是為了輔助生成最終的JobGraph。

createJobGraph()

函數的邏輯也很清晰,首先為所有節點生成一個唯一的hash id,如果節點在多次送出中沒有改變(包括并發度、上下遊等),那麼這個id就不會改變,這主要用于故障恢複。這裡我們不能用 

StreamNode.id

來代替,因為這是一個從1開始的靜态計數變量,同樣的Job可能會得到不一樣的id,如下代碼示例的兩個job是完全一樣的,但是source的id卻不一樣了。然後就是最關鍵的chaining處理,和生成JobVetex、JobEdge等。之後就是寫入各種配置相關的資訊。

// 範例1:A.id=1 B.id=2

DataStream<String> A = ...

DataStream<String> B = ...

A.union(B).print();

// 範例2:A.id=2 B.id=1

DataStream<String> B = ...

DataStream<String> A = ...

A.union(B).print();

下面具體分析下關鍵函數 

setChaining

 的實作:

// 從source開始建立 node chains

private void setChaining(Map<Integer, byte[]> hashes) {

for (Integer sourceNodeId : streamGraph.getSourceIDs()) {

createChain(sourceNodeId, sourceNodeId, hashes);

}

}

// 建構node chains,傳回目前節點的實體出邊

// startNodeId != currentNodeId 時,說明currentNode是chain中的子節點

private List<StreamEdge> createChain(

Integer startNodeId,

Integer currentNodeId,

Map<Integer, byte[]> hashes) {

if (!builtVertices.contains(startNodeId)) {

// 過渡用的出邊集合, 用來生成最終的 JobEdge, 注意不包括 chain 内部的邊

List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();

List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

// 将目前節點的出邊分成 chainable 和 nonChainable 兩類

for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {

if (isChainable(outEdge)) {

chainableOutputs.add(outEdge);

} else {

nonChainableOutputs.add(outEdge);

}

}

//==> 遞歸調用

for (StreamEdge chainable : chainableOutputs) {

transitiveOutEdges.addAll(createChain(startNodeId, chainable.getTargetId(), hashes));

}

for (StreamEdge nonChainable : nonChainableOutputs) {

transitiveOutEdges.add(nonChainable);

createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes);

}

// 生成目前節點的顯示名,如:"Keyed Aggregation -> Sink: Unnamed"

chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));

// 如果目前節點是起始節點, 則直接建立 JobVertex 并傳回 StreamConfig, 否則先建立一個空的 StreamConfig

// createJobVertex 函數就是根據 StreamNode 建立對應的 JobVertex, 并傳回了空的 StreamConfig

StreamConfig config = currentNodeId.equals(startNodeId)

? createJobVertex(startNodeId, hashes)

: new StreamConfig(new Configuration());

// 設定 JobVertex 的 StreamConfig, 基本上是序列化 StreamNode 中的配置到 StreamConfig 中.

// 其中包括 序列化器, StreamOperator, Checkpoint 等相關配置

setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

if (currentNodeId.equals(startNodeId)) {

// 如果是chain的起始節點。(不是chain中的節點,也會被标記成 chain start)

config.setChainStart();

// 我們也會把實體出邊寫入配置, 部署時會用到

config.setOutEdgesInOrder(transitiveOutEdges);

config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

// 将目前節點(headOfChain)與所有出邊相連

for (StreamEdge edge : transitiveOutEdges) {

// 通過StreamEdge建構出JobEdge,建立IntermediateDataSet,用來将JobVertex和JobEdge相連

connect(startNodeId, edge);

}

// 将chain中所有子節點的StreamConfig寫入到 headOfChain 節點的 CHAINED_TASK_CONFIG 配置中

config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

} else {

// 如果是 chain 中的子節點

Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);

if (chainedConfs == null) {

chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());

}

// 将目前節點的StreamConfig添加到該chain的config集合中

chainedConfigs.get(startNodeId).put(currentNodeId, config);

}

// 傳回連往chain外部的出邊集合

return transitiveOutEdges;

} else {

return new ArrayList<>();

}

}

每個 JobVertex 都會對應一個可序列化的 StreamConfig, 用來發送給 JobManager 和 TaskManager。最後在 TaskManager 中起 Task 時,需要從這裡面反序列化出所需要的配置資訊, 其中就包括了含有使用者代碼的StreamOperator。

setChaining

會對source調用

createChain

方法,該方法會遞歸調用下遊節點,進而建構出node chains。

createChain

會分析目前節點的出邊,根據Operator Chains中的chainable條件,将出邊分成chainalbe和noChainable兩類,并分别遞歸調用自身方法。之後會将StreamNode中的配置資訊序列化到StreamConfig中。如果目前不是chain中的子節點,則會建構 JobVertex 和 JobEdge相連。如果是chain中的子節點,則會将StreamConfig添加到該chain的config集合中。一個node chains,除了 headOfChain node會生成對應的 JobVertex,其餘的nodes都是以序列化的形式寫入到StreamConfig中,并儲存到headOfChain的 

CHAINED_TASK_CONFIG

 配置項中。直到部署時,才會取出并生成對應的ChainOperators,具體過程請見了解 Operator Chains。

總結

本文主要對 Flink 中将 StreamGraph 轉變成 JobGraph 的核心源碼進行了分析。思想還是很簡單的,StreamNode 轉成 JobVertex,StreamEdge 轉成 JobEdge,JobEdge 和 JobVertex 之間建立 IntermediateDataSet 來連接配接。關鍵點在于将多個 SteamNode chain 成一個 JobVertex的過程,這部分源碼比較繞,有興趣的同學可以結合源碼單步調試分析。下一章将會介紹 JobGraph 送出到 JobManager 後是如何轉換成分布式化的 ExecutionGraph 的。

繼續閱讀