天天看點

Kafka生成消息時的3種分區政策

摘要:KafkaProducer在發送消息的時候,需要指定發送到哪個分區, 那麼這個分區政策都有哪些呢?

本文分享自華為雲社群《​​Kafka生産者3中分區配置設定政策​​》,作者:石臻臻的雜貨鋪。

KafkaProducer在發送消息的時候,需要指定發送到哪個分區, 那麼這個分區政策都有哪些呢?我們今天來看一下

使用分區政策的配置:

Kafka生成消息時的3種分區政策

1. DefaultPartitioner 預設分區政策

全路徑類名:org.apache.kafka.clients.producer.internals.DefaultPartitioner

  • 如果消息中指定了分區,則使用它
  • 如果未指定分區但存在key,則根據序列化key使用murmur2雜湊演算法對分區數取模。
  • 如果不存在分區或key,則會使用粘性分區政策,關于粘性分區請參閱 KIP-480。

粘性分區Sticky Partitioner

為什麼會有粘性分區的概念?

首先,我們指定,Producer在發送消息的時候,會将消息放到一個ProducerBatch中, 這個Batch可能包含多條消息,然後再将Batch打包發送。關于這一塊可以看看我之前的文章 圖解Kafka Producer 消息緩存模型。

Kafka生成消息時的3種分區政策

這樣做的好處就是能夠提高吞吐量,減少發起請求的次數。

但是有一個問題就是, 因為消息的發送它必須要你的一個Batch滿了或者linger.ms時間到了,才會發送。如果生産的消息比較少的話,遲遲難以讓Batch塞滿,那麼就意味着更高的延遲。

在之前的消息發送中,就将消息輪詢到各個分區的, 本來消息就少,你還給所有分區周遊的配置設定,那麼每個ProducerBatch都很難滿足條件。

那麼假如我先讓一個ProducerBatch塞滿了之後,再給其他的分區配置設定是不是可以降低這個延遲呢?

詳細的可以看看下面這張圖

這張圖的前提是:

Topic1 有3分區, 此時給Topic1 發9條無key的消息, 這9條消息加起來都不超過batch.size .

那麼以前的配置設定方式和粘性分區的配置設定方式如下

Kafka生成消息時的3種分區政策

可以看到,使用粘性分區之後,至少是先把一個Batch填滿了發送然後再去填充另一個Batch。不至于向之前那樣,雖然平均配置設定了,但是導緻一個Batch都沒有放滿,不能立即發送。這不就增大了延遲了嗎(隻能通過linger.ms時間到了才發送)

劃重點:

  1. 當一個Batch發送之後,需要選擇一個新的粘性分區的時候

    ①. 可用分區<1 ;那麼選擇分區的邏輯是在所有分區中随機選擇。

    ②. 可用分區=1; 那麼直接選擇這個分區。

    ③. 可用分區>1 ; 那麼在所有可用分區中随機選擇。

  2. 當選擇下一個粘性分區的時候,不是按照分區平均的原則來配置設定。而是随機原則(當然不能跟上一次的分區相同)

例如剛剛發送到的Batch是 1号分區,等Batch滿了,發送之後,新的消息可能會發到2或者3, 如果選擇的是2,等2的Batch滿了之後,下一次選擇的Batch仍舊可能是1,而不是說為了平均,選擇3分區。

2.UniformStickyPartitioner 純粹的粘性分區政策

全路徑類名:org.apache.kafka.clients.producer.internals.UniformStickyPartitioner

他跟DefaultPartitioner 分區政策的唯一差別就是。

DefaultPartitionerd 如果有key的話,那麼它是按照key來決定分區的,這個時候并不會使用粘性分區

UniformStickyPartitioner 是不管你有沒有key, 統一都用粘性分區來配置設定。

3. RoundRobinPartitioner 分區政策

全路徑類名:org.apache.kafka.clients.producer.internals.RoundRobinPartitioner

  • 如果消息中指定了分區,則使用它
  • 将消息平均的配置設定到每個分區中。
  • 與key無關
@Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }      
  1. 當可用分區是0的話,那麼就是周遊的是所有分區中的。
  2. 當有可用分區的話,那麼周遊的是所有可用分區的。