天天看點

zookeeper學習筆記-zkclient,curator使用

[b]開源用戶端,原生api的不足[/b]

連接配接的建立是異步的,需要開發人員自行編碼實作等待

連接配接沒有自動的逾時重連機制

Zk本身沒提供序列化機制,需要開發人員自行指定,進而實作資料的序列化和反序列化

Watcher注冊一次隻會生效一次,需要不斷的重複注冊

Watcher的使用方式不符合java本身的術語,如果采用監聽器方式,更容易了解

不支援遞歸建立樹形節點

[b]開源用戶端---ZkClient介紹[/b]

Github上一個開源的zk用戶端,由datameer的工程師Stefan Groschupf和Peter Voss一起開發

– 解決session會話逾時重連

– Watcher反複注冊

– 簡化開發api

– 還有.....

– https://github.com/sgroschupf/zkclient

[b]開源用戶端---Curator介紹[/b]

1. 使用CuratorFrameworkFactory工廠的兩個靜态方法建立用戶端

a) static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs,

RetryPolicy retryPolicy)

b) static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)

2. Start()方法啟動

參數說明

connectString 分開的ip:port對

retryPolicy 重試政策,預設四種:Exponential BackoffRetry,RetryNTimes ,RetryOneTime,

RetryUntilElapsed

sessionTimeoutMs 會話逾時時間,機關為毫秒,預設60000ms

connectionTimeoutMs 連接配接建立逾時時間,機關為毫秒,預設是15000ms

[b]重試政策[/b]

– 實作接口RetryPolicy可以自定重重試政策

• boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)

retryCount 已經重試的次數,如果第一次重試 此值為0

elapsedTimeMs 重試花費的時間,機關為毫秒

sleeper 類似于Thread.sleep,用于sleep指定時間

傳回值 如果還會繼續重試,則傳回true

四種預設重試政策

– ExponentialBackoffRetry

• ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)

• ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

• 目前應該sleep的時間: baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1))),随着重試次數增加重試時間間隔變大,指數倍增長

參數說明

baseSleepTimeMs 初始sleep時間

maxRetries 最大重試次數

maxSleepMs 最大重試時間

傳回值 如果還會繼續重試,則傳回true

預設重試政策

– RetryNTimes

• RetryNTimes(int n, int sleepMsBetweenRetries)

• 目前應該sleep的時間

參數說明

n 最大重試次數

sleepMsBetweenRetries 每次重試的間隔時間

– RetryOneTime

• 隻重試一次

• RetryOneTime(int sleepMsBetweenRetry), sleepMsBetweenRetry為重試間隔的時間

預設重試政策

– RetryUntilElapsed

• RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)

• 重試的時間超過最大時間後,就不再重試

參數說明

maxElapsedTimeMs 最大重試時間

sleepMsBetweenRetries 每次重試的間隔時間

[b]Fluent風格的API[/b]

– 定義:一種面向對象的開發方式,目的是提高代碼的可讀性

– 實作方式:通過方法的級聯或者方法鍊的方式實作

– 舉例:

zkclient = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("tests").build();

[b]建立節點[/b]

– 建構操作包裝類(Builder): CreateBuilder create()---- CuratorFramework

– CreateBuilder

• creatingParentsIfNeeded() //遞歸建立父目錄

• withMode(CreateMode mode)//設定節點屬性,比如:CreateMode.PERSISTENT,如果是遞歸建立,建立模式

為臨時節點,則隻有葉子節點是臨時節點,非葉子節點都為持久節點

• withACL(List aclList) //設定acl

• forPath(String path) //指定路勁

[b]删除節點[/b]

– 建構操作包裝類(Builder):DeleteBuilder delete() -----CuratorFramework

– DeleteBuilder

• withVersion(int version) //特定版本号

• guaranteed() //確定節點被删除

• forPath(String path) //指定路徑

• deletingChildrenIfNeeded() //遞歸删除所有子節點

[b]關于guaranteed:[/b]

Solves edge cases where an operation may succeed on the server but connection failure

occurs before a response can be successfully returned to the client

意思是:解決當某個删除操作在伺服器端可能成功,但是此時用戶端與伺服器端的連接配接中斷,而删除的響

應沒有成功傳回到用戶端

底層的本質是重試

[b]關于異步操作[/b]

– inBackground()

– inBackground(Object context)

– inBackground(BackgroundCallback callback)

– inBackground(BackgroundCallback callback, Object context)

– inBackground(BackgroundCallback callback, Executor executor)

– inBackground(BackgroundCallback callback, Object context, Executor executor)

從參數看跟zk的原生異步api相同,多了一個線程池,用于執行回調

[b]讀取資料[/b]

– 建構操作包裝類(Builder): GetDataBuilder getData() -----CuratorFramework

– GetDataBuilder

• storingStatIn(org.apache.zookeeper.data.Stat stat) //把伺服器端擷取的狀态資料存儲到stat對象

• Byte[] forPath(String path)//節點路徑

讀取子節點

– 建構操作包裝類(Builder): GetChildrenBuilder getChildren() -----CuratorFramework

– GetChildrenBuilder

• storingStatIn(org.apache.zookeeper.data.Stat stat) //把伺服器端擷取的狀态資料存儲到stat對象

• Byte[] forPath(String path)//節點路徑

• usingWatcher(org.apache.zookeeper.Watcher watcher) //設定watcher,類似于zk本身的api,也隻能使用一次

• usingWatcher(CuratorWatcher watcher) //設定watcher ,類似于zk本身的api,也隻能使用一次

[b]設定watcher[/b]

– NodeCache

• 監聽資料節點的内容變更

• 監聽節點的建立,即如果指定的節點不存在,則節點建立後,會觸發這個監聽

– PathChildrenCache

• 監聽指定節點的子節點變化情況

• 包括:新增子節點 子節點資料變更 和子節點删除

NodeCache

– 構造函數

• NodeCache(CuratorFramework client, String path)

• NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)

參數說明

client 用戶端執行個體

path 資料節點路徑

dataIsCompressed 是否進行資料壓縮

– 回調接口

• public interface NodeCacheListener

void nodeChanged() //沒有參數,怎麼擷取事件資訊以及節點資料?

PathChildrenCache

client 用戶端執行個體

path 資料節點路徑

dataIsCompressed 是否進行資料壓縮

cacheData 用于配置是否把節點内容緩存起來,如果配置為true,那麼用戶端在接

收到節點清單變更的同時,也能夠擷取到節點的資料内容,如果為false

則無法取到資料内容

threadFactory 通過這兩個參數構造專門的線程池來處理事件通知

executorService

PathChildrenCache

– 監聽接口

• 時間類型包括:新增子節點(CHILD_ADDED),子節點資料變更(CHILD_UPDATED),子節點删除(CHILD_REMOVED)

– PathChildrenCache.StartMode

• BUILD_INITIAL_CACHE //同步初始化用戶端的cache,及建立cache後,就從伺服器端拉入對應的資料

• NORMAL //異步初始化cache

• POST_INITIALIZED_EVENT //異步初始化,初始化完成觸發事件PathChildrenCacheEvent.Type.INITIALIZED

zkclient舉例

curator舉例