在backtype.storm.cluster.clj中, 定義了storm對于zookeeper的使用
首先定義操作zookeeper叢集的interface
實作和生成用于操作zookeeper叢集的record
首先建立zk-client, 并在zk上建立storm-zookeeper-root目錄
接着定義,
callbacks, callback集合
active, 标志zk叢集狀态
zk, zk client
建立zk client的時候, 設定了watcher, 即zk server當狀态發生變化時會給client發送event, 此處client設定的watcher會調用callbacks來處理server發送的event
最後實作clusterstate protocol, 其中register和unregister是用來添加/删除callbacks的, 其他都是些zk的正常操作
定義針對storm定制的zk操作協定, 包含各種storm裡面的資訊在zk上的讀寫
首先判斷是否第一次mk-storm-cluster-state, 既是否進行過zk cluster state的建立, 如果沒有調用mk-distributed-cluster-state
接着, 定義一系列的callbacks, 并調用cluster-state的register, 注冊到callbacks清單中
state-id 就是register傳回的callback的uuid
再者, 在zk上建立storm的子目錄
最後, 實作stormclusterstate協定, 實作各種zk資料的讀寫
通過一個場景來說明storm怎樣使用zookeeper
supervisor中的mk-synchronize-supervisor, 主要用于下載下傳新的, 并删除不使用的topology代碼
是以這個邏輯光執行一次是不夠的, 需要當每次assignment發生變化的時候就執行一次
storm是利用zookeeper的watcher來解決這個問題
1. 在mk-distributed-cluster-state中建立zk client的時候配置watcher, 當收到zk server的event的時候, 調用callbacks清單裡面的callback進行處理
2. 在mk-storm-cluster-state 中将callback加入cluster-state的callback清單
而這個callback本身, 就是根據event中的path(代表哪部分資料發生change)來issue在storm-cluster-state中維護的一系列callback
比如, 當assignments-root發生變化時, 會調用assignments-callback
3. 那麼也就是說隻需要将mk-synchronize-supervisor, set到assignments-callback, 就可以保證當assignments-root發生變化時, 調用mk-synchronize-supervisor去同步topology代碼
什麼時候set? 在第一次調用mk-synchronize-supervisor的時候
同步topology代碼是消耗時間的事情, 是以實作的時候放在背景執行, 隻是将this(function) add到event-manager的queue裡面, 背景線程會執行這個函數
并且在調用assignment擷取assignments-snapshot的時候, 将sync-callback set到assignments-callback中去
by the way, 對于get-children, 是否有callback, 即是否被watch, 讀的資料是不一樣的, 具體原因不是很清楚, 需要後面看看zk的具體使用
4. 前面說了issue-callback!在執行assignments-callback之前, 會将其清空, 是以如果需要不斷的觸發, 那麼就要不斷的設定assignments-callback
是以作為callback, mk-synchronize-supervisor會先通過assignments-snapshot去重設assignments-callback
至于為什麼要采用這樣的機制? 現在還看不清楚
本文章摘自部落格園,原文釋出日期:2013-06-26