摘要:KafkaProducer在發送消息的時候,需要指定發送到哪個分區, 那麼這個分區政策都有哪些呢?
本文分享自華為雲社群《Kafka生産者3中分區配置設定政策》,作者:石臻臻的雜貨鋪。
KafkaProducer在發送消息的時候,需要指定發送到哪個分區, 那麼這個分區政策都有哪些呢?我們今天來看一下
使用分區政策的配置:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsQTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5SO4kTN3YmNmNDZwQjZwIjNyYzXwIzMxATMyIzLcRDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
1. DefaultPartitioner 預設分區政策
全路徑類名:org.apache.kafka.clients.producer.internals.DefaultPartitioner
- 如果消息中指定了分區,則使用它
- 如果未指定分區但存在key,則根據序列化key使用murmur2雜湊演算法對分區數取模。
- 如果不存在分區或key,則會使用粘性分區政策,關于粘性分區請參閱 KIP-480。
粘性分區Sticky Partitioner
為什麼會有粘性分區的概念?
首先,我們指定,Producer在發送消息的時候,會将消息放到一個ProducerBatch中, 這個Batch可能包含多條消息,然後再将Batch打包發送。關于這一塊可以看看我之前的文章 圖解Kafka Producer 消息緩存模型。
這樣做的好處就是能夠提高吞吐量,減少發起請求的次數。
但是有一個問題就是, 因為消息的發送它必須要你的一個Batch滿了或者linger.ms時間到了,才會發送。如果生産的消息比較少的話,遲遲難以讓Batch塞滿,那麼就意味着更高的延遲。
在之前的消息發送中,就将消息輪詢到各個分區的, 本來消息就少,你還給所有分區周遊的配置設定,那麼每個ProducerBatch都很難滿足條件。
那麼假如我先讓一個ProducerBatch塞滿了之後,再給其他的分區配置設定是不是可以降低這個延遲呢?
詳細的可以看看下面這張圖
這張圖的前提是:
Topic1 有3分區, 此時給Topic1 發9條無key的消息, 這9條消息加起來都不超過batch.size .
那麼以前的配置設定方式和粘性分區的配置設定方式如下
可以看到,使用粘性分區之後,至少是先把一個Batch填滿了發送然後再去填充另一個Batch。不至于向之前那樣,雖然平均配置設定了,但是導緻一個Batch都沒有放滿,不能立即發送。這不就增大了延遲了嗎(隻能通過linger.ms時間到了才發送)
劃重點:
-
當一個Batch發送之後,需要選擇一個新的粘性分區的時候
①. 可用分區<1 ;那麼選擇分區的邏輯是在所有分區中随機選擇。
②. 可用分區=1; 那麼直接選擇這個分區。
③. 可用分區>1 ; 那麼在所有可用分區中随機選擇。
- 當選擇下一個粘性分區的時候,不是按照分區平均的原則來配置設定。而是随機原則(當然不能跟上一次的分區相同)
例如剛剛發送到的Batch是 1号分區,等Batch滿了,發送之後,新的消息可能會發到2或者3, 如果選擇的是2,等2的Batch滿了之後,下一次選擇的Batch仍舊可能是1,而不是說為了平均,選擇3分區。
2.UniformStickyPartitioner 純粹的粘性分區政策
全路徑類名:org.apache.kafka.clients.producer.internals.UniformStickyPartitioner
他跟DefaultPartitioner 分區政策的唯一差別就是。
DefaultPartitionerd 如果有key的話,那麼它是按照key來決定分區的,這個時候并不會使用粘性分區
UniformStickyPartitioner 是不管你有沒有key, 統一都用粘性分區來配置設定。
3. RoundRobinPartitioner 分區政策
全路徑類名:org.apache.kafka.clients.producer.internals.RoundRobinPartitioner
- 如果消息中指定了分區,則使用它
- 将消息平均的配置設定到每個分區中。
- 與key無關
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
}
- 當可用分區是0的話,那麼就是周遊的是所有分區中的。
- 當有可用分區的話,那麼周遊的是所有可用分區的。