天天看點

kafka topic數量上限_徹底搞懂 Kafka 消息大小相關參數設定的規則

前段時間接到使用者要求,調整某個主題在 Kafka 叢集消息大小為 4M。

根據 Kafka 消息大小規則設定,生産端自行将 max.request.size 調整為 4M 大小,Kafka 叢集為該主題設定主題級别參數 max.message.bytes 的大小為 4M。

以上是針對 Kafka 2.2.x 版本的設定,需要注意的是,在某些舊版本當中,還需要調整相關關聯參數,比如 replica.fetch.max.bytes 等。

從上面例子可看出,Kafka 消息大小的設定還是挺複雜的一件事,而且還分版本,需要注意的參數巨多,而且每個都長得差不多,不但分版本,還需要注意生産端、broker、消費端的設定,而且還要區分 broker 級别還是 topic 級别的設定,而且還需要清楚知道每個配置的含義。

本文通過相關參數的解析說明,再結合實戰測試,幫助你快速搞明白這些參數的含義以及規則。

broker

broker 關于消息體大小相關的參數主要有 message.max.bytes、replica.fetch.min.bytes、replica.fetch.max.bytes、replica.fetch.response.max.bytes

1、message.max.bytes

Kafka 允許的最大 record batch size,什麼是 record batch size ?簡單來說就是 Kafka 的消息集合批次,一個批次當中會包含多條消息,生産者中有個參數 batch.size,指的是生産者可以進行消息批次發送,提高吞吐量,以下是 message.max.bytes 參數作用的源碼:

kafka.log.Log#analyzeAndValidateRecords

kafka topic數量上限_徹底搞懂 Kafka 消息大小相關參數設定的規則

以上源碼可以看出 message.max.bytes 并不是限制消息體大小的,而是限制一個批次的消息大小,是以我們需要注意生産端對于 batch.size 的參數設定需要小于 message.max.bytes。

以下附帶 Kafka 官方解釋:

The largest record batch size allowed by Kafka. If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that the they can fetch record batches this large.

In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case.

This can be set per topic with the topic level

max.message.bytes

config.

翻譯如下:

Kafka 允許的最大記錄批量。如果增加此數量,并且有一些消費者的年齡大于 0.10.2,則消費者的擷取大小也必須增加,以便他們可以擷取如此大的記錄批次。

在最新的消息格式版本中,為了提高效率,始終将記錄分組。在以前的消息格式版本中,未壓縮的記錄不會分組,并且在這種情況下,此限制僅适用于單個記錄。

可以使用主題級别 “max.message.bytes” 配置針對每個主題進行設定。

2、replica.fetch.min.bytes、replica.fetch.max.bytes、replica.fetch.response.max.bytes

kafka 的分區如果是多副本,那麼 follower 副本就會源源不斷地從 leader 副本拉取消息進行複制,這裡也會有相關參數對消息大小進行設定,其中 replica.fetch.max.bytes 是限制拉取分區中消息的大小,在 0.8.2 以前的版本中,如果 replica.fetch.max.bytes < message.max.bytes,就會造成 follower 副本複制不了消息。不過在後面的版本當中,已經對這個問題進行了修複。

replica.fetch.max.bytes 參見 2.2.x 版本的官方解釋:

The number of bytes of messages to attempt to fetch for each partition. This is not an absolute maximum, if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config).

翻譯如下:

嘗試為每個分區擷取的消息的位元組數。這不是絕對最大值,如果擷取的第一個非空分區中的第一個記錄批處理大于此值,那麼仍将傳回記錄批處理以確定進度。 代理接受的最大記錄批處理大小是通過 message.max.bytes(代理配置)或 max.message.bytes(主題配置)定義的。

replica.fetch.min.bytes、replica.fetch.response.max.bytes 同理。

topic

1、max.message.bytes

該參數跟 message.max.bytes 參數的作用是一樣的,隻不過 max.message.bytes 是作用于某個 topic,而 message.max.bytes 是作用于全局。

producer

1、max.request.size

該參數挺有意思的,看了 Kafka 生産端發送相關源碼後,發現消息在 append 到 RecordAccumulator 之前,會校驗該消息是否大于 max.request.size,具體邏輯如下:

org.apache.kafka.clients.producer.KafkaProducer#ensureValidRecordSize

kafka topic數量上限_徹底搞懂 Kafka 消息大小相關參數設定的規則

從以上源碼得出結論,Kafka 會首先判斷本次消息大小是否大于 maxRequestSize,如果本次消息大小 maxRequestSize,則直接抛出異常,不會繼續執行追加消息到 batch。

并且還會在 Sender 線程發送資料到 broker 之前,會使用 max.request.size 限制發送請求資料的大小:

org.apache.kafka.clients.producer.internals.Sender#sendProducerData

kafka topic數量上限_徹底搞懂 Kafka 消息大小相關參數設定的規則

也就是說,max.request.size 參數具備兩個特性:

1)限制單條消息大小

2)限制發送請求大小

參見 2.2.x 版本的官方解釋:

The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size which may be different from this.

翻譯如下:

請求的最大大小(以位元組為機關)。此設定将限制生産者将在單個請求中發送的記錄批數,以避免發送大量請求。這實際上也是最大記錄批次大小的上限。請注意,伺服器對記錄批大小有自己的上限,該上限可能與此不同。

2、batch.size

batch.size 是 Kafka producer 非常重要的參數,它的值對 Producer 的吞吐量有着非常大的影響,因為我們知道,收集到一批消息再發送到 broker,比每條消息都請求一次 broker,性能會有顯著的提高,但 batch.size 設定得非常大又會給機器記憶體帶來極大的壓力,是以需要在項目中合理地增減 batch.size 值,才能提高 producer 的吞吐量。

org.apache.kafka.clients.producer.internals.RecordAccumulator#append

kafka topic數量上限_徹底搞懂 Kafka 消息大小相關參數設定的規則

以上,将消息追加到消息緩沖區時,會嘗試追加到一個 ProducerBatch,如果 ProducerBatch 滿了,就去緩存區申請 batch.size 大小的緩存建立一個新的 ProducerBatch 繼續追加消息。需要注意的是,如果消息大小本身就比 batch.size 大,這種情況每個 ProducerBatch 隻會包含一條消息。

最終 RecordAccumulator 緩存區看起來是這樣的:

kafka topic數量上限_徹底搞懂 Kafka 消息大小相關參數設定的規則

參見 2.2.x 版本的官方解釋:

The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.

No attempt will be made to batch records larger than this size.

Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.

A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.

翻譯如下:

每當将多個記錄發送到同一分區時,生産者将嘗試将記錄一起批處理成更少的請求。這有助于提高用戶端和伺服器的性能。此配置控制預設的批處理大小(以位元組為機關)。

不會嘗試批處理大于此大小的記錄。

發送給代理的請求将包含多個批次,每個分區一個,并包含可發送的資料。

較小的批處理量将使批處理變得不那麼普遍,并且可能會降低吞吐量(零的批處理量将完全禁用批處理)。非常大的批處理大小可能會浪費一些記憶體,因為我們總是在預期其他記錄時配置設定指定批處理大小的緩沖區。

那麼針對 max.request.size 、batch.size 之間大小的調優就尤其重要,通常來說,max.request.size 大于 batch.size,這樣每次發送消息通常會包含多個 ProducerBatch。

consumer

1、fetch.min.bytes、fetch.max.bytes、max.partition.fetch.bytes

1)fetch.max.bytes

參見 2.2.x 版本的官方解釋:

The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. The maximum record batch size accepted by the broker is defined via

message.max.bytes

(broker config) or

max.message.bytes

(topic config). Note that the consumer performs multiple fetches in parallel.

翻譯如下:

伺服器為擷取請求應傳回的最大資料量。使用者将批量擷取記錄,并且如果擷取的第一個非空分區中的第一個記錄批次大于此值,則仍将傳回記錄批次以確定使用者可以取得進展。是以,這不是絕對最大值。代理可接受的最大記錄批處理大小是通過“ message.max.bytes”(代理配置)或“ max.message.bytes”(主題配置)定義的。請注意,使用者并行執行多個提取。

fetch.min.bytes、max.partition.fetch.bytes 同理。

實戰測試

針對以上相關參數配置的解讀,還需要對 max.request.size、batch.size、message.max.bytes(或者 max.message.bytes)三個參數進一步驗證。

1、測試消息大于 max.request.size 是否會被攔截

設定:

使用 kafka-producer-perf-test.sh 腳本測試:

測試結果:

kafka topic數量上限_徹底搞懂 Kafka 消息大小相關參數設定的規則

可以得出結論,成功攔截了大于 max.request.size 的消息。

2、測試 max.message.bytes 參數用于校驗批次大小還是校驗消息大小

設定:

使用 kafka-producer-perf-test.sh 腳本測試:

測試結果:

kafka topic數量上限_徹底搞懂 Kafka 消息大小相關參數設定的規則

當 max.message.bytes = 2500 時:

kafka topic數量上限_徹底搞懂 Kafka 消息大小相關參數設定的規則

可以得出結論,max.message.bytes 參數校驗的是批次大小,而不是消息大小。

3、測試消息大小比 batch.size 還大的情況下,是否還會發送消息,當 max.message.bytes 參數小于消息大小時,是否會報錯

使用 kafka-producer-perf-test.sh 腳本測試:

測試結果:

kafka topic數量上限_徹底搞懂 Kafka 消息大小相關參數設定的規則

可以得出結論,即使消息大小比 batch.size 還大,依然會繼續發送消息。

當 max.message.bytes = 900 時:

kafka topic數量上限_徹底搞懂 Kafka 消息大小相關參數設定的規則

可以得出結論,即使 batch.size < max.message.bytes,但由于消息大小比 batch.size 大的情況下依然會發送消息,如果沒有 max.request.size 參數控制消息大小的話,就有可能會報錯。

這也說明了文章開頭為什麼直接修改 max.request.size 和 max.message.bytes 即可,而不需要調整 batch.size 的原因。

總結

從測試結果來看, max.request.size、batch.size、message.max.bytes(或者 max.message.bytes)三個參數都有一定的聯系,環環相扣,在實際的業務中還需要根據業務消息大小,給出适當的值,這對于 Kafka 叢集的吞吐量起着至關重要的作用。

本文基于 Kafka 2.2.x 版本

原創不易,如果對你有幫助,麻煩各位讀者幫忙點個「在看」或者轉發一下文章!

你的「在看」和「轉發」,是對我最大的認可!