天天看點

Storm-源碼分析- Storm中Zookeeper的使用

在backtype.storm.cluster.clj中, 定義了storm對于zookeeper的使用

首先定義操作zookeeper叢集的interface

實作和生成用于操作zookeeper叢集的record 

首先建立zk-client, 并在zk上建立storm-zookeeper-root目錄 

接着定義, 

    callbacks, callback集合 

    active, 标志zk叢集狀态 

    zk, zk client

建立zk client的時候, 設定了watcher, 即zk server當狀态發生變化時會給client發送event, 此處client設定的watcher會調用callbacks來處理server發送的event

最後實作clusterstate protocol, 其中register和unregister是用來添加/删除callbacks的, 其他都是些zk的正常操作

定義針對storm定制的zk操作協定, 包含各種storm裡面的資訊在zk上的讀寫

首先判斷是否第一次mk-storm-cluster-state, 既是否進行過zk cluster state的建立, 如果沒有調用mk-distributed-cluster-state 

接着, 定義一系列的callbacks, 并調用cluster-state的register, 注冊到callbacks清單中 

       state-id 就是register傳回的callback的uuid 

再者, 在zk上建立storm的子目錄 

最後, 實作stormclusterstate協定, 實作各種zk資料的讀寫

通過一個場景來說明storm怎樣使用zookeeper

supervisor中的mk-synchronize-supervisor, 主要用于下載下傳新的, 并删除不使用的topology代碼 

是以這個邏輯光執行一次是不夠的, 需要當每次assignment發生變化的時候就執行一次

storm是利用zookeeper的watcher來解決這個問題 

1. 在mk-distributed-cluster-state中建立zk client的時候配置watcher, 當收到zk server的event的時候, 調用callbacks清單裡面的callback進行處理

2. 在mk-storm-cluster-state 中将callback加入cluster-state的callback清單 

    而這個callback本身, 就是根據event中的path(代表哪部分資料發生change)來issue在storm-cluster-state中維護的一系列callback 

    比如, 當assignments-root發生變化時, 會調用assignments-callback  

3. 那麼也就是說隻需要将mk-synchronize-supervisor, set到assignments-callback, 就可以保證當assignments-root發生變化時, 調用mk-synchronize-supervisor去同步topology代碼 

   什麼時候set? 在第一次調用mk-synchronize-supervisor的時候 

   同步topology代碼是消耗時間的事情, 是以實作的時候放在背景執行, 隻是将this(function) add到event-manager的queue裡面, 背景線程會執行這個函數 

   并且在調用assignment擷取assignments-snapshot的時候, 将sync-callback set到assignments-callback中去

by the way, 對于get-children, 是否有callback, 即是否被watch, 讀的資料是不一樣的, 具體原因不是很清楚, 需要後面看看zk的具體使用

4. 前面說了issue-callback!在執行assignments-callback之前, 會将其清空, 是以如果需要不斷的觸發, 那麼就要不斷的設定assignments-callback 

    是以作為callback, mk-synchronize-supervisor會先通過assignments-snapshot去重設assignments-callback 

    至于為什麼要采用這樣的機制? 現在還看不清楚

本文章摘自部落格園,原文釋出日期:2013-06-26 

繼續閱讀