天天看点

Storm-源码分析- Storm中Zookeeper的使用

在backtype.storm.cluster.clj中, 定义了storm对于zookeeper的使用

首先定义操作zookeeper集群的interface

实现和生成用于操作zookeeper集群的record 

首先创建zk-client, 并在zk上创建storm-zookeeper-root目录 

接着定义, 

    callbacks, callback集合 

    active, 标志zk集群状态 

    zk, zk client

创建zk client的时候, 设置了watcher, 即zk server当状态发生变化时会给client发送event, 此处client设置的watcher会调用callbacks来处理server发送的event

最后实现clusterstate protocol, 其中register和unregister是用来添加/删除callbacks的, 其他都是些zk的常规操作

定义针对storm定制的zk操作协议, 包含各种storm里面的信息在zk上的读写

首先判断是否第一次mk-storm-cluster-state, 既是否进行过zk cluster state的创建, 如果没有调用mk-distributed-cluster-state 

接着, 定义一系列的callbacks, 并调用cluster-state的register, 注册到callbacks列表中 

       state-id 就是register返回的callback的uuid 

再者, 在zk上创建storm的子目录 

最后, 实现stormclusterstate协议, 实现各种zk数据的读写

通过一个场景来说明storm怎样使用zookeeper

supervisor中的mk-synchronize-supervisor, 主要用于下载新的, 并删除不使用的topology代码 

所以这个逻辑光执行一次是不够的, 需要当每次assignment发生变化的时候就执行一次

storm是利用zookeeper的watcher来解决这个问题 

1. 在mk-distributed-cluster-state中创建zk client的时候配置watcher, 当收到zk server的event的时候, 调用callbacks列表里面的callback进行处理

2. 在mk-storm-cluster-state 中将callback加入cluster-state的callback列表 

    而这个callback本身, 就是根据event中的path(代表哪部分数据发生change)来issue在storm-cluster-state中维护的一系列callback 

    比如, 当assignments-root发生变化时, 会调用assignments-callback  

3. 那么也就是说只需要将mk-synchronize-supervisor, set到assignments-callback, 就可以保证当assignments-root发生变化时, 调用mk-synchronize-supervisor去同步topology代码 

   什么时候set? 在第一次调用mk-synchronize-supervisor的时候 

   同步topology代码是消耗时间的事情, 所以实现的时候放在后台执行, 只是将this(function) add到event-manager的queue里面, 后台线程会执行这个函数 

   并且在调用assignment获取assignments-snapshot的时候, 将sync-callback set到assignments-callback中去

by the way, 对于get-children, 是否有callback, 即是否被watch, 读的数据是不一样的, 具体原因不是很清楚, 需要后面看看zk的具体使用

4. 前面说了issue-callback!在执行assignments-callback之前, 会将其清空, 所以如果需要不断的触发, 那么就要不断的设置assignments-callback 

    所以作为callback, mk-synchronize-supervisor会先通过assignments-snapshot去重设assignments-callback 

    至于为什么要采用这样的机制? 现在还看不清楚

本文章摘自博客园,原文发布日期:2013-06-26 

继续阅读