天天看點

Storm-Trident APITrident API

《從零開始學Storm》

apachecn/storm-doc-zh

Trident是在Storm的基礎上,一個以實時計算為目标的 high-level abstraction (高度抽象)。 它在提供處理大吞吐量資料能力(每秒百萬次消息)的同時, 也提供了低延時分布式查詢和 stateful stream processing (有狀态流式處理)的能力。

Trident 提供了

joins (連接配接), aggregations(聚合), grouping(分組), functions, 以及 filters 等

能力… 除此之外, Trident 還提供了一些專門的 primitives (原語), 進而在基于資料庫或者其他存儲的前提下來應付有狀态的遞增式處理. Trident 也提供

一緻性(consistent)、有且僅有一次(exactly-once)

等語義。

Trident API

“Stream” 是 Trident 中的核心資料模型, 它被當做一系列的 batch 來處理.在 Storm 叢集的節點之間, 一個 stream 被劃分成很多 partition (分區), 對Stream(流)的

operation (操作)

是在每個 partition 上并行進行的.

  1. 也有說法:TridentTuple 是 Trident 中的核心資料模型。
  2. 一個 stream 被劃分成很多 partition : partition 是 stream 的一個子集, 裡面可能有多個 batch ,

    一個 batch 也可能位于不同的 partition

    上.

Trident 有 5 類操作:

  1. Partition-local operations(本地分區操作): 對每個 partition 的局部操作,

    不産生網絡傳輸

  2. Repartitioning operations(重分區操作): 對 stream (資料流)的重新劃分(僅僅是劃分, 但不改變内容),

    産生網絡傳輸

  3. Aggregation operations (聚合操作):部分進行網絡傳輸的 .
  4. Operations on grouped streams (流分組操作):
  5. Merges & joins(合并和連接配接操作):

    類似SQL中的join

本地分區操作

Partition-local operations (分區本地操作)不涉及網絡傳輸, 并且獨立地應用于每個 batch partition (batch分區).

Function(函數)

一個 function 收到一個輸入 tuple 後可以

輸出 0 或多個 tuple

, 輸出 tuple 的字段被

追加

到接收到的輸入 tuple 後面.如果對某個 tuple 執行 function 後沒有輸出 tuple, 則該 tuple 被 filter(過濾), 否則, 就會為每個輸出 tuple 複制一份輸入 tuple 的副本.

例:假設有如下的 function,

public class MyFunction extends BaseFunction {
		@Override
		public void execute(TridentTuple tuple, TridentCollector collector) {
			//如果tuple.getInteger(0) > 1 ,會将一個tuple變為多條
			for(int i=0; i < tuple.getInteger(0); i++) {
				collector.emit(new Values(i));
			}
		}
	}
           

執行如下代碼,

通過輸入元組為Fields("b")計算,追加的Fields("d")

public void a_function2(){
		/**
		 *   before             after
		 * [1, 2, 3]        [1, 2, 3, 0]
		 *            ==>   [1, 2, 3, 1]
		 * [4, 1, 6]        [4, 1, 6, 0]
		 * [3, 0, 8]        過濾掉.......
		 */
		dummyStream.each(new Fields("b"), new MyFunction(), new Fields("d"));
	}
           

其中,

[1, 2, 3] 輸出為2條tuple,[3, 0, 8] 則被過濾掉。

Filter(過濾器)

Filters 收到一個輸入 tuple , 并決定是否保留該 tuple。

例:假設有如下的 filter:

僅保留 number0 + number1 為偶數的tuple.

class CheckEvenSumFilter extends BaseFilter{
		@Override
		public boolean isKeep(TridentTuple tuple) {
			int number0 = tuple.getInteger(0);
			int number1 = tuple.getInteger(1);
			int sum = number0 + number1;
			return sum % 2 == 0;
		}
	}
           

執行如下代碼:

public void b_filter(){
		/**
		 *  before                      after
		 *  [1,1,3,11]     ====>    [1,1,3,11]
		 *  [2,2,7,1]               	  [2,2,7,1]
	     *  [2,3,4,5]               	  過濾剔除.... 
		 */
		dummyStream.each(new Fields("a", "b"), new CheckEvenSumFilter());
	}
           

其中tuple:

[2,3,4,5] 被過濾剔除.

map and flatMap

map

map 傳回一個 stream , 它包含将給定的 mapping function (映射函數)應用到 stream 的 tuples 的結果. 這個可以用來對 tuples 應用

one-one transformation (一一變換

).

例:如果有一個sentences流,你想将它轉換為大寫語句,可以使用如下 map函數

class UpperCase implements MapFunction {
		@Override
		public Values execute(TridentTuple input) {
			return new Values(input.getString(0).toUpperCase());
		}
	}
           

執行如下代碼:

public void c_map(){
		dummyStream.map(new UpperCase());

		//用新的輸出字段名稱替換舊的字段 === 相當于 fields-rename, fields個數不變
		dummyStream.map(new UpperCase(), new Fields("uppercased"));
	}
           

注意,

第2個字段Fields outputFields可選。

flatMap

flatMap 類似于 map , 但具有将

one-to-many transformation (一對多變換)

應用于 values of the stream (流的值)的效果, 然後将所得到的元素 flattening (平坦化)為新的 stream .

例: 如果有sentences 流, 并且您想将其轉換成 words流,可以使用如下 flatmap函數,

class Split implements FlatMapFunction {

		@Override
		public Iterable<Values> execute(TridentTuple input) {
			List<Values> valuesList = new ArrayList<>();
			//根據“ ”,将sentences 拆分為 words
			for (String word : input.getString(0).split(" ")) {
				valuesList.add(new Values(word));
			}
			return valuesList;
		}
	}
           

執行如下代碼:

public void d_flatmap(){
		dummyStream.flatMap(new Split());

		dummyStream.flatMap(new Split(), new Fields("word"));
	}

           

peek

peek 可用于在每個 trident tuple 流過 stream 時對其執行 additional action (附加操作). 這可能對于在流經 pipeline 中某一點的元組來 debugging (調試) tuples 是有用的.

dummyStream.a.b.c.peek(new Consumer() {
	@Override
		public void accept(TridentTuple input) {
			//列印TridentTuple 資訊
			System.out.println(input.getString(0));
		}
	});
           

min and minBy

min 和 minBy operations (操作)在 trident stream 中的 a batch of tuples (一批元組)的

每個 partition (分區)

上傳回 minimum value (最小值).

它的api如下:

//1 min方法:需要顯示的聲明Comparator
public Stream min(Comparator<TridentTuple> comparator) {...}

//2.1 minBy:通過指定具體字段來比較
public Stream minBy(String inputFieldName)  {...}

//2.2 minBy:通過指定具體字段 & 顯示聲明Comparator
 public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) {...}
           

max and maxBy

max 和 maxBy operations (操作)在 trident stream 中的一 batch of tuples (批元組)的

每個 partition (分區)上傳回 maximum (最大值)

.,它的api方法:

public Stream max(Comparator<TridentTuple> comparator) {...}
public Stream maxBy(String inputFieldName)  {...}
 public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator) {...}
           

partitionAggregate分區聚合

partitionAggregate是運作在每個batch元組partition(分區)上的Function(函數),又不同于上面的 functions 操作, partitionAggregate 的輸出 tuple 将會

取代

收到的輸入 tuple 。

例:假設 input stream 包括字段 [“a”, “b”] , 并有下面的 partitions of tuples (元組 partitions ):

Partition 0:
["a", 1]
["b", 2]

Partition 1:
["a", 3]
["c", 8]

Partition 2:
["e", 1]
["d", 9]
["d", 10]
           

執行如下代碼,對輸入元組b字段進行求和:

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

則輸出流

隻包含sum字段

,即求和結果:

Partition 0:
[3]

Partition 1:
[11]

Partition 2:
[20]
           

上面代碼中的 new Sum() 實際上是一個

aggregator (聚合器)

, 定義一個聚合器有三種不同的接口:

  • CombinerAggregator
  • ReducerAggregator
  • Aggregator .

CombinerAggregator

CombinerAggregator

接口定義如下:

public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);
    T combine(T val1, T val2);
    T zero();
}
           

一個 CombinerAggregator 僅

輸出一個 tuple(該 tuple 也隻有一個字段)

,

  • 每收到一個輸入 tuple, CombinerAggregator 就會執行 init() 方法(該方法傳回一個初始值)
  • 調用 combine() 方法彙總這些值, 直到剩下一個值為止(聚合值)
  • .如果 partition 中沒有 tuple, CombinerAggregator 會發送 zero() 的傳回值.
// 聚合器 Count  vs Sum
public class Count implements CombinerAggregator<Long> {

    @Override
    public Long init(TridentTuple tuple) {
    	//count初始值為1
        return 1L;
    }

    @Override
    public Long combine(Long val1, Long val2) {
        return val1 + val2;
    }

    @Override
    public Long zero() {
        return 0L;
    }
    
}

public class Sum implements CombinerAggregator<Number> {

    @Override
    public Number init(TridentTuple tuple) {
     	//sum初始值為0
        return (Number) tuple.getValue(0);
    }

    @Override
    public Number combine(Number val1, Number val2) {
        return Numbers.add(val1, val2);
    }

    @Override
    public Number zero() {
        return 0;
    }
    
}
           
當使用

aggregate()

方法代替

partitionAggregate()

方法時, 就能看到 CombinerAggregation 帶來的好處.這種情況下, Trident 會自動優化計算:

先做局部聚合操作, 然後再通過網絡傳輸 tuple 進行全局聚合

.

ReducerAggregator

ReducerAggregator

接口如下:

public interface ReducerAggregator<T> extends Serializable {
    T init();
    T reduce(T curr, TridentTuple tuple);
}
           

ReducerAggregator 使用 init() 方法産生一個初始值, 對于每個輸入 tuple , 依次疊代這個初始值, 最終産生一個單值輸出 tuple。

ReducerAggregator 也可以與

persistentAggregate

一起使用, 後續會有涉及.

Aggregator

聚合

最通用的接口

Aggregator

,Aggregator的接口定義如下:

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T state, TridentTuple tuple, TridentCollector collector);
    void complete(T state, TridentCollector collector);
}

           

Aggregator 可以輸

出任意數量的 tuple , 且這些 tuple 的字段也可以有多個

。執行過程中的

任何時候都可以輸出 tuple (三個方法的參數中都有 collector )

. . Aggregator 的執行方式如下:

  • init方法

    : 處理每個 batch 之前調用一次 init() 方法。init的傳回值是一個表示

    聚合狀态

    的對象,該對象會傳遞給後續的aggregate和complete方法。

!!!在實際的實踐過程中發現, init()在batch partition第一次處理時被調用。

  • aggregate方法

    :每個收到一個該 batch 中的輸入

    tuple

    就會調用一次 aggregate, 該方法中可以

    更新狀态

    ,并

    可選地

    發出元組.
  • complete方法

    : 當batch partition的所有tuple都已經被

    aggregate方法

    處理後被調用。

例:下面的代碼将 Count 作為 Aggregator 實作:

public class CountAgg extends BaseAggregator<CountState> {
    static class CountState {
        long count = 0;
    }

    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState();
    }

    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count+=1;
    }

    public void complete(CountState state, TridentCollector collector) {
        collector.emit(new Values(state.count));
    }
}
           

aggregators 鍊式用法

有時需要同時執行

multiple aggregators (多個聚合)操作

, 這個可以使用 chaining (鍊式)操作完成:

mystream.chainedAgg()
	        .partitionAggregate(new Count(), new Fields("count"))
	        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
	        .chainEnd()
           

這段代碼将會對每個 partition 執行 Count 和 Sum aggregators (聚合器), 并輸出一個tuple 字段 [“count”, “sum”].

stateQuery(狀态查詢) and partitionPersist(狀态持久化)

stateQuery 和 partitionPersist 分别 query (查詢)和 update (更新) sources of state (狀态源)

projection(投影)

經 Stream 中的 project 方法處理後的 tuple 僅保持指定字段(相當于過濾字段)

public void z_projections(){
		//假設目前dummyStream有元組字段 [x,y ,z], 指定x後, 隻保留[x]
		/**
		 *  before               after
		 *  [1,4,7]     ====>    [1]
		 *  [4,1,3]              [4]
		 */
		dummyStream.project(new Fields("x"));
	}
           

重分區操作

Repartitioning operations (重新分區操作)

運作一個函數來更改元組在任務之間的分區。

分區的數量可以通過重分區操作而改變,重分區操作

需要網絡傳輸

,以下是充分去的方法:

  • shuffle

    : 随機将 tuple 均勻地分發到目标 partition 裡.
  • broadcast

    : 每個 tuple 被複制到所有的目标 partition 裡,

    這在 DRPC 中非常有用,例如需要對每個分區的資料做一個stateQuery操作。

  • partitionBy

    :對每個 tuple 選擇 partition 的方法是:(該 tuple 指定字段的 hash 值) mod (目标 partition 的個數), 該方法

    確定指定字段相同

    的 tuple 能夠被發送到同一個 partition .

    但同一個 partition 裡可能有字段不同的 tuple(因為mod後的值相同)

    .
  • global

    : 所有的 tuple 都被發送到同一個 partition
  • batchGlobal

    : 確定同一個 batch 中的 tuple 被發送到相同的 partition 中.
  • partition

    : 此方法采用實作

    org.apache.storm.grouping.CustomStreamGrouping

    的自定義分區函數.

聚合操作

Trident 中有 aggregate() 和 persistentAggregate() 方法對流進行聚合操作。

  • aggregate()

    :在每個 batch 上獨立的執行,
  • persistemAggregate()

    : 對所有 batch 中的所有 tuple 進行聚合, 并将結果存入 state 源中.

aggregate() 對 Stream 做全局聚合時的兩種方式:

  1. 當使用 ReduceAggregator 或者 Aggregator 聚合器時

    :流先被重新劃分成一個大分區(僅有一個 partition ), 然後對這個 partition 做聚合操作。
  2. 當使用 CombinerAggregator 時

    :Trident 首先對每個 partition 局部聚合, 然後将所有這些 partition 重新劃分到一個 partition 中, 完成全局聚合.

    CombinerAggregator 更高效, 推薦使用

    .

例:使用 aggregate() 對一個 batch 操作得到一個全局的 count

mystream.aggregate(new Count(), new Fields("count"));
           

流分組操作

groupBy 操作:

  • 首先對流中的指定字段做 partitionBy 操作, 讓指定字段相同的 tuple 能被發送到同一個 partition 裡.
  • 然後在每個 partition 裡根據指定字段值對該分區裡的 tuple 進行分組.

下面示範了 groupBy 操作的過程:

Storm-Trident APITrident API
  • 如果你在一個 grouped stream 上做聚合操作, 聚合操作将會在

    每個 group (分組)内進行, 而不是整個 batch 上

    .
  • GroupStream 類中也有 persistentAggregate 方法, 該方法聚合的結果将會存儲在一個

    key 值為分組字段(即 groupBy 中指定的字段)

    的 MapState 中.
  • 普通的 stream 一樣, groupstream 上的聚合操作也可以使用 chained (鍊式文法)

合并與連接配接

merge

将幾個 stream 彙總到一起, 最簡單的彙總方法是将他們合并成一個 stream , 這個可以通過 TridentTopology 中的 merge 方法完成, 就像這樣:

topology.merge(stream1, stream2, stream3);
           

Trident會重命名新的輸出字段,合并流并

以第一個流的 output fields (輸出字段)來命名

join

合并流的另一種方法是

join(連接配接,類似sql中的join)

,它 需要

有限的輸入

,針對無限流是沒有意義的。

join僅适用于從 spout 發出的每個 small batch 中.

例:以下是包含字段 [“key”, “val1”, “val2”] 的 stream 和包含 [“x”, “val1”] 的另一個 stream 之間的 join 示例:

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
           

stream1中key字段和stream2中x字段進行連接配接。

另外Trident要求所有新流的輸出字段被重命名,因為各個輸入流之間可能會存在重複的字段名稱。從 join 發出的 tuples 将包含:

  • list of join fields (連接配接字段清單).在這種情況下,

    "key" 對應于 stream1 的 "key" , stream2 對應于 "x" .

  • 接下來, 按照 streams 如何傳遞到 join 方法的順序, 所有流中的

    所有非連接配接字段的清單

    .在這種情況下, “a” 和 “b” 對應于來自 stream1 的 “val1” 和 “val2” , “c” 對應于來自 stream2 的 “val1” .