Jstorm是參考storm的實時流式計算架構,在網絡IO、線程模型、資源排程、可用性及穩定性上做了持續改進,已被越來越多企業使用
作為commiter和user,我還是非常看好它的應用前景,下面是在團隊内的分享介紹,更多請參考https://github.com/alibaba/jstorm
一、jstorm是什麼
jstorm可以看作是storm的java增強版本,除了核心用純java實作外,還包括了thrift、python、facet ui。從架構上看,其本質是一個基于zk的分布式排程系統
Jstorm主要應用場景有:
1.資訊流處理,如聚合、分析等
2.持續計算,如實時資料統計、監控
3.分布式rpc調用
Jstorm在核心上對storm的改進有:
(1)模型簡化
(2)多元度資源排程
(3)網絡通信層改造
(4)采樣重構
(5)worker/task内部異步化處理
(6)classload、HA
模型簡化将storm的三層管理模型簡化為兩層
jstorm中task直接對應了線程概念,而在storm中是task隻是線程executor的一個執行邏輯單元
多元度資源排程 分為cpu、memory、net、disk四個次元,預設情況下:
cpu slots = 機器核數 * 2 -1
memory slots = 機器實體記憶體 / 1024M
net slots = min(cpu slots, memory slots)
網絡通信層 采用了netty + disruptor 替換 zmq + blockingQueue
采樣重構
a.定義了滾動時間視窗
b.優化緩存map性能
c.增量采樣時間以及減少無謂資料
Worker/Task内部異步化
異步化和回調是流式架構最基本的兩大特征,Jstorm在task的計算中将nextTuple和ack/fail的邏輯分離開來,并在worker中采用單獨線程負責流入、流出資料的反序列化及序列化工作
有關jstorm實作的幾個關鍵流程,有興趣的可以參考源碼
1.Nimbus的啟動
2.supervisor的啟動
3. worker内部結構
worker的啟動需要完成以下幾件事:
a.讀取配置檔案,啟動程序
b.初始化tuple接收隊列和發送隊列
c.打開端口,啟動rpc服務
d.建立context結構,<component, <stream, output_field>>
e.觸發各種timer,refresh/reconnection/heartbeat...
task的工作包括:
a.建立内部隊列,bind connection
b.反射component拿到taskObj,建立具體的spout/bolt executor
c.反序列化tuple資料,執行處理邏輯
d.做stats,heartbeat等
jstorm在資料的完整性和準确性上分别依賴了acker和事務機制
acker本質是獨立的bolt,input是fieldGrouping,output是directGrouping;
每個bolt有兩個output stream(ACKER_ACK_STREAM_ID/ACKER_ACK_FAIL_STREAM_ID)
每個spout有一個output stream(ACKER_INIT_STREAM_ID),以及兩個input stream(ACKER_ACK_STREAM_ID/ACKER_ACK_FAIL_STREAM_ID)
Spout
發送給acker 的value <rootid, xor(target_task_list)>
發送下一級bolt 的value <rootid, 目标taskid>
Bolt
下一級bolt需要ack發送給下一級bolt 為<rootid, 新uuid)>發送給acker的value為<rootid, xor(新uuid, $(接收值))>
下一級bolt不需要ack發送給下一級bolt 為空發送給acker為<rootid, $(接收值)>
事務:批處理+全局唯一遞增id+兩階段送出
在發送tuple的時候帶上tid來保證“隻有一次”的原語,下遊邏輯根據tid是否next tid來判斷是否需要處理。為了提高效率,會将多個tuple組裝成一批賦予一個tid,并用pipeline方式執行processing和commit階段,其中processing可以并發執行,而commit具有嚴格的強順序性。接口coordinator,commitor中做了狀态管理、事務協調、錯誤檢查等工作
另外一個用得最多的進階特性就是trident,它對bolt進行了封裝,提供了如joins、aggregations、grouping、filters、function等多種進階資料處理能力
最後,談談有關jstorm的運維開發
(1)配置優先級:代碼 > jstorm.yaml > default.yaml
(2)stream流對比:
a.fieldsGrouping
b.globalGrouping - target componet的第一個task
c.shuffleGrouping - 自定義random,更平均
d.noneGrouping - 調用random
e.allGrouping - target component所有task
f.directGrouping - 指定目标task
g.customGrouping - 接口customStreamGrouping
(3)jvm調優,優先考慮新生代,開啟碎片整理
(4)同一worker内的task,開啟定向排程避免網絡開銷
(5)優雅關閉,reblance或kill前先deactive,等待msg_timeout進行資料清理
(6)其它,hooks、queue-size、topology.max.spout.pending等