天天看點

Apache Storm 官方文檔 —— Trident API 概述

trident 的核心資料模型是“流”(stream),不過與普通的拓撲不同的是,這裡的流是作為一連串 batch 來處理的。流是分布在叢集中的不同節點上運作的,并且對流的操作也是在流的各個 partition 上并行運作的。

trident 中有 5 類操作:

針對每個小分區(partition)的本地操作,這類操作不會産生網絡資料傳輸;

針對一個資料流的重新分區操作,這類操作不會改變資料流中的内容,但是會産生一定的網絡傳輸;

通過網絡資料傳輸進行的聚合操作;

針對資料流的分組操作;

融合與聯結操作。

本地分區操作是在每個分區塊上獨立運作的操作,其中不涉及網絡資料傳輸。

函數負責接收一個輸入域的集合并選擇輸出或者不輸出 tuple。輸出 tuple 的域會被添加到原始資料流的輸入域中。如果一個函數不輸出 tuple,那麼原始的輸入 tuple 就會被直接過濾掉。否則,每個輸出 tuple 都會複制一份輸入 tuple 。假設你有下面這樣的函數:

再假設你有一個名為 “mystream” 的資料流,這個流中包含下面幾個 tuple,每個 tuple 中包含有 “a”、“b”、“c” 三個域:

如果你運作這段代碼:

那麼最終輸出的結果 tuple 就會包含有 “a”、“b”、“c”、“d” 4 個域,就像下面這樣:

過濾器負責判斷輸入的 tuple 是否需要保留。以下面的過濾器為例:

通過使用這段代碼:

就可以将下面這樣帶有 “a”、“b”、“c” 三個域的 tuple

最終轉化成這樣的結果 tuple:

<code>partitionaggregate</code> 會在一批 tuple 的每個分區上執行一個指定的功能操作。與上面的函數不同,由 <code>partitionaggregate</code>發送出的 tuple 會将輸入 tuple 的域替換。以下面這段代碼為例:

假如輸入流中包含有 “a”、“b” 兩個域并且有以下幾個 tuple 塊:

經過上面的代碼之後,輸出就會變成帶有一個名為 “sum” 的域的資料流,其中的 tuple 就是這樣的:

storm 有三個用于定義聚合器的接口:<code>combineraggregator</code>,<code>reduceraggregator</code> 以及 <code>aggregator</code>。

這是 <code>combineraggregator</code> 接口:

<code>combineraggregator</code> 會将帶有一個域的一個單獨的 tuple 傳回作為輸出。<code>combineraggregator</code> 會在每個輸入 tuple 上運作初始化函數,然後使用組合函數來組合所有輸入的值。如果在某個分區中沒有 tuple, <code>combineraggregator</code> 就會輸出<code>zero</code> 方法的結果。例如,下面是 <code>count</code> 的實作代碼:

如果你使用 aggregate 方法來代替 partitionaggregate 方法,你就會發現 <code>combineraggregator</code> 的好處了。在這種情況下,trident 會在發送 tuple 之前通過分區聚合操作來優化計算過程。

<code>reduceraggregator</code> 的接口實作是這樣的:

<code>reduceraggregator</code> 會使用 <code>init</code> 方法來産生一個初始化的值,然後使用該值對每個輸入 tuple 進行周遊,并最終生成并輸出一個單獨的 tuple,這個 tuple 中就包含有我們需要的計算結果值。例如,下面是将 count 定義為 <code>reduceraggregator</code> 的代碼:

<code>reduceraggregator</code> 同樣可以用于 persistentaggregate,你會在後面看到這一點。

最常用的聚合器接口還是下面的 <code>aggregator</code> 接口:

<code>aggregator</code> 聚合器可以生成任意數量的 tuple,這些 tuple 也可以帶有任意數量的域。聚合器可以在執行過程中的任意一點輸出tuple,他們的執行過程是這樣的:

在處理一批資料之前先調用 init 方法。init 方法的傳回值是一個代表着聚合狀态的對象,這個對象接下來會被傳入 aggregate 方法和 complete 方法中。

對于一個區塊中的每個 tuple 都會調用 aggregate 方法。這個方法能夠更新狀态并且有選擇地輸出 tuple。

在區塊中的所有 tuple 都被 aggregate 方法處理之後就會調用 complete 方法。

下面是使用 count 作為聚合器的代碼:

有時你可能會需要同時執行多個聚合操作。這個過程叫做鍊式處理,可以使用下面這樣的代碼來實作:

這段代碼會在每個分區上分别執行 count 和 sum 聚合器,而輸出中隻會包含一個帶有 [“count”, “sum”] 域的單獨的 tuple。

<code>projection</code> 方法隻會保留操作中指定的域。如果你有一個帶有 [“a”, “b”, “c”, “d”] 域的資料流,通過執行這段代碼:

就會使得輸出資料流中隻包含有 [“b”, “d”] 域。

重分區操作會執行一個用來改變在不同的任務間配置設定 tuple 的方式的函數。在重分區的過程中分區的數量也可能會發生變化(例如,重分區之後的并行度就有可能會增大)。重分區會産生一定的網絡資料傳輸。下面是重分區操作的幾個函數:

shuffle:通過随機輪詢算法來重新配置設定目标區塊的所有 tuple。

broadcast:每個 tuple 都會被複制到所有的目标區塊中。這個函數在 drpc 中很有用 —— 比如,你可以使用這個函數來擷取每個區塊資料的查詢結果。

partitionby:該函數會接收一組域作為參數,并根據這些域來進行分區操作。可以通過對這些域進行哈希化,并對目标分區的數量取模的方法來選取目标區塊。partitionby 函數能夠保證來自同一組域的結果總會被發送到相同的目标區間。

global:這種方式下所有的 tuple 都會被發送到同一個目标分區中,而且資料流中的所有的塊都會由這個分區處理。

batchglobal:同一個 batch 塊中的所有 tuple 會被發送到同一個區塊中。當然,在資料流中的不同區塊仍然會配置設定到不同的區塊中。

partition:這個函數使用自定義的分區方法,該方法會實作 <code>backtype.storm.grouping.customstreamgrouping</code> 接口。

trident 使用 aggregate 方法和 persistentaggregate 方法來對資料流進行聚類操作。其中,aggregate 方法會分别對資料流中的每個 batch 進行處理,而 persistentaggregate 方法則會對資料流中的所有 batch 執行聚類處理,并将結果存入某個 state 中。

在資料流上執行 aggregate 方法會執行一個全局的聚類操作。在你使用 <code>reduceraggregator</code> 或者 <code>aggregator</code> 時,資料流首先會被重新分區成一個單獨的分區,然後聚類函數就會在該分區上執行操作。而在你使用 <code>combineraggregator</code> 時,trident 首先會計算每個分區的部分聚類結果,然後将這些結果重分區到一個單獨的分區中,最後在網絡資料傳輸完成之後結束這個聚類過程。<code>combineraggregator</code> 比其他的聚合器的運作效率更高,在聚類時應該盡可能使用<code>combineraggregator</code>。

下面是一個使用 aggregate 來擷取一個 batch 的全局計數值的例子:

與 partitionaggregate 一樣,aggregate 的聚合器也可以進行鍊式處理。然而,如果你在一個處理鍊中同時使用了<code>combineraggregator</code> 和非 <code>combineraggregator</code>,trident 就不能對部分聚類操作進行優化了。

通過對指定的域執行 partitionby 操作,groupby 操作可以将資料流進行重分區,使得相同的域的 tuple 分組可以聚集在一起。例如,下面是一個 groupby 操作的示例:

Apache Storm 官方文檔 —— Trident API 概述

和其他操作一樣,對分組資料流的聚合操作也可以以鍊式的方式執行。

trident api 的最後一部分是聯結不同的資料流的操作。聯結資料流最簡單的方式就是将所有的資料流融合到一個流中。你可以使用 tridenttopology 的 merge 方法實作該操作,比如這樣:

trident 會将融合後的新資料流的域命名為為第一個資料流的輸出域。

聯結資料流的另外一種方法是使用 join。像 sql 那樣的标準 join 操作隻能用于有限的輸入資料集,對于無限的資料集就沒有用武之地了。trident 中的 join 隻會應用于每個從 spout 中輸出的小 batch。

下面是兩個流的 join 操作的示例,其中一個流含有 [“key”, “val1″, “val2″] 域,另外一個流含有 [“x”, “val1″] 域:

上面的例子會使用 “key” 和 “x” 作為 join 的域來聯結 stream1 和 stream2。trident 要求先定義好新流的輸出域,因為輸入流的域可能會覆寫新流的域名。從 join 中輸出的 tuple 中會包含:

join 域的清單。在這個例子裡,輸出的 “key” 域與 stream1 的 “key” 域以及 stream2 的 “x” 域對應。

來自所有流的非 join 域的清單。這個清單是按照傳入 join 方法的流的順序排列的。在這個例子裡,“ a” 和 “b” 域與 stream1 的 “val1” 和 “val2” 域對應;而 “c” 域則與 stream2 的 “val1” 域相對應。

在對不同的 spout 發送出的流進行 join 時,這些 spout 上會按照他們發送 batch 的方式進行同步處理。也就是說,一個進行中的 batch 中含有每個 spout 發送出的 tuple。

到這裡你大概仍然會對如何進行視窗 join 操作感到困惑。視窗操作(包括平滑視窗、滾動視窗等 —— 譯者注)主要是指将目前的 tuple 與過去若幹小時時間段内的 tuple 聯結起來的過程。

你可以使用 partitionpersist 和 statequery 來實作這個過程。過去一段時間内的 tuple 會以 join 域為關鍵字被儲存到一個 state 源中。然後就可以使用 statequery 查詢 join 域來實作這個“聯結”(join)的過程。