[b]開源用戶端,原生api的不足[/b]
連接配接的建立是異步的,需要開發人員自行編碼實作等待
連接配接沒有自動的逾時重連機制
Zk本身沒提供序列化機制,需要開發人員自行指定,進而實作資料的序列化和反序列化
Watcher注冊一次隻會生效一次,需要不斷的重複注冊
Watcher的使用方式不符合java本身的術語,如果采用監聽器方式,更容易了解
不支援遞歸建立樹形節點
[b]開源用戶端---ZkClient介紹[/b]
Github上一個開源的zk用戶端,由datameer的工程師Stefan Groschupf和Peter Voss一起開發
– 解決session會話逾時重連
– Watcher反複注冊
– 簡化開發api
– 還有.....
– https://github.com/sgroschupf/zkclient
[b]開源用戶端---Curator介紹[/b]
1. 使用CuratorFrameworkFactory工廠的兩個靜态方法建立用戶端
a) static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs,
RetryPolicy retryPolicy)
b) static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
2. Start()方法啟動
參數說明
connectString 分開的ip:port對
retryPolicy 重試政策,預設四種:Exponential BackoffRetry,RetryNTimes ,RetryOneTime,
RetryUntilElapsed
sessionTimeoutMs 會話逾時時間,機關為毫秒,預設60000ms
connectionTimeoutMs 連接配接建立逾時時間,機關為毫秒,預設是15000ms
[b]重試政策[/b]
– 實作接口RetryPolicy可以自定重重試政策
• boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
retryCount 已經重試的次數,如果第一次重試 此值為0
elapsedTimeMs 重試花費的時間,機關為毫秒
sleeper 類似于Thread.sleep,用于sleep指定時間
傳回值 如果還會繼續重試,則傳回true
四種預設重試政策
– ExponentialBackoffRetry
• ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
• ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
• 目前應該sleep的時間: baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1))),随着重試次數增加重試時間間隔變大,指數倍增長
參數說明
baseSleepTimeMs 初始sleep時間
maxRetries 最大重試次數
maxSleepMs 最大重試時間
傳回值 如果還會繼續重試,則傳回true
預設重試政策
– RetryNTimes
• RetryNTimes(int n, int sleepMsBetweenRetries)
• 目前應該sleep的時間
參數說明
n 最大重試次數
sleepMsBetweenRetries 每次重試的間隔時間
– RetryOneTime
• 隻重試一次
• RetryOneTime(int sleepMsBetweenRetry), sleepMsBetweenRetry為重試間隔的時間
預設重試政策
– RetryUntilElapsed
• RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
• 重試的時間超過最大時間後,就不再重試
參數說明
maxElapsedTimeMs 最大重試時間
sleepMsBetweenRetries 每次重試的間隔時間
[b]Fluent風格的API[/b]
– 定義:一種面向對象的開發方式,目的是提高代碼的可讀性
– 實作方式:通過方法的級聯或者方法鍊的方式實作
– 舉例:
zkclient = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("tests").build();
[b]建立節點[/b]
– 建構操作包裝類(Builder): CreateBuilder create()---- CuratorFramework
– CreateBuilder
• creatingParentsIfNeeded() //遞歸建立父目錄
• withMode(CreateMode mode)//設定節點屬性,比如:CreateMode.PERSISTENT,如果是遞歸建立,建立模式
為臨時節點,則隻有葉子節點是臨時節點,非葉子節點都為持久節點
• withACL(List aclList) //設定acl
• forPath(String path) //指定路勁
[b]删除節點[/b]
– 建構操作包裝類(Builder):DeleteBuilder delete() -----CuratorFramework
– DeleteBuilder
• withVersion(int version) //特定版本号
• guaranteed() //確定節點被删除
• forPath(String path) //指定路徑
• deletingChildrenIfNeeded() //遞歸删除所有子節點
[b]關于guaranteed:[/b]
Solves edge cases where an operation may succeed on the server but connection failure
occurs before a response can be successfully returned to the client
意思是:解決當某個删除操作在伺服器端可能成功,但是此時用戶端與伺服器端的連接配接中斷,而删除的響
應沒有成功傳回到用戶端
底層的本質是重試
[b]關于異步操作[/b]
– inBackground()
– inBackground(Object context)
– inBackground(BackgroundCallback callback)
– inBackground(BackgroundCallback callback, Object context)
– inBackground(BackgroundCallback callback, Executor executor)
– inBackground(BackgroundCallback callback, Object context, Executor executor)
從參數看跟zk的原生異步api相同,多了一個線程池,用于執行回調
[b]讀取資料[/b]
– 建構操作包裝類(Builder): GetDataBuilder getData() -----CuratorFramework
– GetDataBuilder
• storingStatIn(org.apache.zookeeper.data.Stat stat) //把伺服器端擷取的狀态資料存儲到stat對象
• Byte[] forPath(String path)//節點路徑
讀取子節點
– 建構操作包裝類(Builder): GetChildrenBuilder getChildren() -----CuratorFramework
– GetChildrenBuilder
• storingStatIn(org.apache.zookeeper.data.Stat stat) //把伺服器端擷取的狀态資料存儲到stat對象
• Byte[] forPath(String path)//節點路徑
• usingWatcher(org.apache.zookeeper.Watcher watcher) //設定watcher,類似于zk本身的api,也隻能使用一次
• usingWatcher(CuratorWatcher watcher) //設定watcher ,類似于zk本身的api,也隻能使用一次
[b]設定watcher[/b]
– NodeCache
• 監聽資料節點的内容變更
• 監聽節點的建立,即如果指定的節點不存在,則節點建立後,會觸發這個監聽
– PathChildrenCache
• 監聽指定節點的子節點變化情況
• 包括:新增子節點 子節點資料變更 和子節點删除
NodeCache
– 構造函數
• NodeCache(CuratorFramework client, String path)
• NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
參數說明
client 用戶端執行個體
path 資料節點路徑
dataIsCompressed 是否進行資料壓縮
– 回調接口
• public interface NodeCacheListener
void nodeChanged() //沒有參數,怎麼擷取事件資訊以及節點資料?
PathChildrenCache
client 用戶端執行個體
path 資料節點路徑
dataIsCompressed 是否進行資料壓縮
cacheData 用于配置是否把節點内容緩存起來,如果配置為true,那麼用戶端在接
收到節點清單變更的同時,也能夠擷取到節點的資料内容,如果為false
則無法取到資料内容
threadFactory 通過這兩個參數構造專門的線程池來處理事件通知
executorService
PathChildrenCache
– 監聽接口
• 時間類型包括:新增子節點(CHILD_ADDED),子節點資料變更(CHILD_UPDATED),子節點删除(CHILD_REMOVED)
– PathChildrenCache.StartMode
• BUILD_INITIAL_CACHE //同步初始化用戶端的cache,及建立cache後,就從伺服器端拉入對應的資料
• NORMAL //異步初始化cache
• POST_INITIALIZED_EVENT //異步初始化,初始化完成觸發事件PathChildrenCacheEvent.Type.INITIALIZED
zkclient舉例
curator舉例