作者:崔星燦
整理:高赟
前面已經為大家介紹了
Flink 的基本概念以及安裝部署的過程 ,進而希望能夠幫助讀者建立起對 Flink 的初步印象。本次課程開始,我們将進入第二部分,即 Flink 實際開發的相關内容。本次課程将首先介紹 Flink 開發中比較核心的 DataStream API 。我們首先将回顧分布式流處理的一些基本概念,這些概念對于了解實際的 DataStream API 有非常大的作用。然後,我們将詳細介紹 DataStream API 的設計,最後我們将通過一個例子來示範 DataStream API 的使用。1. 流處理基本概念
對于什麼是流處理,從不同的角度有不同的定義。其實流處理與批處理這兩個概念是對立統一的,它們的關系有點類似于對于 Java 中的 ArrayList 中的元素,是直接看作一個有限資料集并用下标去通路,還是用疊代器去通路。
圖1. 左圖硬币分類器。硬币分類器也可以看作一個流處理系統,用于硬币分類的各部分元件提前串聯在一起,硬币不斷進入系統,并最終被輸出到不同的隊列中供後續使用。右圖同理。
流處理系統本身有很多自己的特點。一般來說,由于需要支援無限資料集的處理,流處理系統一般采用一種資料驅動的處理方式。它會提前設定一些算子,然後等到資料到達後對資料進行處理。為了表達複雜的計算邏輯,包括 Flink 在内的分布式流處理引擎一般采用 DAG 圖來表示整個計算邏輯,其中 DAG 圖中的每一個點就代表一個基本的邏輯單元,也就是前面說的算子。由于計算邏輯被組織成有向圖,資料會按照邊的方向,從一些特殊的 Source 節點流入系統,然後通過網絡傳輸、本地傳輸等不同的資料傳輸方式在算子之間進行發送和處理,最後會通過另外一些特殊的 Sink 節點将計算結果發送到某個外部系統或資料庫中。
圖2. 一個 DAG 計算邏輯圖與實際的實體時模型。邏輯圖中的每個算子在實體圖中可能有多個并發。
對于實際的分布式流處理引擎,它們的實際運作時實體模型要更複雜一些,這是由于每個算子都可能有多個執行個體。如圖 2 所示,作為 Source 的 A 算子有兩個執行個體,中間算子 C 也有兩個執行個體。在邏輯模型中,A 和 B 是 C 的上遊節點,而在對應的實體邏輯中,C 的所有執行個體和 A、B 的所有執行個體之間可能都存在資料交換。在實體模型中,我們會根據計算邏輯,采用系統自動優化或人為指定的方式将計算工作分布到不同的執行個體中。隻有當算子執行個體分布到不同程序上時,才需要通過網絡進行資料傳輸,而同一程序中的多個執行個體之間的資料傳輸通常是不需要通過網絡的。
表1. Apache Storm 構造 DAG 計算圖。Apache Storm 的接口定義更加“面向操作”,是以更加底層。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
表2. Apache Flink 構造 DAG 計算圖。Apache Flink 的接口定義更加“面向資料”,是以更加高層。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile ("input");
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);
counts.writeAsText("output");
由于流處理的計算邏輯是通過 DAG 圖來表示的,是以它們的大部分 API 都是圍繞建構這種計算邏輯圖來設計的。例如,對于幾年前非常流行的 Apache Storm,它的 Word Count 的示例如表 1 所示。基于 Apache Storm 使用者需要在圖中添加 Spout 或 Bolt 這種算子,并指定算子之前的連接配接方式。這樣,在完成整個圖的建構之後,就可以将圖送出到遠端或本地叢集運作。
與之對比,Apache Flink 的接口雖然也是在建構計算邏輯圖,但是 Flink 的 API 定義更加面向資料本身的處理邏輯,它把資料流抽象成為一個無限集,然後定義了一組集合上的操作,然後在底層自動建構相應的 DAG 圖。可以看出,Flink 的 API 要更“上層”一些。許多研究者在進行實驗時,可能會更喜歡自由度高的 Storm,因為它更容易保證實作預想的圖結構;而在工業界則更喜歡 Flink 這類進階 API,因為它使用更加簡單。
2. Flink DataStream API 概覽
基于前面對流處理的基本概念,本節将詳細介紹 Flink DataStream API 的使用方式。我們首先還是從一個簡單的例子開始看起。表3是一個流式 Word Count 的示例,雖然它隻有 5 行代碼,但是它給出了基于 Flink DataStream API 開發程式的基本結構。
表3. 基于 Flink DataStream API 的 Word Count 示例.
//1、設定運作環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、配置資料源讀取資料
DataStream<String> text = env.readTextFile ("input");
//3、進行一系列轉換
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);
//4、配置資料彙寫出資料
counts.writeAsText("output");
//5、送出執行
env.execute("Streaming WordCount");
為了實作流式 Word Count,我們首先要先獲得一個 StreamExecutionEnvironment 對象。它是我們建構圖過程中的上下文對象。基于這個對象,我們可以添加一些算子。對于流處理程度,我們一般需要首先建立一個資料源去接入資料。在這個例子中,我們使用了 Environment 對象中内置的讀取檔案的資料源。這一步之後,我們拿到的是一個 DataStream 對象,它可以看作一個無限的資料集,可以在該集合上進行一序列的操作。例如,在 Word Count 例子中,我們首先将每一條記錄(即檔案中的一行)分隔為單詞,這是通過 FlatMap 操作來實作的。調用 FlatMap 将會在底層的 DAG 圖中添加一個 FlatMap 算子。然後,我們得到了一個記錄是單詞的流。我們将流中的單詞進行分組(keyBy),然後累積計算每一個單詞的資料(sum(1))。計算出的單詞的資料組成了一個新的流,我們将它寫入到輸出檔案中。
最後,我們需要調用 env#execute 方法來開始程式的執行。需要強調的是,前面我們調用的所有方法,都不是在實際處理資料,而是在構通表達計算邏輯的 DAG 圖。隻有當我們将整個圖建構完成并顯式的調用 Execute 方法後,架構才會把計算圖提供到叢集中,接入資料并執行實際的邏輯。
基于流式 Word Count 的例子可以看出,基于 Flink 的 DataStream API 來編寫流處理程式一般需要三步:通過 Source 接入資料、進行一系統列的處理以及将資料寫出。最後,不要忘記顯式調用 Execute 方式,否則前面編寫的邏輯并不會真正執行。
圖3. Flink DataStream 操作概覽
從上面的例子中還可以看出,Flink DataStream API 的核心,就是代表流資料的 DataStream 對象。整個計算邏輯圖的建構就是圍繞調用 DataStream 對象上的不同操作産生新的 DataStream 對象展開的。整體來說,DataStream 上的操作可以分為四類。第一類是對于單條記錄的操作,比如篩除掉不符合要求的記錄(Filter 操作),或者将每條記錄都做一個轉換(Map 操作)。第二類是對多條記錄的操作。比如說統計一個小時内的訂單總成交量,就需要将一個小時内的所有訂單記錄的成交量加到一起。為了支援這種類型的操作,就得通過 Window 将需要的記錄關聯到一起進行處理。第三類是對多個流進行操作并轉換為單個流。例如,多個流可以通過 Union、Join 或 Connect 等操作合到一起。這些操作合并的邏輯不同,但是它們最終都會産生了一個新的統一的流,進而可以進行一些跨流的操作。最後, DataStream 還支援與合并對稱的操作,即把一個流按一定規則拆分為多個流(Split 操作),每個流是之前流的一個子集,這樣我們就可以對不同的流作不同的處理。
圖4. 不同類型的 DataStream 子類型。不同的子類型支援不同的操作集合。
為了支援這些不同的流操作,Flink 引入了一組不同的流類型,用來表示某些操作的中間流資料集類型。完整的類型轉換關系如圖4所示。首先,對于一些針對單條記錄的操作,如 Map 等,操作的結果仍然是是基本的 DataStream 類型。然後,對于 Split 操作,它會首先産生一個 SplitStream,基于 SplitStream 可以使用 Select 方法來篩選出符合要求的記錄并再将得到一個基本的流。
類似的,對于 Connect 操作,在調用 streamA.connect(streamB)後可以得到一個專門的 ConnectedStream。ConnectedStream 支援的操作與普通的 DataStream 有所差別,由于它代表兩個不同的流混合的結果,是以它允許使用者對兩個流中的記錄分别指定不同的處理邏輯,然後它們的處理結果形成一個新的 DataStream 流。由于不同記錄的處理是在同一個算子中進行的,是以它們在處理時可以友善的共享一些狀态資訊。上層的一些 Join 操作,在底層也是需要依賴于 Connect 操作來實作的。
另外,如前所述,我們可以通過 Window 操作對流可以按時間或者個數進行一些切分,進而将流切分成一個個較小的分組。具體的切分邏輯可以由使用者進行選擇。當一個分組中所有記錄都到達後,使用者可以拿到該分組中的所有記錄,進而可以進行一些周遊或者累加操作。這樣,對每個分組的處理都可以得到一組輸出資料,這些輸出資料形成了一個新的基本流。
對于普通的 DataStream,我們必須使用 allWindow 操作,它代表對整個流進行統一的 Window 處理,是以是不能使用多個算子執行個體進行同時計算的。針對這一問題,就需要我們首先使用 KeyBy 方法對記錄按 Key 進行分組,然後才可以并行的對不同 Key 對應的記錄進行單獨的 Window 操作。KeyBy 操作是我們日常程式設計中最重要的操作之一,下面我們會更詳細的介紹。
圖5. 基本流上的 Window 操作與 KeyedStream 上的 Window 操對比。KeyedStream 上的 Window 操作使采用多個執行個體并發處理成為了可能。
基本 DataStream 對象上的 allWindow 與 KeyedStream 上的 Window 操作的對比如圖5所示。為了能夠在多個并發執行個體上并行的對資料進行處理,我們需要通過 KeyBy 将資料進行分組。KeyBy 和 Window 操作都是對資料進行分組,但是 KeyBy 是在水準分向對流進行切分,而 Window 是在垂直方式對流進行切分。
使用 KeyBy 進行資料切分之後,後續算子的每一個執行個體可以隻處理特定 Key 集合對應的資料。除了處理本身外,Flink 中允許算子維護一部分狀态(State),在KeyedStream 算子的狀态也是可以分布式存儲的。由于 KeyBy 是一種确定的資料配置設定方式(下文将介紹其它配置設定方式),是以即使發生 Failover 作業重新開機,甚至發生了并發度的改變,Flink 都可以重新配置設定 Key 分組并保證處理某個 Key 的分組一定包含該 Key 的狀态,進而保證一緻性。
最後需要強調的是,KeyBy 操作隻有當 Key 的數量超過算子的并發執行個體數才可以較好的工作。由于同一個 Key 對應的所有資料都會發送到同一個執行個體上,是以如果Key 的數量比執行個體數量少時,就會導緻部分執行個體收不到資料,進而導緻計算能力不能充分發揮。
3. 其它問題
除 KeyBy 之外,Flink 在算子之前交換資料時還支援其它的實體分組方式。如圖 1 所示,Flink DataStream 中實體分組方式包括:
- Global: 上遊算子将所有記錄發送給下遊算子的第一個執行個體。
- Broadcast: 上遊算子将每一條記錄發送給下遊算子的所有執行個體。
- Forward:隻适用于上遊算子執行個體數與下遊算子相同時,每個上遊算子執行個體将記錄發送給下遊算子對應的執行個體。
- Shuffle:上遊算子對每條記錄随機選擇一個下遊算子進行發送。
- Rebalance:上遊算子通過輪詢的方式發送資料。
- Rescale:當上遊和下遊算子的執行個體數為 n 或 m 時,如果 n < m,則每個上遊執行個體向ceil(m/n)或floor(m/n)個下遊執行個體輪詢發送資料;如果 n > m,則 floor(n/m) 或 ceil(n/m) 個上遊執行個體向下遊執行個體輪詢發送資料。
- PartitionCustomer:當上述内置配置設定方式不滿足需求時,使用者還可以選擇自定義分組方式。
圖6. 除keyBy外其它的實體分組方式。
除分組方式外,Flink DataStream API 中另一個重要概念就是類型系統。圖 7 所示,Flink DataStream 對像都是強類型的,每一個 DataStream 對象都需要指定元素的類型,Flink 自己底層的序列化機制正是依賴于這些資訊對序列化等進行優化。具體來說,在 Flink 底層,它是使用 TypeInformation 對象對類型進行描述的,TypeInformation 對象定義了一組類型相關的資訊供序列化架構使用。
圖7. Flink DataStream API 中的類型系統
Flink 内置了一部分常用的基本類型,對于這些類型,Flink 也内置了它們的TypeInformation,使用者一般可以直接使用而不需要額外的聲明,Flink 自己可以通過類型推斷機制識别出相應的類型。但是也會有一些例外的情況,比如,Flink DataStream API 同時支援 Java 和 Scala,Scala API 許多接口是通過隐式的參數來傳遞類型資訊的,是以如果需要通過 Java 調用 Scala 的 API,則需要把這些類型資訊通過隐式參數傳遞過去。另一個例子是 Java 中對泛型存在類型擦除,如果流的類型本身是一個泛型的話,則可能在擦除之後無法推斷出類型資訊,這時候也需要顯式的指定。
在 Flink 中,一般 Java 接口采用 Tuple 類型來組合多個字段,而 Scala 則更經常使用 Row 類型或 Case Class。相對于 Row,Tuple 類型存在兩個問題,一個是字段個數不能超過 25 個,此外,所有字段不允許有 null 值。最後,Flink 也支援使用者自定義新的類型和 TypeInformation,并通過 Kryo 來實作序列化,但是這種方式可帶來一些遷移等方面的問題,是以盡量不要使用自定義的類型。
4.示例
然後,我們再看一個更複雜的例子。假設我們有一個資料源,它監控系統中訂單的情況,當有新訂單時,它使用 Tuple2 輸出訂單中商品的類型和交易額。然後,我們希望實時統計每個類别的交易額,以及實時統計全部類别的交易額。
表4. 實時訂單統計示例。
public class GroupedProcessingTimeWindowSample {
private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
Random random = new Random();
while (isRunning) {
Thread.sleep((getRuntimeContext().getIndexOfThisSubtask() + 1) * 1000 * 5);
String key = "類别" + (char) ('A' + random.nextInt(3));
int value = random.nextInt(10) + 1;
System.out.println(String.format("Emits\t(%s, %d)", key, value));
ctx.collect(new Tuple2<>(key, value));
}
}
@Override
public void cancel() {
isRunning = false;
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStream<Tuple2<String, Integer>> ds = env.addSource(new DataSource());
KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = ds.keyBy(0);
keyedStream.sum(1).keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
@Override
public Object getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return "";
}
}).fold(new HashMap<String, Integer>(), new FoldFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
@Override
public HashMap<String, Integer> fold(HashMap<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception {
accumulator.put(value.f0, value.f1);
return accumulator;
}
}).addSink(new SinkFunction<HashMap<String, Integer>>() {
@Override
public void invoke(HashMap<String, Integer> value, Context context) throws Exception {
// 每個類型的商品成交量
System.out.println(value);
// 商品成交總量
System.out.println(value.values().stream().mapToInt(v -> v).sum());
}
});
env.execute();
}
}
示例的實作如表4所示。首先,在該實作中,我們首先實作了一個模拟的資料源,它繼承自 RichParallelSourceFunction,它是可以有多個執行個體的 SourceFunction 的接口。它有兩個方法需要實作,一個是 Run 方法,Flink 在運作時對 Source 會直接調用該方法,該方法需要不斷的輸出資料,進而形成初始的流。在 Run 方法的實作中,我們随機的産生商品類别和交易量的記錄,然後通過 ctx#collect 方法進行發送。另一個方法是 Cancel 方法,當 Flink 需要 Cancel Source Task 的時候會調用該方法,我們使用一個 Volatile 類型的變量來标記和控制執行的狀态。
然後,我們在 Main 方法中就可以開始圖的建構。我們首先建立了一個 StreamExecutioniEnviroment 對象。建立對象調用的 getExecutionEnvironment 方法會自動判斷所處的環境,進而建立合适的對象。例如,如果我們在 IDE 中直接右鍵運作,則會建立 LocalStreamExecutionEnvironment 對象;如果是在一個實際的環境中,則會建立 RemoteStreamExecutionEnvironment 對象。
基于 Environment 對象,我們首先建立了一個 Source,進而得到初始的<商品類型,成交量>流。然後,為了統計每種類别的成交量,我們使用 KeyBy 按 Tuple 的第 1 個字段(即商品類型)對輸入流進行分組,并對每一個 Key 對應的記錄的第 2 個字段(即成交量)進行求合。在底層,Sum 算子内部會使用 State 來維護每個Key(即商品類型)對應的成交量之和。當有新記錄到達時,Sum 算子内部會更新所維護的成交量之和,并輸出一條<商品類型,更新後的成交量>記錄。
如果隻統計各個類型的成交量,則程式可以到此為止,我們可以直接在 Sum 後添加一個 Sink 算子對不斷更新的各類型成交量進行輸出。但是,我們還需要統計所有類型的總成交量。為了做到這一點,我們需要将所有記錄輸出到同一個計算節點的執行個體上。我們可以通過 KeyBy 并且對所有記錄傳回同一個 Key,将所有記錄分到同一個組中,進而可以全部發送到同一個執行個體上。
然後,我們使用 Fold 方法來在算子中維護每種類型商品的成交量。注意雖然目前 Fold 方法已經被标記為 Deprecated,但是在 DataStream API 中暫時還沒有能替代它的其它操作,是以我們仍然使用 Fold 方法。這一方法接收一個初始值,然後當後續流中每條記錄到達的時候,算子會調用所傳遞的 FoldFunction 對初始值進行更新,并發送更新後的值。我們使用一個 HashMap 來對各個類别的目前成交量進行維護,當有一條新的<商品類别,成交量>到達時,我們就更新該 HashMap。這樣在 Sink 中,我們收到的是最新的商品類别和成交量的 HashMap,我們可以依賴這個值來輸出各個商品的成交量和總的成交量。
需要指出的是,這個例子主要是用來示範 DataStream API 的用法,實際上還會有更高效的寫法,此外,更上層的 Table / SQL 還支援 Retraction 機制,可以更好的處理這種情況。
圖8. API 原理圖
最後,我們對 DataStream API 的原理進行簡要的介紹。當我們調用 DataStream#map 算法時,Flink 在底層會建立一個 Transformation 對象,這一對象就代表我們計算邏輯圖中的節點。它其中就記錄了我們傳入的 MapFunction,也就是 UDF(User Define Function)。随着我們調用更多的方法,我們建立了更多的 DataStream 對象,每個對象在内部都有一個 Transformation 對象,這些對象根據計算依賴關系組成一個圖結構,就是我們的計算圖。後續 Flink 将對這個圖結構進行進一步的轉換,進而最終生成送出作業所需要的 JobGraph。
5. 總結
本文主要介紹了 Flink DataStream API,它是目前 Flink 中比較底層的一套 API。在實際的開發中,基于該 API 需要使用者自己處理 State 與 Time 等一些概念,是以需要較大的工作量。後續課程還會介紹更上層的 Table / SQL 層的 API,未來 Table / SQL 可能會成為 Flink 主流的 API,但是對于接口來說,越底層的接口表達能力越強,在一些需要精細操作的情況下,仍然需要依賴于 DataStream API。