Kafka的实现细节
一、Topic和Partition
在Kafka中的每一条消息都有一个topic。一般来说在我们应用中产生不同类型的数据,都可以设置不同的主题。一个主题一般会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者都可以接收到生产者写入的新消息。
kafka为每个主题维护了分布式的分区(partition)日志文件,每个partition在kafka存储层面是append log。任何发布到此partition的消息都会被追加到log文件的尾部,在分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号,也就是我们的offset,offset是一个long型的数字,我们通过这个offset可以确定一条在该partition下的唯一消息。在partition下面是保证了有序性,但是在topic下面没有保证有序性。
在上图中在我们的生产者会决定发送到哪个Partition。
如果没有Key值则进行轮询发送。
如果有Key值,对Key值进行Hash,然后对分区数量取余,保证了同一个Key值的会被路由到同一个分区,如果想队列的强顺序一致性,可以让所有的消息都设置为同一个Key。
二、消费模型
消息由生产者发送到kafka集群后,会被消费者消费。一般来说我们的消费模型有两种:推送模型(psuh)和拉取模型(pull)
基于推送模型的消息系统,由消息代理记录消费状态。消息代理将消息推送到消费者后,标记这条消息为已经被消费,但是这种方式无法很好地保证消费的处理语义。比如当我们把已经把消息发送给消费者之后,由于消费进程挂掉或者由于网络原因没有收到这条消息,如果我们在消费代理将其标记为已消费,这个消息就永久丢失了。如果我们利用生产者收到消息后回复这种方法,消息代理需要记录消费状态,这种不可取。如果采用push,消息消费的速率就完全由消费代理控制,一旦消费者发生阻塞,就会出现问题。
Kafka采取拉取模型(poll),由自己控制消费速度,以及消费的进度,消费者可以按照任意的偏移量进行消费。比如消费者可以消费已经消费过的消息进行重新处理,或者消费最近的消息等等。
三、网络模型
3.1 KafkaClient --单线程Selector
单线程模式适用于并发链接数小,逻辑简单,数据量小。
在kafka中,consumer和producer都是使用的上面的单线程模式。这种模式不适合kafka的服务端,在服务端中请求处理过程比较复杂,会造成线程阻塞,一旦出现后续请求就会无法处理,会造成大量请求超时,引起雪崩。而在服务器中应该充分利用多线程来处理执行逻辑。
3.2 Kafka--server -- 多线程Selector
在kafka服务端采用的是多线程的Selector模型,Acceptor运行在一个单独的线程中,对于读取操作的线程池中的线程都会在selector注册read事件,负责服务端读取请求的逻辑。成功读取后,将请求放入message queue共享队列中。然后在写线程池中,取出这个请求,对其进行逻辑处理,即使某个请求线程阻塞了,还有后续的县城从消息队列中获取请求并进行处理,在写线程中处理完逻辑处理,由于注册了OP_WIRTE事件,所以还需要对其发送响应。
四、高可靠分布式存储模型
在Kafka中保证高可靠模型的依靠的是副本机制,有了副本机制之后,就算机器宕机也不会发生数据丢失。
4.1高性能的日志存储
kafka一个topic下面的所有消息都是以partition的方式分布式的存储在多个节点上。同时在kafka的机器上,每个Partition其实都会对应一个日志目录,在目录下面会对应多个日志分段(LogSegment)。LogSegment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment索引文件和数据文件。这两个文件的命令规则为:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充,如下,假设有1000条消息,每个LogSegment大小为100,下面展现了900-1000的索引和Log:
由于kafka消息数据太大,如果全部建立索引,即占了空间又增加了耗时,所以kafka选择了稀疏索引的方式,这样的话索引可以直接进入内存,加快偏查询速度。
简单介绍一下如何读取数据,如果我们要读取第911条数据首先第一步,找到他是属于哪一段的,根据二分法查找到他属于的文件,找到0000900.index和00000900.log之后,然后去index中去查找 (911-900) =11这个索引或者小于11最近的索引,在这里通过二分法我们找到了索引是[10,1367]然后我们通过这条索引的物理位置1367,开始往后找,直到找到911条数据。
上面讲的是如果要找某个offset的流程,但是我们大多数时候并不需要查找某个offset,只需要按照顺序读即可,而在顺序读中,操作系统会对内存和磁盘之间添加page cahe,也就是我们平常见到的预读操作,所以我们的顺序读操作时速度很快。但是kafka有个问题,如果分区过多,那么日志分段也会很多,写的时候由于是批量写,其实就会变成随机写了,随机I/O这个时候对性能影响很大。所以一般来说Kafka不能有太多的partition。针对这一点,RocketMQ把所有的日志都写在一个文件里面,就能变成顺序写,通过一定优化,读也能接近于顺序读。
★★★可以思考一下:1.为什么需要分区,也就是说主题只有一个分区,难道不行吗?2.日志为什么需要分段
日志策略
日志保留策略
无论消费者是否已经消费了消息,kafka都会一直保存这些消息,但并不会像数据库那样长期保存。为了避免磁盘被占满,kafka会配置响应的保留策略(retention policy),以实现周期性地删除陈旧的消息 kafka有两种“保留策略”:
- 根据消息保留的时间,当消息在kafka中保存的时间超过了指定时间,就可以被删除;
- 根据topic存储的数据大小,当topic所占的日志文件大小大于一个阀值,则可以开始删除最旧的消息
日志压缩策略
在很多场景中,消息的key与value的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心key对应的最新的value。我们可以开启日志压缩功能,kafka定期将相同key的消息进行合并,只保留最新的value值
4.2 副本机制
Kafka的副本机制是多个服务端节点对其他节点的主题分区的日志进行复制。当集群中的某个节点出现故障,访问故障节点的请求会被转移到其他正常节点(这一过程通常叫Reblance),kafka每个主题的每个分区都有一个主副本以及0个或者多个副本,副本保持和主副本的数据同步,当主副本出故障时就会被替代。
在Kafka中并不是所有的副本都能被拿来替代主副本,所以在kafka的leader节点中维护着一个ISR(In sync Replicas)集合,翻译过来也叫正在同步中集合,在这个集合中的需要满足两个条件:
节点必须和ZK保持连接
在同步的过程中这个副本不能落后主副本太多
另外还有个AR(Assigned Replicas)用来标识副本的全集,OSR用来表示由于落后被剔除的副本集合,所以公式如下:ISR = leader + 没有落后太多的副本; AR = OSR+ ISR;
这里先要说下两个名词:HW(high watermark)是consumer能够看到的此partition的位置,LEO( log end offset)是每个partition的log最后一条Message的位置。HW能保证leader所在的broker失效,该消息仍然可以从新选举的leader中获取,不会造成消息丢失。
当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:
1(默认):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果leader宕机了,则会丢失数据。
0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
-1:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时(其他节点都和zk断开连接,或者都没追上),这样就变成了acks=1的情况。
副本数据同步细节(HW和LEO)
4.3 数据操作
为避免broker挂后造成数据丢失,kafka实现了高可用方式。
- 基于partition实现Replica。并与zookeeper配合实现Leader的选举。
- 通过算法,将partition的Leader与Fellowers分散于不同的broker。
replica实现
在“brokers的物理结构”中,replication有多个follewers,分散于不同的brokers。通过增量日志实现。
partition的log记录是顺序的,通过server.properties中log.retention.hours参数定义日志保留时长,过期则删除。新写入的message append记录在partition中。
为提升效率,
- follewers会在message未写入log时,读到message则将ACK发送给Leader,因此只能保证存在Replica,不能保证数据一定持久化了。
- 批量复制
ISR(副本同步队列)
ISR是In-Sync Replicate 记录与Leader保持同步的列表。
维护的是有资格的follower节点
- 副本的所有节点都必须要和zookeeper保持连接状态
- 副本的最后一条消息的offset和leader副本的最后一条消息的offset之间的差值不能超过指定的阀值,这个阀值是可以设置的(replica.lag.max.messages)
4.4 leader 选举(Leader Election )
判断Replica活着,(1)与zk有心跳通讯;(2)与Leader通讯及时。两者有一不满足,fellower都会从ISR中移除。
选举算法
一般的leader选举算法,有Majority Vote/Zab/Raft/PacificA。kafka采用的即PacificA,kafka维护多个ISR,但不不像Majorty Vote算法,限制最少的2N+1节点和N+1以上投票。
即使只有1个follewer,也可完成Leader选举。
选举过程(详解)
五、Kafka的高吐量的因素
- 顺序写的方式存储数据 ;
- 批量发送: 在异步发送模式中。kafka允许进行批量发送,也就是先讲消息缓存到内存中,然后一次请求批量发送出去。这样减少了磁盘频繁io以及网络IO造成的性能瓶颈 batch.size 每批次发送的数据大小 linger.ms 间隔时间
- 零拷贝:消息从发送到落地保存,broker维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不懂的通过socket发送给消费者。虽然这个操作描述起来很简单,但实际上经历了很多步骤
1、操作系统将数据从磁盘读入到内核空间的页缓存 2、应用程序将数据从内核空间读入到用户空间缓存中 3、应用程序将数据写回到内核空间到socket缓存中 4、操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出
通过“零拷贝”技术可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数
// 通过多种方式操作Kafka的消息读取
https://blog.csdn.net/u011784767/article/details/78663168
六、文件存储机制
七、消息确认(确认offset)
自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
复制
手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@Override
public void doWork() {
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("partition=" + record.partition() + ",offset = " + record.offset() + ", key = " + record.key() + ", value = " + record.value());
this.msgList.add(record);
}
if (msgList.size() >= 5) {
System.out.println("Execute commit Message....");
// 手动提交offset
consumer.commitAsync(); // 异步提交
// consumer.commitSync(); // 同步提交
// 消费完成,提交offset (原子)
this.msgList.clear();
}
}
}
复制
八、 Kafka 消息可靠性(offset)
1、Kafka 消息的问题
Kafka就比较适合高吞吐量并且允许少量数据丢失的场景,如果非要保证“消息只读取一次”,可以使用JMS。
Kafka Producer 消息发送有两种方式(配置参数 producer.type):
producer.type=sync(默认值): 后台线程中消息发送是同步方式,对应的类为 kafka.producer.SyncProducer; producer.type=async: 后台线程中消息发送是异步方式,对应的类为 kafka.producer.AyncProducer;优点是可批量发送消息(消息个数达到 batch.num.messages=200 或时间达到 “ 时发送)、吞吐量佳,缺点是发送不及时可能导致丢失; 对于同步方式(producer.type=sync)?Kafka Producer 消息发送有三种确认方式(配置参数 acks):
acks=0: producer 不等待 Leader 确认,只管发出即可;最可能丢失消息,适用于高吞吐可丢失的业务; acks=1(默认值): producer 等待 Leader 写入本地日志后就确认;之后 Leader 向 Followers 同步时,如果 Leader 宕机会导致消息没同步而丢失,producer 却依旧认为成功; acks=all/-1: producer 等待 Leader 写入本地日志、而且 Leader 向 Followers 同步完成后才会确认;最可靠。 Kafka Consumer 有两个接口:
Low-level API: 消费者自己维护 offset 等值,可以完全控制; High-level API: 封装了对 parition 和 offset 的管理,使用简单;可能遇到 Consumer 取出消息并更新了 offset,但未处理消息即宕机,从而相当于消息丢失; Kafka 支持 3 种消息传递语义:
最多一次 -消息可能会丢失,但永远不会重新发送。consumer.poll(); consumer.commitOffset(); processMsg(messages); 至少一次 -消息永远不会丢失,但可能会重新传递。consumer.poll(); processMsg(messages); consumer.commitOffset(); 恰恰一次 - 这就是人们真正想要的,每条信息只传递一次。以事务来保证。
2 消息重复
根本原因:已经消费了数据,但是 offset 没提交。 外在原因:(1)消费数据后、提交 offset 前,线程被杀; (2)设置 offset 为自动提交,consumer.close() 之前 consumer.unsubscribe(); (3)consumer 取了一批数据,尚未处理完毕时,达到了 session.timeout.ms,导致没有接收心跳而挂掉,自动提交offset失败,下次会重复消费本批消息; 解决办法:(1)唯一 ID 保存在外部介质中,每次消费时根据它判断是否已处理; (2)如果在统计用,丢失几条关系不大,则无需理会; (3)如果消费者来不及处理,可以这样优化:增加分区以提高并行能力;增加消费者线程;关闭自动提交 enable.auto.commit=false
3 消息丢失
根本原因:已经提交了 offset,但数据在内存中尚未处理,线程就被杀掉。
消息丢失解决方案:
同步模式下,确认机制设置为-1(不可为1),即让消息写入Leader和Follower之后再确认消息发送成功; 异步模式下,设置为不限制阻塞超时时间(不可为acks=0),当缓冲区满时不清空缓冲池,而是让生产者一直处于阻塞状态;
4 消息乱序 (如何保证kafka中消息按照顺序消费)
传统的队列,在并行处理时,由于网络故障或速度差异,尽管服务器传递是有序的,但消费者接收的顺序可能不一致; Kafka 在主题内部有分区,并行处理时,每个分区仅由消费者组中的一个消费者使用,确保了消费者是该分区的唯一读者,并按顺序使用这些数据。
但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序,除非只提供一个分区。
九、Kafka的分区分配策略
partition.assignmentStrategy 指定分区策略
Range 范围分区(默认的)
假如有10个分区,3个消费者,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者为C1,C2,C3,那么用分区数除以消费者数来决定每个Consumer消费几个Partition,除不尽的前面几个消费者将会多消费一个 最后分配结果如下
C1:0,1,2,3 C2:4,5,6 C3:7,8,9
如果有11个分区将会是:
C1:0,1,2,3 C2:4,5,6,7 C3:8,9,10
假如我们有两个主题T1,T2,分别有10个分区,最后的分配结果将会是这样:
C1:T1(0,1,2,3) T2(0,1,2,3) C2:T1(4,5,6) T2(4,5,6) C3:T1(7,8,9) T2(7,8,9)
在这种情况下,C1多消费了两个分区
RoundRobin 轮询分区
把所有的partition和consumer列出来,然后轮询consumer和partition,尽可能的让把partition均匀的分配给consumer
假如有3个Topic T0(三个分区P0-0,P0-1,P0-2),T1(两个分区P1-0,P1-1),T2(四个分区P2-0,P2-1,P2-2,P2-3)
有三个消费者:C0(订阅了T0,T1),C1(订阅了T1,T2),C2(订阅了T0,T2)!
那么分区过程如下图所示
分区将会按照一定的顺序排列起来,消费者将会组成一个环状的结构,然后开始轮询。 P0-0分配给C0 P0-1分配给C1但是C1并没订阅T0,于是跳过C1把P0-1分配给C2, P0-2分配给C0 P1-0分配给C1, P1-1分配给C0, P2-0分配给C1, P2-1分配给C2, P2-2分配给C1, p2-3分配给C2
C0: P0-0,P0-2,P1-1 C1:P1-0,P2-0,P2-2 C2:P0-1,P2-1,P2-3
什么时候触发分区分配策略: 1.同一个Consumer Group内新增或减少Consumer
2.Topic分区发生变化
Rebalance的执行
kafka提供了一个角色Coordinator来执行。当Consumer Group的第一个Consumer启动的时候,他会向kafka集群中的任意一台broker发送GroupCoordinatorRequest请求,broker会返回一个负载最小的broker设置为coordinator,之后该group的所有成员都会和coordinator进行协调通信
整个Rebalance分为两个过程 jionGroup和sysncJion
joinGroup过程
在这一步中,所有的成员都会向coordinator发送JionGroup请求,请求内容包括group_id,member_id.protocol_metadata等,coordinator会从中选出一个consumer作为leader,并且把组成员信息和订阅消息,leader信息,rebanlance的版本信息发送给consumer
Synchronizing Group State阶段
组成员向coordinator发送SysnGroupRequet请求,但是只有leader会发送分区分配的方案(分区分配的方案其实是在消费者确定的),当coordinator收到leader发送的分区分配方案后,会通过SysnGroupResponse把方案同步到各个consumer中