注意:如想要了解更多大数据和算法相关资料,关注后有惊喜哦
一、服务端启动整体流程图
二、整体代码解读
先给大家看一下服务端整体的解读流程,方便后期大家阅读,以及更加深入了解zookeeper的运行机制
上图是阅读服务端运行源码时的笔记,流程可能不是很清晰;总体思路是先了解全貌,然后再细化具体的逻辑
Zookeeper启动类是QuorumPeerMain ,其参数是配置文件zoo.cfg
流程1.参数解析
QuorumPeerConfig config = new QuorumPeerConfig();
流程2.创建快照日志截断定时调度器
这里启动DatadirCleanupManager线程,zk的任何一个变更操作都会记录到transaction log中,由于内存数据易丢失,所以必须要刷写到磁盘上。当写操作达到一定量或者一定时间间隔后,会进行一次刷写,其主要目的是为了缩短启动时加载数据的时间从而加快系统的启动,另一方面是为了避免transaction log日志数量过大,所以要定期清理
/*** 2.创建快照日志截断定时调度器 * snapRetainCount:清理后保留的snapshot的个数,对应配置:autopurge.snapRetainCount,大于等于3,默认3* purgeInterval:清理任务TimeTask执行周期,即几个小时清理一次,对应配置:autopurge.purgeInterval,单位:小时*/DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start();
流程3.判断启动模式,启动单机版服务或者分布式集群模式服务
/*** 3.判断启动模式*/if (args.length == 1 && config.isDistributed()) { /**集群模式*/ runFromConfig(config);} else { /**单机模式*/ ZooKeeperServerMain.main(args);}
接下来将通过集群模式来解读服务端是如何进行运转的。那么也就是从runFromConfig方法作为入口开始解析
三、集群模式-服务端启动流程
流程1.注册JMX log4j 控制器
ManagedUtil.registerLog4jMBeans();
流程2.实例化两种ServerCnxnFactory(带有SSL和无SSL)
ServerCnxnFactory从名字就可以看出其是一个工厂类,负责管理ServerCnxn,ServerCnxn这个类代表了一个客户端与一个server的连接,每个客户端连接过来都会被封装成一个ServerCnxn实例用来维护了服务器与客户端之间的Socket通道。首先要有监听端口,客户端连接才能过来,ServerCnxnFactory.configure()方法的核心就是启动监听端口供客户端连接进来,端口号由配置文件中clientPort属性进行配置,默认是2181
/*** 2.未开启SSL的ServerCnxnFactory*/if (config.getClientPortAddress() != null) { cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);}/*** 2.开启SSL的ServerCnxnFactory*/if (config.getSecureClientPortAddress() != null) { secureCnxnFactory = ServerCnxnFactory.createFactory(); secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true);}
流程3.创建QuorumPeer并初始化
Quorum在Zookeeper中代表集群中大多数节点的意思,即一半以上节点,Peer是端、节点的意思,Zookeeper集群中一半以上的节点其实就可以代表整个集群的状态,QuorumPeer就是管理维护的整个集群的一个核心类,这一步主要是创建一个QuorumPeer实例,并进行各种初始化工作
quorumPeer = getQuorumPeer(); //创建实例quorumPeer.setTxnFactory(new FileTxnSnapLog( config.getDataLogDir(), config.getDataDir()));quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());quorumPeer.enableLocalSessionsUpgrading( config.isLocalSessionsUpgradingEnabled());//quorumPeer.setQuorumPeers(config.getAllMembers());quorumPeer.setElectionType(config.getElectionAlg());;//选举类型,用于确定选举算法quorumPeer.setMyid(config.getServerId());quorumPeer.setTickTime(config.getTickTime());quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());quorumPeer.setInitLimit(config.getInitLimit());quorumPeer.setSyncLimit(config.getSyncLimit());quorumPeer.setConfigFileName(config.getConfigFilename());quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);if (config.getLastSeenQuorumVerifier() != null) { quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);}quorumPeer.initConfigInZKDatabase();quorumPeer.setCnxnFactory(cnxnFactory); //ServerCnxnFactory客户端请求管理工厂类quorumPeer.setSecureCnxnFactory(secureCnxnFactory);quorumPeer.setSslQuorum(config.isSslQuorum());quorumPeer.setUsePortUnification(config.shouldUsePortUnification());quorumPeer.setLearnerType(config.getPeerType());quorumPeer.setSyncEnabled(config.getSyncEnabled());quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());if (config.sslQuorumReloadCertFiles) { quorumPeer.getX509Util().enableCertFileReloading();}// sets quorum sasl authentication configurationsquorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);if (quorumPeer.isQuorumSaslAuthEnabled()) { quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl); quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl); quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal); quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext); quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);}quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);quorumPeer.initialize();
流程4.启动(QuorumPeer中的start方法),非常重要,后面将会有单独的篇幅对每个流程进行细节解读
那么到了执行到这里,也是最重要的一步流程;该步也是进行数据加载构建DataTree模型、选择选举算法、选举初始化、数据同步、会话管理等重要功能的入口;那么本篇将会介绍该入口下的整体流程,后面会有单独的篇幅来介绍zookeeper如何进行leader选举以及选举后如何进行数据同步的
- 4.1 加载快照、事务文件到内存生成DataTree模型
涉及到的核心类是ZKDatabase,并借助于FileTxnSnapLog工具类将snap和transaction log反序列化到内存中,最终构建出内存数据结构DataTree ,其核心就是从文件流中读取数据,转换成DataTree对象,放入zkDb中;首先先了解ZKDatabase、DataTree、DataNode三者之间的关系,见下图:
通过QuorumPeer.loadDataBase方法入口开始恢复加载数据到内存,然后调用zkDb.loadDatabase方法,底层调用snapLog.restore方法,然后最底层调用FileTxnSnapLog.restore方法从磁盘中读取文件
-
4.2 Socket服务启动
之前介绍过ServerCnxnFactory作用,ServerCnxnFactory本身也可以作为一个线程,其run方法实现的大致逻辑是:构建reactor模型的EventLoop,Selector每隔1秒执行一次select方法来处理IO请求,并分发到对应的代表该客户端的ServerCnxn中并利用doIO进行处理
/**启动ServerCnxn,建立Socket通道*/startServerCnxnFactory();
-
4.3 Leader选举初始化
初始化一些Leader选举工作
1.创建一个QuorumCnxManager实例,负责集群中各节点的网络IO
2.QuorumCnxManager实例内部有一个listener监听器,会启动一个线程,主要是用来监听选举端口并处理连接进来的socket
3.选择使用FastLeaderElection算法(一共有4个选举算法,其他三个都已经过时废弃了)
该步具体流程后面会有单独篇幅来介绍,这里先了解下整体流程
4.4 启动QuorumPeer,单独开启一个线程用于Leader选举(发现、广播、同步)
这里调用的是supper.start()方法,由于QuorumPeer继承自ZooKeeperThread,所以自身也是一个线程,进入run()方法后可以发现这里进入到一个无限循环的模式,不停的通过getPeerState方法获取当前的状态,然后执行相应分支的逻辑:
1. 系统刚启动的时候serverState默认是LOOKING状态,需要进行选举,这时会调用 FastLeaderElection.lookForLeader方法 ,该方法内部有一个循环的逻辑,直到选举出leader后才会跳出,如果选举出的Leader节点就是自身,那么就会将serverState变更为LEADING,否则设置成FOLLOWING或OBSERVING
2.然后进入下一轮的循环,根据自身的状态角色,执行对应的逻辑
注意: 进入分支线程会一直阻塞在其分支中,直到角色转变才会重新进行下一轮次循环,比如Follower监控到无法与Leader保持通信了,会将serverState赋值为LOOKING,跳出分支并进行下一轮次循环,这时就会进入LOOKING分支中重新进行Leader选举
/** 进入无限循环,不停的通过getPeerState方法获取当前节点状态* 注意:进入分支路程会一直阻塞在其分支中,直到角色转变才会重新进行下一轮次循环*/while (running) { switch (getPeerState()) { case LOOKING: if (Boolean.getBoolean("readonlymode.enabled")) { // Create read-only server but don't start it immediately final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb); Thread roZkMgr = new Thread() { public void run() { try { sleep(Math.max(2000, tickTime)); if (ServerState.LOOKING.equals(getPeerState())) { roZk.startup(); } } catch (InterruptedException e) { LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started"); } catch (Exception e) { LOG.error("FAILED to start ReadOnlyZooKeeperServer", e); } } }; try { roZkMgr.start(); reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } finally { roZkMgr.interrupt(); roZk.shutdown(); } } else { try { reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; /** * 选举leader */ startLeaderElection(); } //调用FastLeaderElection.lookForLeader() setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } } break; case OBSERVING: try { /**创建Observer实例并调用observerLeader*/ setObserver(makeObserver(logFactory)); observer.observeLeader(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { observer.shutdown(); setObserver(null); updateServerState(); } break; case FOLLOWING: try { /**创建Follower实例并调用followLeader*/ setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { follower.shutdown(); setFollower(null); updateServerState(); } break; case LEADING: /**创建Leader实例并调用lead*/ try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } updateServerState(); } break; } start_fle = Time.currentElapsedTime();}
- 4.5 阻塞监听、直到服务停止
●历史文章汇总
●一段SQL简述鬼谷子问徒问题
●最长回文字符串三种解法
●十分钟搞定分布式一致性算法
●一文教你如何玩转zookeeper
●zookeeper应用场景解决方案-Leader选
●实战:如何实时采集上亿级别数据?
●Kafka深度剖析HW以及LEO
●Livy REST 提交Spark作业
●Spark集成ElasticSearch
●Spark数据倾斜之骚操作解决方案
●一道简单的算法面试题
●Impala介绍以及常见问题
●ElasticSearch无感知重构索引
●ElasticSearch分页搜索以及deep paging性能揭秘
●ElasticSearch 一个field索引两次解决排序问题
●ElasticSearch Partial Update大揭秘
●Hive常见问题汇总