天天看點

Apache Flink源碼解析之stream-transformationStreamTransformation小結

之前我們聊了Flink程式的<code>source</code>、<code>sink</code>就差<code>transformation</code>了。今天我們就來解讀一下Flink的<code>transformation</code>。它們三者的關系如下圖:

當然這還是從Flink程式設計API的角度來看的(程式設計視角)。所謂的<code>transformation</code>,用于轉換一個或多個<code>DataStream</code>進而形成一個新的<code>DataStream</code>對象。Flink提供程式設計接口,允許你組合這些<code>transformation</code>進而形成非常靈活的拓撲結構。

<code>StreamTransformation</code>是所有<code>transformation</code>的抽象類,提供了實作<code>transformation</code>的基礎功能。每一個<code>DataStream</code>都有一個與之對應的<code>StreamTransformation</code>。

一些API操作,比如<code>DataStream#map</code>,将會在底層建立一個<code>StreamTransformation</code>樹,而在程式的運作時,該拓撲結構會被翻譯為<code>StreamGraph</code>。

<code>StreamTransformation</code>無關運作時的執行,它隻是邏輯上的概念。

屬性如下:

name : 轉換器的名稱,這個主要用于可視化的目的

uid : 使用者指定的uid,該uid的主要目的是用于在job重新開機時可以再次配置設定跟之前相同的uid,應該是用于持久儲存狀态的目的。

bufferTimeout :<code>buffer</code>逾時時間

parallelism : 并行度

id : 跟屬性<code>uid</code>無關,它的生成方式是基于一個靜态累加器

outputType : 輸出類型

slotSharingGroup : 給目前的<code>transformation</code>設定slot共享組。<code>slot sharing group</code>用于将并行執行的<code>operator</code>“歸攏”到相同的<code>TaskManager slot</code>中(<code>slot</code>概念基于資源的劃分,是以這裡的目的是讓不同的<code>subtask</code>共享<code>slot</code>資源)

其中,<code>StreamTransformation</code>構造器需要的參數是:

name

outputType

parallelism

核心的抽象方法:

setChainingStrategy : 設定<code>chaining</code>政策

getTransitivePredecessors

:傳回中間過渡階段的前置<code>StreamTransformation</code>集合,該方法的可能的應用場景是用來決定在疊代中的<code>feedback edge</code>(回報邊)最終是有前置<code>StreamTransformation</code>。

因為就一層繼承關系的樹形結構,是以這裡類之間的關系圖就不再暫時了

絕大部分<code>StreamTransformation</code>都需要依賴上遊<code>StreamTransformation</code>作為輸入<code>SourceTransformation</code>等少數特例除外;

如果沒有特别說明,<code>getTransitivePredecessors</code>的實作邏輯都是,由自身加input(上遊<code>StreamTransformation</code>)組成的集合。

根據實作,我們可以将它們分成兩類:

I

:輸入輸出相關,需要自行定義<code>name</code>,都需要與之對應的<code>operator</code>,<code>setChainingStrategy</code>的實作都傳回<code>operator#setChainingStrategy</code>

屬于該分類的有:

II

:内置函數,<code>name</code>内部固定,無法更改,無需<code>operator</code>,<code>setChainingStrategy</code>的實作都隻是抛出<code>UnsupportedOperationException</code>異常

它表示一個sorce,它并不真正做轉換工作,因為它沒有輸入,但它是任何拓撲的根<code>StreamTransformation</code>。

除了<code>StreamTransformation</code>構造器需要的那三個參數,<code>SourceTransformation</code>還需要<code>StreamSource</code>類型的參數,它是真正執行轉換的<code>operator</code>。

值得一提的是,其<code>getTransitivePredecessors</code>抽象方法的實作:

因為其沒有前置轉換器,是以其傳回隻存儲自身執行個體的集合對象。

它表示一個sink,建立的時候構造器需要<code>operator</code> 它是

<code>StreamSink</code>的執行個體,是最終做轉換的<code>operator</code>。

<code>getTransitivePredecessors</code>方法的實作是将自身以及<code>input#getTransitivePredecessors</code>的傳回值(之前的<code>StreamTransformation</code>集合)集合

該類有兩個特别的屬性:

stateKeySelector

stateKeyType

這兩個屬性的目的是因為sink的狀态也可能是基于key分區的。

接受一種輸入的<code>StreamTransformation</code>(換句話說,隻接收一個輸入流)。跟上面的<code>SinkTransformation</code>構造器類似,需要<code>input</code>和<code>operator</code>兩個參數(隻不過這裡的<code>operator</code>類型是對應的<code>OneInputStreamOperator</code>)。

表示接收兩種輸入的<code>StreamTransformation</code>(接收兩種流作為輸入)。其他的實作同<code>OneInputTransformation</code>。

可将其看作分流轉換器,該轉換用于将一個流拆分成多個流(通過<code>OutputSelector</code>來達到這個目的),當然這個操作隻是邏輯上的拆分(它隻影響上遊的流如何跟下遊的流連接配接)。

構造該轉換器,同樣也是依賴于其輸入轉換器(<code>input</code>)以及一個輸出選擇器(<code>outputSelector</code>),但在執行個體化其父類(<code>StreamTransformation</code>,沒有提供自定義的名稱,而是固定的常量值<code>Split</code>)

該選擇轉換器用于從上遊流中篩選出特定的元素。它在使用時,必須跟随在<code>SplitTransformation</code>之後(<code>SplitTransformation</code>通過指定的名稱将元素配置設定到多個邏輯流中)。

構造<code>SelectTransformation</code>需要前一個轉換器作為輸入,以及上遊用于分流的<code>SplitTransformation</code>所使用的名稱。跟<code>SplitTransformation</code>類似,這裡也無需提供自定義的轉換器名稱,而是固定的常量值<code>Select</code>。

合并轉換器,該轉換器用于将多個輸入<code>StreamTransformation</code>進行合并。是以該轉換器接收<code>StreamTransformation</code>的集合。其名稱也在内部被固定為<code>Union</code>。

該轉換器用于改變輸入元素的分區,其名稱為:<code>Partition</code>。是以,工作時除了提供一個<code>StreamTransformation</code>作為輸入,還需要提供一個<code>StreamPartitioner</code>的執行個體來進行分區。

該轉換器用于表示Flink DAG中的一個回報點(<code>feedback point</code>)。所謂回報點,可用于連接配接一個或者多個<code>StreamTransformation</code>,這些<code>StreamTransformation</code>被稱為回報邊(<code>feedback edges</code>)。處于回報點下遊的operation将可以從回報點和回報邊獲得元素輸入。

回報轉換器的固定名稱為<code>Feedback</code>,它的執行個體化需要兩個參數:

input : 上遊輸入<code>StreamTransformation</code>

waitTime : <code>feedback operator</code>的等待時間,一旦超過該等待時間,将關閉并不再接收任何回報元素。

執行個體化<code>FeedbackTransformation</code>時,會自動建立一個用于存儲回報邊的集合<code>feedbackEdges</code>。那麼回報邊如何收集呢?<code>FeedbackTransformation</code>通過定義一個執行個體方法:<code>addFeedbackEdge</code>來進行收集,而這裡所謂的“收集”就是将下遊<code>StreamTransformation</code>的執行個體加入<code>feedbackEdges</code>集合中(這裡可以了解為将兩個點建立連接配接關系,也就形成了邊)。不過,這裡加入的<code>StreamTransformation</code>的執行個體有一個要求:也就是目前<code>FeedbackTransformation</code>的執行個體跟待加入<code>StreamTransformation</code>執行個體的并行度一緻。

某種程度上,你可以将其類比于pub-sub機制

某種程度上跟<code>FeedbackTransformation</code>類似。<code>feedback</code>元素的類型不需要跟上遊的<code>StreamTransformation</code>元素的類型一緻,因為<code>CoFeedbackTransformation</code>之後隻允許跟<code>TwoInputTransformations</code>。上遊的<code>StreamTransformation</code>将會連接配接到<code>TwoInputTransformations</code>第一個輸入,而<code>feedback edge</code>将會連接配接到其第二個輸入。是以上遊的<code>StreamTransformation</code>其實是跟<code>CoFeedbackTransformation</code>無關的,它跟<code>TwoInputTransformation</code>有關。

上遊的<code>StreamTransformation</code>跟<code>CoFeedbackTransformation</code>無關,從<code>CoFeedbackTransformation</code>構造器需要的參數就可以看出來。通常,其他的<code>StreamTransformation</code>的實作都需要傳入上遊的<code>StreamTransformation</code>作為其輸入。但<code>CoFeedbackTransformation</code>卻沒有,它隻需要上遊的并行度:<code>parallelism</code>。另外一個需要的參數是<code>feedbackType</code>。

它絕大部分實作跟<code>FeedbackTransformation</code>差別在于<code>getTransitivePredecessors</code>方法的實作。我們之前談及<code>getTransitivePredecessors</code>主要的應用場景就是用于<code>feedback</code>,而它又不像<code>FeedbackTransformation</code>跟其上遊輸入有關,是以它隻傳回了隻有目前執行個體的單元素集合。

本文剖析了Flink中的<code>StreamTransformation</code>實作。當然還沒有談到這些<code>transformation</code>之間是如何串聯起來,實作非常靈活的拓撲。這是我們後面會談論的内容。

原文釋出時間為:2016-05-14

本文作者:vinoYang