ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员自己反复注册Watcher,比较繁琐。Curator引入了Cache来实现对ZooKeeper服务端事件的监听。Cache是Curator中对事件监听的包装,其对事件的监听其实可以近似看作是一个本地缓存视图和远程ZooKeeper视图的对比过程。同时Curator能够自动为开发人员处理反复注册监听,从而大大简化了原生API开发的繁琐过程。Cache分为两类监听类型:节点监听和子节点监听。
NodeCache
NodeCache用于监听指定ZooKeeper数据节点本身的变化,其构造方法有如下两个:
- public NodeCache(CuratorFramework client, String path);
- public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);
NodeCache构造方法参数说明如下表所示。
参数名 说明 client Curator客户端实例 path 数据节点的节点路径 dataIsCompressed 是否进行数据压缩
同时,NodeCache定义了事件处理的回调接口NodeCacheListener。
NodeCacheListener回调接口定义
public interface NodeCacheListener {
// Called when a change has occurred
public void nodeChanged() throws Exception;
}
当数据节点的内容发生变化的时候,就会回调该方法。下面通过一个实际例子来看看如何在代码中使用NodeCache。
NodeCache使用示例
public class NodeCache_Sample {
static String path = "/zk-book/nodecache";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("domain1.book.zookeeper:2181").sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main(String[] args) throws Exception {
}client.start();
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
final NodeCache cache = new NodeCache(client, path, false);
cache.start(true);
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData()));}});
client.setData().forPath(path, "u".getBytes());
Thread.sleep(1000);
client.delete().deletingChildrenIfNeeded().forPath(path);
Thread.sleep(Integer.MAX_VALUE);
}
在上面的示例程序中,首先构造了一个NodeCache实例,然后调用start方法,该方法有个boolean类型的参数,默认是false,如果设置为true,那么NodeCache在第一次启动的时候就会立刻从ZooKeeper上读取对应节点的数据内容,并保存在Cache中。
NodeCache不仅可以用于监听数据节点的内容变更,也能监听指定节点是否存在。如果原本节点不存在,那么Cache就会在节点被创建后触发NodeCacheListener。但是,如果该数据节点被删除,那么Curator就无法触发NodeCacheListener了。
PathChildrenCache
PathChildrenCache用于监听指定ZooKeeper数据节点的子节点变化情况。
PathChildrenCache有如下几个构造方法的定义:
- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData);
- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory);
- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean datalsCompressed, ThreadFactory threadFactory);
- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean datalsCompressed, final ExecutorService executorSerivice);
- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean datalsCompressed, final CloseableExecutorService executorService);
public PathChildrenCache构造方法参数说明如下表所示。
参数名 说明 client Curator客户端实例 path 数据节点的节点路径 dataIsCompressed 是否进行数据压缩 cacheData 用于配置是否把节点内容缓存起来,如果配置为true,那么客户端在接收到节点列表变更的同时,也能够获取到节点的数据内容;如果配置为false,则无法获取到节点的数据内容 threadFactory 利用这两个参数,开发者可以通过构造一个专门的线程池,来处理事件通知 executorService 利用这两个参数,开发者可以通过构造一个专门的线程池,来处理事件通知
PathChildrenCache定义了事件处理的回调接口PathChildrenCacheListener,其定义如下。
PathChildrenListener回调接口定义
public interface PathChildrenListener {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception;
}
当指定节点的子节点发生变化时,就会回调该方法。PathChildrenCacheEvent类中定义了所有的事件类型,主要包括新增子节点(CHILD_ADDED)、子节点数据变更(CHILD_UPDATED)和子节点删除(CHILD_REMOVED)三类。
下面通过一个实际例子来看如何在代码中使用PathChildrenCache。
PathChildrenCache使用示例
public class PathChildrenCache_Sample {
static String path = "/zk-book/nodecache";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("domain1.book.zookeeper:2181").sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main(String[] args) throws Exception {
}client.start();
PathChildrenCache cache = new PathChildrenCache(client, path, true);
cache.start(StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener(){
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception{switch(event.getType()) {}case CHILD_ADDED :case CHILD_UPDATED :System.out.println("CHILD_ADDED," + event.getData().getPath());
break;
System.out.println("CHILD_UPDATED," + event.getData().getPath());break;case CHILD_REMOVED :System.out.println("CHILD_REMOVED," + event.getData().getPath());}break;default:break;});
client.create().withMode(CreateMode.PERSISTENT).forPath(path);
Thread.sleep(1000);
client.create().withMode(Create.PERSISTENT).forPath(path + "/c1");
Thread.sleep(1000);
client.delete().forPath(path + "/c1");
Thread.sleep(1000);
client.delete().forPath(path);
Thread.sleep(Integer.MAX_VALUE);
}
运行程序,输出结果如下:
在上面这个示例程序中,对/zk-book节点进行了子节点变更事件的监听,一旦该节点新增/删除子节点,或者子节点数据发生变更,就会回调PathChildrenCacheListener,并根据对应的事件类型进行相关的处理。同时,我们也看到,对于节点zk=book本身的变更,并没有通知到客户端。
另外,和其他ZooKeeper客户端产品一样,Curator也无法对二级子节点进行事件监听。也就是说,如果使用PathChildrenCache对/zk-book进行监听,那么当/zk-book/c1/c2节点被创建或删除的时候,是无法触发子节点变更事件的。
哈哈