更新一段我在linkedin上对这个项目的描述,目前项目已经开发完在使用了。本文并不是最新的设计。
背景
解决hdfs/hive/rdbms/ftp/mongodb等数据源之间的批量数据同步问题
特性
跨机房场景下的链路优化;多路输入和输出的任务模型;数据容错和可持久化;任务失败恢复
任务调度
把任务配置解析为物理执行计划,master控制任务的调度和失败恢复,基于mesos完成资源分配和任务调度。slave分布在各个数据中心,具体传输任务的调起做到链路优化选择。高并发场景下,增加mesos slave节点来保证可扩展性(cpu和mem资源),master将元数据记录在zk上,并通过争抢zk锁实现互备。
数据传输
传输组件分为input、cache和output三种executor,各自进程内通过双队列优化传输速度。数据以bundle为单位传输,通常上百行为一个bundle且可压缩,netty作网络通信。input端异步备份一份数据在bookkeeper,cache使用beanstalkd做消息队列,output端处理bundle成功或失败,会有守护线程异步删除或更新beanstalkd内的message(类似storm topology里的ack),executors会把bundle传输状态更新在zookkeeper上,某一executor挂掉都可以在一台slave上重新调起并恢复任务继续进行。input和output端的reader和writer是插件化的。
==================================== 我是更新线 ====================================
之前在最近分布式系统开发小结里,提到了一个在开发中的系统的大致设计,本文是我负责部分的一个详细设计。在阅读本文前可以先浏览下之前那篇文章,对于系统的功能和概况有个基本了解。
slave模块主要需要实现不同的mesos executors,包括input, memorystorage和output三种executor。每个dpump任务会由scheduler manager经过逻辑执行计划和物理执行计划的拆分,从knowledge center获取知识,最终将切分后的task分配给相应的slaves执行,并通过mesos master,分配资源并调起slave上的各自的executor。三种executors的执行逻辑图如下。
数据通过bundle形式在三种executor之间的流通,每个bundle有唯一id、一个string[]、以及一个index。index用于标记每个bundle最后数据输出的最新成功行,即我们容错粒度控制在行级别。对input、cache、output作一个简单介绍:
input,也叫reader。每个task内只有一个input executor,负责从数据源(hdfs、ftp、mysql、mongodb等)读出数据,将数据经过切分、处理、压缩后通过netty流式传输给memorystorage。
cache,也叫memorystorage。每个task内只有一个cache executor,负责从input端接收bundle,将bundle存取往一个队列内,当有output连接的时候,将bundle取出输送给output
output,也叫writer。每个task可能有多个output executor,负责将数据最终输出到数据目的源。output从cache端得到bundle的过程也是流式的。
整个task的流通都是流式的,且slave之间的网络通信使用的是netty这个nio框架,传输过程中还涉及到bundle高效的正反序列化和压缩、解压缩。最重要的一点是input、cache、output三个部分各自都有容错设计,其中input和output通过向zookeeper记录和获取bundle状态保证处理bundle的不重不漏,而cache通过对队列内消息内容的钝化,保证自身已保存的bundle不丢失,并能在新的cache executor起来后,可以继续为output提供bundle输出。
下面详细介绍三种executor的设计,阅读过程中请参考这张task进程图。
每个input负责一次job(每个job对应多个tasks)内最小粒度的文件块读取,比如可能是一个hdfs block,一张hive表的一个分区甚至是一张mysql表。
input内还分有writer、buffer(双队列)和reader。writer是一个单线程,从数据源获取数据并切分好bundle,每个bundle有唯一id和定长的字符串数组,然后将bundle存入双队列的输入头,在双队列的读出头有若干个reader线程抢占bundle,每个reader获取到bundle后释放锁并做二次处理、压缩,最终reader通过netty client将bundle包装成一个传输格式,以二进制流的方式通过channel流向cache。
writer端切分bundle保证了从同一个数据源的同份文件块读取数据生成bundle是有序的,每次netty往channel里写入一份bundle的时候,会通过companion线程异步更新此task下znode内的bitmap,该bitmap标记每个bundle在input端是否被传输。每次input启动的时候,netty会读取znode上的bitmap缓存在内存里,发送bundle前根据id作一次校对。所以当input挂掉或重启时,可以保证发送给cache的bundle不重不漏。
cache本身是一个netty server,接收input和output多个netty client的连接,并对不同的channel做不同event处理。cache executor需要一个多状态的消息队列,这里采用的是beanstalkd队列,下图为该beanstalkd内消息(job)的状态变化图。
每次cache将新的bundle put进beanstalkd的时候需要选择一个tube(管道),beanstalkd可以开启多个独立的tube,tube内存放jobs,每个job有自己唯一的job id,而job消息体就是我们的bundlebytes(bundle存入cache直接存的就是序列化后的byte[])。
每个job存入queue后是ready状态,被reserve之后,就不能被客户端再次获取到,即cache每次会从每个tube里按顺序reserve一个job,并发送消息体给output(一个output对应一个tube),这个过程保证每个job被消费一次,且只能被一个output消费。如果output端消费成功,则该job会被delete掉;如果该job消费失败,则会被重新置为ready,重新置为ready可能是因为超时(每个job被reserve的时候都有一个time-to-run时间设置)了,也可以是客户端release掉该job。
这里,对于tube内job的后续处理交给acker这个线程来做。acker的设计灵感来源于storm。storm topology内每个bolt对tuple的执行和处理最终都会给spout一个ack响应,而拓扑过程中整棵tuple树的成功/失败执行状态会由acker守护进程进行跟踪,以此来保证每个tuple被完全处理,而acker对tuple的跟踪算法是storm的主要突破之一。
cache端的acker线程会监听zookeeper上znode树上各个节点的事件变化,从而掌握被output消费的所有bundle的最后状态,对应地删除、释放,或者更新queue里的job。需要注意的是这里还涉及到一个更新job的过程。前面提到bundle内维护了一个index,而output消费bundle的时候,如果是数据行写了一半出现了异常或者挂掉了,我们需要记录bundle内数据行的最新index并将此信息也记录在znode上。对于这种最坏情况,acker负责将该fail的job从queue里delete掉,并更改job内bundle bytes内容,重置新的index,再把新的job put进queue里。这是我们最不希望看到的情况,同时也是我们对bundle能做的最细粒度的容错设计。
beanstalkd启动之后可以打开binlog开关,binlog是beanstalkd容错恢复的机制,将内存里的消息队列结构映射到硬盘上。对于cache的容错设计,直观的办法在于将这份binlog存在nfs或hdfs上,来保证cache挂掉重启后,能获取到之前保存的bundle数据,继续提供服务。
output在最终的bundle消费阶段,会把数据导向新的数据源。每个output获取的bundle来自于cache里的一个tube,而每个bundle的执行情况也会由companion线程异步更新到zookeeper上。
对于output来说,它只需要关心从cache端获取的每个bundle都照常处理就可以了,不需要关心这个bundle之前是否被消费过,被消费到哪里。原因在于,cache端的job状态的变更和job的更新可以由acker保障,而acker是从zk上得到这些job的状态并对queue异步更新。如果acker挂了,只要重新起一个线程获取znode上最新的状态就可以了。对于output来说,能传过来的bundle,对应到queue里就是ready状态的job,这个job可能被消费过了,但是他的index也因此得到了更新,output端对于所有bundle的处理是一致的,唯一需要关心的是output需要把bundle的信息异步更新给zk,如果output挂了,重新起一个output接着从cache读bundle就可以了。
slave模块三种executor的设计,主要考虑的是各个executor挂掉之后,怎样保证数据处理的不重复和不遗漏。我们依赖zookeeper的可靠性,记录、更新、判断bundle的状态,做到input、cache、output各司其职,最到最小粒度的容错。executor本身的失败和重启则由mesos保障,mesos作为资源管理系统,由master监控slave上各个executor的执行状况,通过回调,可以在合适的slave上再次启动挂掉的executor进程,保证业务task的顺利进行。
(全文完)