天天看点

Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源码实现

     如果spark的部署方式选择standalone,一个采用master/slaves的典型架构,那么master是有spof(单点故障,single point of failure)。spark可以选用zookeeper来实现ha。

     zookeeper提供了一个leader election机制,利用这个机制可以保证虽然集群存在多个master但是只有一个是active的,其他的都是standby,当active的master出现故障时,另外的一个standby master会被选举出来。由于集群的信息,包括worker, driver和application的信息都已经持久化到文件系统,因此在切换的过程中只会影响新job的提交,对于正在进行的job没有任何的影响。加入zookeeper的集群整体架构如下图所示。

Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源码实现

master在启动时,会根据启动参数来决定不同的master故障重启策略:

zookeeper实现ha

filesystem:实现master无数据丢失重启,集群的运行时数据会保存到本地/网络文件系统上

丢弃所有原来的数据重启

master::prestart()可以看出这三种不同逻辑的实现。

recovery_mode是一个字符串,可以从spark-env.sh中去设置。

如果不设置spark.deploy.recoverymode的话,那么集群的所有运行数据在master重启是都会丢失,这个结论是从blackholepersistenceengine的实现得出的。

它把所有的接口实现为空。persistenceengine是一个trait。作为对比,可以看一下zookeeper的实现。

spark使用的并不是zookeeper的api,而是使用的org.apache.curator.framework.curatorframework 和 org.apache.curator.framework.recipes.leader.{leaderlatchlistener, leaderlatch} 。curator在zookeeper上做了一层很友好的封装。

简单总结一下参数的设置,通过上述代码的分析,我们知道为了使用zookeeper至少应该设置一下参数(实际上,仅仅需要设置这些参数。通过设置spark-env.sh:

各个参数的意义:

参数

默认值

含义

spark.deploy.recoverymode

none

恢复模式(master重新启动的模式),有三种:1, zookeeper, 2, filesystem, 3 none

spark.deploy.zookeeper.url

zookeeper的server地址

spark.deploy.zookeeper.dir

/spark

zookeeper 保存集群元数据信息的文件目录,包括worker,driver和application。

curatorframework极大的简化了zookeeper的使用,它提供了high-level的api,并且基于zookeeper添加了很多特性,包括

自动连接管理:连接到zookeeper的client有可能会连接中断,curator处理了这种情况,对于client来说自动重连是透明的。

简洁的api:简化了原生态的zookeeper的方法,事件等;提供了一个简单易用的接口。

recipe的实现(更多介绍请点击recipes):

leader的选择

共享锁

缓存和监控

分布式的队列

分布式的优先队列

curatorframeworks通过curatorframeworkfactory来创建线程安全的zookeeper的实例。

curatorframeworkfactory.newclient()提供了一个简单的方式来创建zookeeper的实例,可以传入不同的参数来对实例进行完全的控制。获取实例后,必须通过start()来启动这个实例,在结束时,需要调用close()。

需要关注的还有两个recipe:org.apache.curator.framework.recipes.leader.{leaderlatchlistener, leaderlatch}。

首先看一下leaderlatchlistener,它在leaderlatch状态变化的时候被通知:

在该节点被选为leader的时候,接口isleader()会被调用

在节点被剥夺leader的时候,接口notleader()会被调用

由于通知是异步的,因此有可能在接口被调用的时候,这个状态是准确的,需要确认一下leaderlatch的hasleadership()是否的确是true/false。这一点在接下来spark的实现中可以得到体现。

leaderlatch负责在众多连接到zookeeper cluster的竞争者中选择一个leader。leader的选择机制可以看zookeeper的具体实现,leaderlatch这是完成了很好的封装。我们只需要要知道在初始化它的实例后,需要通过

通过addlistener可以将我们实现的listener添加到leaderlatch。在listener里,我们在两个接口里实现了被选为leader或者被剥夺leader角色时的逻辑即可。

实际上因为有curator的存在,spark实现master的ha就变得非常简单了,zookeeperleaderelectionagent实现了接口leaderlatchlistener,在isleader()确认所属的master被选为leader后,向master发送消息electedleader,master会将自己的状态改为alive。当noleader()被调用时,它会向master发送消息revokedleadership时,master会关闭。

在prestart中,启动了leaderlatch来处理选举zk中的leader。就如在上节分析的,主要的逻辑在isleader和noleader中。

updateleadershipstatus的逻辑很简单,就是向master发送消息。

为了解决standalone模式下的master的spof,spark采用了zookeeper提供的选举功能。spark并没有采用zookeeper原生的java api,而是采用了curator,一个对zookeeper进行了封装的框架。采用了curator后,spark不用管理与zookeeper的连接,这些对于spark来说都是透明的。spark仅仅使用了100行代码,就实现了master的ha。当然了,spark是站在的巨人的肩膀上。谁又会去重复发明轮子呢?

请您支持:

如果你看到这里,相信这篇文章对您有所帮助。如果是的话,请为本文投一下票吧: 点击投票,多谢。如果您已经在投票页面,请点击下面的投一票吧!

btw,即使您没有csdn的帐号,可以使用第三方登录的,包括微博,qq,gmail,github,百度,等。

继续阅读