
Duplicate consumption caused by Kafka timeouts

Today I saw someone in a chat group run into a problem:

The symptom is that a timeout makes the auto-commit fail, which in turn leads to duplicate consumption.

The error message is:

-- :: [kudu--C-] WARN o.a.k.c.consumer.internals.ConsumerCoordinator - Auto offset commit failed for group sm: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
           

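I remembered reading that setting session.timeout.ms too low can cause commits to fail. The reason is that Kafka uses heartbeats to decide whether a session has timed out, and the client used to send heartbeats synchronously, as part of poll(). So when a single poll -> process -> commit cycle takes too long, no heartbeat is sent in time and this situation occurs.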

I checked the Kafka wiki, and this situation is indeed described there and treated as a bug:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=61333789

Adding a KafkaConsumer#heartbeat()/ping() method

One alternative that has been discussed is adding a heartbeat() API which sends a heartbeat and returns a flag (or throws an exception) to indicate that a rebalance is needed. This might change the typical poll() loop to something like the following:

while (running) {
  ConsumerRecords<K, V> records = consumer.poll();
  for (ConsumerRecord<K, V> record : records){
    process(record);
    if (!consumer.heartbeat())
      break;
  }
  consumer.commitSync();
}
           
The problem with this approach is making it work in a reasonable way with offset commits. In the above example, breaking from the loop and committing before all messages have been processed will cause message loss. We can fix this by maintaining the offset map to commit explicitly, but it’s unclear how to make it work with an auto-commit policy. In general, we felt that this added unneeded complexity to the API and that the same effect could be achieved in a safer way by setting max.poll.records to 1 and using the original poll() loop structure.
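The "offset map" idea mentioned above can be sketched roughly as follows. This is a dependency-free illustration, not the Kafka API: OffsetTracker and its methods are hypothetical names, and partitions are keyed by a plain "topic-partition" string. With the real client, a map like this would be converted to Map<TopicPartition, OffsetAndMetadata> and passed to commitSync, so that breaking out of the loop early only commits what has actually been processed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: tracks, per partition, the next offset that is safe to commit. */
public class OffsetTracker {
    // Key is "topic-partition" as a plain string to keep the sketch dependency-free.
    private final Map<String, Long> nextOffsets = new HashMap<>();

    /** Record that the message at the given offset has been fully processed. */
    public void markProcessed(String topic, int partition, long offset) {
        // Kafka expects the committed offset to be the NEXT offset to read, hence +1.
        nextOffsets.merge(topic + "-" + partition, offset + 1, Long::max);
    }

    /** Snapshot of what may be committed right now; unprocessed records are never included. */
    public Map<String, Long> offsetsToCommit() {
        return new HashMap<>(nextOffsets);
    }
}
```

Because only processed records enter the map, committing its snapshot after an early break cannot skip unprocessed messages, which is the message-loss problem the wiki describes.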

Then I looked at Kafka's upgrade notes:

Notable changes in 0.10.1.0

The new Java Consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value of max.poll.records has been changed to 500.
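As a rough illustration, the defaults described in these notes could be written out as consumer properties like this. The class and method names are made up for the example, and 305000 for request.timeout.ms is an assumed concrete value for "just above 5 minutes"; check the documentation of the client version actually in use.

```java
import java.util.Properties;

public class ConsumerDefaults {
    /** Consumer settings as described in the 0.10.1.0 notes (illustrative only). */
    public static Properties defaultsFrom0101() {
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", "300000"); // 5 minutes
        props.setProperty("session.timeout.ms", "10000");    // 10 seconds
        props.setProperty("max.poll.records", "500");
        // "just above 5 minutes": 305000 is an assumed value for this sketch
        props.setProperty("request.timeout.ms", "305000");
        return props;
    }

    /** Checks the constraint from the notes: request.timeout.ms > max.poll.interval.ms. */
    public static boolean requestTimeoutIsValid(Properties props) {
        long request = Long.parseLong(props.getProperty("request.timeout.ms"));
        long maxPoll = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        return request > maxPoll;
    }
}
```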

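According to this, heartbeating had already been moved to a background thread, and I checked that the group member was indeed using the new version of the Kafka client.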

Then an explanation on Stack Overflow made everything clear:

https://stackoverflow.com/questions/39730126/difference-between-session-timeout-ms-and-max-poll-interval-ms-for-kafka-0-10-0

Before KIP-62, there is only session.timeout.ms (ie, Kafka 0.10.0 and earlier). max.poll.interval.ms is introduced via KIP-62 (part of Kafka 0.10.1).

KIP-62 decouples heartbeats from calls to poll() via a background heartbeat thread. This allows for a longer processing time (ie, time between two consecutive poll() calls) than the heartbeat interval.

Assume processing a message takes 1 minute. If heartbeat and poll are coupled (ie, before KIP-62), you will need to set session.timeout.ms larger than 1 minute to prevent the consumer from timing out. However, if the consumer dies, it also takes longer than 1 minute to detect the failed consumer.

KIP-62 decouples polling and heartbeat, allowing heartbeats to be sent between two consecutive polls. Now you have two threads running, the heartbeat thread and the processing thread, and thus KIP-62 introduced a timeout for each. session.timeout.ms is for the heartbeat thread while max.poll.interval.ms is for the processing thread.

Assume you set session.timeout.ms=30000; the consumer heartbeat thread must then send a heartbeat to the broker before this time expires. On the other hand, if processing a single message takes 1 minute, you can set max.poll.interval.ms larger than one minute to give the processing thread more time to process a message.

If the processing thread dies, it takes max.poll.interval.ms to detect this. However, if the whole consumer dies (and a dying processing thread most likely crashes the whole consumer including the heartbeat thread), it takes only session.timeout.ms to detect it.

The idea is, to allow for a quick detection of a failing consumer even if processing itself takes quite long.
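The two timeouts described above can be summarized in a toy model. This is only a sketch of the reasoning in the quoted answer, with hypothetical names; it does not call any Kafka API and ignores rebalance and network delays.

```java
public class FailureDetection {
    /** Before KIP-62: a single timeout has to cover both liveness and processing time. */
    public static long detectionMsBeforeKip62(long sessionTimeoutMs) {
        return sessionTimeoutMs;
    }

    /**
     * After KIP-62: if the heartbeat thread is still alive (only processing is stuck),
     * the failure is noticed after max.poll.interval.ms; if the whole consumer is gone,
     * it is noticed after session.timeout.ms.
     */
    public static long detectionMsAfterKip62(boolean heartbeatThreadAlive,
                                             long sessionTimeoutMs,
                                             long maxPollIntervalMs) {
        return heartbeatThreadAlive ? maxPollIntervalMs : sessionTimeoutMs;
    }
}
```

With session.timeout.ms=30000 and max.poll.interval.ms=120000, a crashed consumer is detected in about 30 seconds, while a stuck processing loop is tolerated for up to 2 minutes. Before KIP-62, tolerating 2 minutes of processing would have required a 2-minute session timeout, and crash detection would have been just as slow.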

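This explains why the max.poll.interval.ms setting was added. From a design standpoint, what happens under the old design if we increase session.timeout.ms? The broker takes a long time to notice that a client has gone offline. With max.poll.interval.ms in place, session.timeout.ms can stay at a normal value, and max.poll.interval.ms bounds the time allowed for a single poll -> process cycle. Reducing max.poll.records also works, just as the error message suggests.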
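To choose between the two remedies, it helps to estimate how long a full batch takes. The sketch below assumes a fixed per-record processing cost, which real workloads rarely have, and PollBudget and its methods are hypothetical names. It checks whether max.poll.records times the per-record cost stays under max.poll.interval.ms.

```java
public class PollBudget {
    /** Worst-case time for one poll -> process cycle, assuming a fixed per-record cost. */
    public static long worstCaseBatchMs(int maxPollRecords, long perRecordMs) {
        return (long) maxPollRecords * perRecordMs;
    }

    /** True if a full batch finishes strictly before max.poll.interval.ms expires. */
    public static boolean fitsInPollInterval(int maxPollRecords, long perRecordMs,
                                             long maxPollIntervalMs) {
        return worstCaseBatchMs(maxPollRecords, perRecordMs) < maxPollIntervalMs;
    }

    /**
     * Largest max.poll.records that keeps a full batch strictly under the interval.
     * Returns 0 if even a single record does not fit; in that case
     * max.poll.interval.ms itself has to be increased.
     */
    public static int largestSafeMaxPollRecords(long perRecordMs, long maxPollIntervalMs) {
        if (perRecordMs <= 0 || maxPollIntervalMs <= 0) {
            throw new IllegalArgumentException("durations must be positive");
        }
        long n = (maxPollIntervalMs - 1) / perRecordMs;
        return (int) Math.min(n, Integer.MAX_VALUE);
    }
}
```

For example, with max.poll.records=500 and max.poll.interval.ms=300000, a cost of 1 second per record gives a worst case of 500 seconds, which exceeds the 300 seconds allowed. Under the same assumption, either lower max.poll.records to 299 or raise max.poll.interval.ms above 500000.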