天天看點

Storm入門之第五章Bolts第5章 Bolts

Storm入門之第五章Bolts第5章 Bolts

正如你已經看到的,bolts是一個storm叢集中的關鍵元件。你将在這一章學到bolt生命周期,一些bolt設計政策,以及幾個有關這些内容的例子。

bolt是這樣一種元件,它把元組作為輸入,然後産生新的元組作為輸出。實作一個bolt時,通常需要實作irichbolt接口。bolts對象由用戶端機器建立,序列化為拓撲,并送出給叢集中的主機。然後叢集啟動勞工程序反序列化bolt,調用prepare,最後開始處理元組。

note:要建立一個bolt對象,它通過構造器參數初始化成員屬性,bolt被送出到叢集時,這些屬性值會随着一起序列化。

bolts擁有如下方法:

下面看一個例子,在這個例子中bolt把一句話分割成單詞清單:

正如你所看到的,這是一個很簡單的bolt。值得一提的是在這個例子裡,沒有消息擔保。這就意味着,如果bolt因為某些原因丢棄了一些消息——不論是因為bolt挂了,還是因為程式故意丢棄的——生成這條消息的spout不會收到任何通知,任何其它的spouts和bolts也不會收到。

然而在許多情況下,你想確定消息在整個拓撲範圍内都被處理過了。

正如前面所說的,storm保證通過spout發送的每條消息會得到所有bolt的全面處理。基于設計上的考慮,這意味着你要自己決定你的bolts是否保證這一點。

拓撲是一個樹型結構,消息(元組)穿過其中一條或多條分支。樹上的每個節點都會調用ack(tuple)或fail(tuple),storm是以知道一條消息是否失敗了,并通知那個/那些制造了這些消息的spout(s)。既然一個storm拓撲運作在高度并行化的環境裡,跟蹤始發spout執行個體的最好方法就是在消息元組内包含一個始發spout引用。這一技巧稱做錨定(譯者注:原文為anchoring)。修改一下剛剛講過的splitsentence,使它能夠確定消息都被處理了。

錨定發生在調用collector.emit()時。正如前面提到的,storm可以沿着元組追蹤到始發spout。collector.ack(tuple)和collector.fail(tuple)會告知spout每條消息都發生了什麼。當樹上的每條消息都已被處理了,storm就認為來自spout的元組被全面的處理了。如果一個元組沒有在設定的逾時時間内完成對消息樹的處理,就認為這個元組處理失敗。預設逾時時間為30秒。

note:你可以通過修改config.topology_message_timeout修改拓撲的逾時時間。

當然了spout需要考慮消息的失敗情況,并相應的重試或丢棄消息。

note:你處理的每條消息要麼是确認的(譯者注:collector.ack())要麼是失敗的(譯者注:collector.fail())。storm使用記憶體跟蹤每個元組,是以如果你不調用這兩個方法,該任務最終将耗盡記憶體。

一個bolt可以使用emit(streamid, tuple)把元組分發到多個流,其中參數streamid是一個用來辨別流的字元串。然後,你可以在topologybuilder決定由哪個流訂閱它。

為了用bolt連接配接或聚合資料流,你需要借助記憶體緩沖元組。為了在這一場景下確定消息完成,你不得不把流錨定到多個元組上。可以向emit方法傳入一個元組清單來達成目的。

通過這種方式,bolt在任意時刻調用ack或fail方法,都會通知消息樹,而且由于流錨定了多個元組,所有相關的spout都會收到通知。

你可能已經注意到了,在許多情況下都需要消息确認。簡單起見,storm提供了另一個用來實作bolt的接口,ibasicbolt。對于該接口的實作類的對象,會在執行execute方法之後自動調用ack方法。

note:分發消息的basicoutputcollector自動錨定到作為參數傳入的元組。

繼續閱讀