一、Kafka 核心 API
下圖是官方文檔中的一個圖,形象的描述了能與 Kafka內建的用戶端類型
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsQTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5SO0YTMyETOwMjN0EGN5YDNzYzX2ETOxgTMyEzLcBTMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
Kafka的五類用戶端API類型如下:
- AdminClient API:允許管理和檢測Topic、broker以及其他Kafka執行個體,與Kafka自帶的腳本指令作用類似。
- Producer API:釋出消息到1個或多個Topic,也就是生産者或者說釋出方需要用到的API。
- Consumer API:訂閱1個或多個Topic,并處理産生的消息,也就是消費者或者說訂閱方需要用到的API。
- Stream API:高效地将輸入流轉換到輸出流,通常應用在一些流處理場景。
- Connector API:從一些源系統或應用程式拉取資料到Kafka,如上圖中的DB。
本文中,我們将主要介紹 Producer API。
二、生産者用戶端的基本架構圖
由上圖可以看出:KafkaProducer有兩個基本線程。
Kafka 的 Producer 發送消息采用的是異步發送的方式。在消息發送的過程中,涉及到了
兩個線程——main 線程和 Sender 線程,以及一個線程共享變量——RecordAccumulator。
main 線程将消息發送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發送到 Kafka broker。
- 主線程:負責消息建立,攔截器,序列化器,分區器等操作,并将消息追加到消息收集器RecoderAccumulator中(這裡可以看出攔截器确實在序列化和分區之前執行)。
- 消息收集器主要的作用是緩存消息,讓發送線程可以批量發送,減少網絡傳輸資源消耗提升性能,緩存大小可以通過buffer.memory配置,預設值為32MB,如果生産者發送消息的速度超過發送到伺服器的速度,則send()方法要麼被阻塞,要麼抛出異常,取決于參數max.block.ms,預設值為60000ms
- 主線程發送的消息被追加到消息累加器的一個雙端隊列中,消息收集器RecoderAccumulator為每個分區都維護了一個 Deque<ProducerBatch> 類型的雙端隊列,隊列中是ProducerBatch,包含多個ProducerRecord。
- ProducerBatch 可以暫時了解為是 ProducerRecord 的集合,批量發送有利于提升吞吐量,降低網絡影響。
- 由于生産者用戶端使用 java.io.ByteBuffer 在發送消息之前進行消息儲存,并維護了一個 BufferPool 實作 ByteBuffer 的複用;該緩存池隻針對特定大小( batch.size 指定)的 ByteBuffer進行管理,對于消息過大的緩存,不能做到重複利用。
- 每次追加一條ProducerRecord消息,會尋找/建立對應的雙端隊列,從其尾部擷取一個ProducerBatch,判斷目前消息的大小是否可以寫入該批次中。若可以寫入則寫入;若不可以寫入,則建立一個ProducerBatch,判斷該消息大小是否超過用戶端參數配置 batch.size 的值,不超過,則以 batch.size建立新的ProducerBatch,這樣友善進行緩存重複利用;若超過,則以計算的消息大小建立對應的 ProducerBatch ,缺點就是該記憶體不能被複用了。
- Sender線程:
- 該線程從消息收集器擷取緩存的消息,将其處理為 <Node, List<ProducerBatch> 的形式, Node 表示叢集的broker節點。
- 進一步将<Node, List<ProducerBatch>轉化為<Node, Request>形式,此時才可以向服務端發送資料。
- 在發送之前,Sender線程将消息以 Map<NodeId, Deque<Request>> 的形式儲存到 InFlightRequests 中進行緩存,可以通過其擷取 leastLoadedNode ,即目前Node中負載壓力最小的一個,以實作消息的盡快發出。
相關參數:
- batch.size:隻有資料積累到 batch.size 之後,sender 才會發送資料。
- linger.ms:如果資料遲遲未達到 batch.size,sender 等待 linger.time 之後就會發送資料。
三、Producer API
3.1、導入相關依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
3.2、Producer異步發送示範
private static final String TOPIC_NAME = "yibo_topic";
/**
* Producer異步發送示範
*/
public static void producerSend(){
Properties properties = new Properties();
//kafka叢集
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
//ack應答級别
properties.put(ProducerConfig.ACKS_CONFIG,"all");
//重試次數
properties.put(ProducerConfig.RETRIES_CONFIG,"3");
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
//等待時間
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
//RecoderAccumulator緩沖區大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
//key,value的序列化類
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//建立生産者對象
Producer<String,String> producer = new KafkaProducer<>(properties);
//消息對象 ProducerRecoder
for (int i = 0; i < 10; i++) {
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
producer.send(record);
}
producer.close();
}
3.3、Producer異步發送示範
回調函數會在 producer 收到 ack 時調用,為異步調用,該方法有兩個參數,分别是
RecordMetadata 和 Exception,如果 Exception 為 null,說明消息發送成功,如果
Exception 不為 null,說明消息發送失敗。
注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。
private static final String TOPIC_NAME = "yibo_topic";
/**
* Producer異步發送帶回調函數示範
*/
public static void producerSendWithCallback(){
Properties properties = new Properties();
//kafka叢集
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
//ack應答級别
properties.put(ProducerConfig.ACKS_CONFIG,"all");
//重試次數
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
//等待時間
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
//RecoderAccumulator緩沖區大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
//key,value的序列化類
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//建立生産者對象
Producer<String,String> producer = new KafkaProducer<>(properties);
//消息對象 ProducerRecoder
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,"key-hello","value-hello world");
producer.send(record, new Callback() {
//回調函數,該方法會在 Producer 收到 ack 時調用,為異步調用
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
} else {
log.error("exception",e);
}
}
});
producer.close();
}
3.4、Producer異步發送帶回調函數和Partition負載均衡
private static final String TOPIC_NAME = "yibo_topic";
/**
* Producer異步發送帶回調函數和Partition負載均衡
*/
public static void producerSendWithCallbackAndPartition(){
Properties properties = new Properties();
//kafka叢集
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
//ack應答級别
properties.put(ProducerConfig.ACKS_CONFIG,"all");
//重試次數
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
//等待時間
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
//RecoderAccumulator緩沖區大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
//key,value的序列化類
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//Partition負載均衡
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.yibo.kafka.producer.SamplePartition");
//建立生産者對象
Producer<String,String> producer = new KafkaProducer<>(properties);
//消息對象 ProducerRecoder
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,"key-hello","value-hello world");
producer.send(record, new Callback() {
//回調函數,該方法會在 Producer 收到 ack 時調用,為異步調用
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
} else {
log.error("exception",e);
}
}
});
producer.close();
}
/**
* @Description: Partitioner分區接口,以實作自定義的消息分區
*
* 預設分區器DefaultPartitioner org.apache.kafka.clients.producer.internals.DefaultPartitioner
*
* 如果消息的key為null,此時producer會使用預設的partitioner分區器将消息随機分布到topic的可用partition中。
* 如果key不為null,并且使用了預設的分區器,kafka會使用自己的hash算法對key取hash值,
* 使用hash值與partition數量取模,進而确定發送到哪個分區。
* 注意:此時key相同的消息會發送到相同的分區(隻要partition的數量不變化)
*/
public class SamplePartition implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
/**
*由于我們按key分區,在這裡我們規定:key值不允許為null。在實際項目中,key為null的消息*,可以發送到同一個分區。
*/
if(keyBytes == null) {
throw new InvalidRecordException("key cannot be null");
}
if(((String)key).equals("1")) {
return 1;
}
System.out.println("key: " + key);
//如果消息的key值不為1,那麼使用hash值取模,确定分區。
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
3.5、Producer異步阻塞發送示範
由于 send 方法傳回的是一個 Future 對象,根據 Futrue 對象的特點,我們也可以實作同
步發送的效果,隻需在調用 Future 對象的 get 方發即可。
private static final String TOPIC_NAME = "yibo_topic";
/**
* Producer異步阻塞發送示範
*/
public static void producerSyncSend() throws Exception {
Properties properties = new Properties();
//kafka叢集
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
//ack應答級别
properties.put(ProducerConfig.ACKS_CONFIG,"all");
//重試次數
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
//等待時間
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
//RecoderAccumulator緩沖區大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
//key,value的序列化類
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//建立生産者對象
Producer<String,String> producer = new KafkaProducer<>(properties);
//消息對象 ProducerRecoder
ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,"hello","hello world");
Future<RecordMetadata> send = producer.send(record);
RecordMetadata recordMetadata = send.get();
log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
producer.close();
}
四、自定義 Interceptor
4.1、攔截器原理
Producer 攔截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于實作 clients 端的定制化控制邏輯。
對于 producer 而言,interceptor 使得使用者在消息發送前以及 producer 回調邏輯前有機會對消息做一些定制化需求,比如修改消息等。同時,producer 允許使用者指定多個interceptor按序作用于同一條消息進而形成一個攔截鍊(interceptor chain)。Intercetpor 的實作接口是
org.apache.kafka.clients.producer.ProducerInterceptor
,其定義的方法包括:
-
1、configure(configs):
擷取配置資訊和初始化資料時調用。
-
2、onSend(ProducerRecord):
該方法封裝進 KafkaProducer.send 方法中,即它運作在使用者主線程中。Producer 確定在消息被序列化以及計算分區前調用該方法。使用者可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的 topic 和分區,否則會影響目标分區的計算。
-
3、onAcknowledgement(RecordMetadata, Exception):
該方法會在消息從 RecordAccumulator 成功發送到 Kafka Broker 之後,或者在發送過程
中失敗時調用。并且通常都是在 producer 回調邏輯觸發之前。onAcknowledgement 運作在producer 的 IO 線程中,是以不要在該方法中放入很重的邏輯,否則會拖慢producer 的消息發送效率。
-
4、close:
關閉 interceptor,主要用于執行一些資源清理工作。
如前所述,interceptor 可能被運作在多個線程中,是以在具體實作時使用者需要自行確定
線程安全。另外倘若指定了多個 interceptor,則 producer 将按照指定順序調用它們,并僅僅是捕獲每個 interceptor 可能抛出的異常記錄到錯誤日志中而非在向上傳遞。這在使用過程中要特别留意。
4.2、 攔截器案例
-
1、需求:
實作一個簡單的雙 interceptor 組成的攔截鍊。第一個 interceptor 會在消息發送前将時間
戳資訊加到消息 value 的最前部;第二個 interceptor 會在消息發送後更新成功發送消息數或失敗發送消息數。
public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> map) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
// 建立一個新的 record,把時間戳寫入消息體的最前部
return new ProducerRecord(producerRecord.topic(),
producerRecord.partition(), producerRecord.timestamp(), producerRecord.key(),
System.currentTimeMillis() + "," + producerRecord.value().toString());
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
}
@Override
public void close() {
}
}
- 2)統計發送消息成功和發送失敗消息數,并在 producer 關閉時列印這兩個計數器
public class CounterInterceptor implements ProducerInterceptor<String, String> {
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map<String, ?> map) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
return null;
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
// 統計成功和失敗的次數
if (e == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 儲存結果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
- 3)producer 主程式
public class InterceptorProducer {
private static final String TOPIC_NAME = "yibo_topic";
public static void main(String[] args) {
// 1 設定配置資訊
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop102:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2 建構攔截鍊
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yibo.kafka.producer.TimeInterceptor");
interceptors.add("com.yibo.kafka.producer.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
Producer<String, String> producer = new KafkaProducer<>(props);
// 3 發送消息
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "message" + i);
producer.send(record);
}
// 4 一定要關閉 producer,這樣才會調用 interceptor 的 close 方法
producer.close();
}
}
五、SpringBoot 內建 Kafka
5.1、添加maven依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.1</version>
</dependency>
5.2、配置 application.properties
# 指定kafka server的位址,叢集配多個,中間,逗号隔開
spring.kafka.bootstrap-servers=192.168.174.128:9092
#=============== provider =======================
# 寫入失敗時,重試次數。當leader節點失效,一個repli節點會替代成為leader節點,此時可能出現寫入失敗,
# 當retris為0時,produce不會重複。retirs重發,此時repli節點完全成為leader節點,不會産生消息丢失。
spring.kafka.producer.retries=0
# 每次批量發送消息的數量,produce積累到一定資料,一次發送
spring.kafka.producer.batch-size=16384
# produce積累資料一次發送,緩存大小達到buffer.memory就發送資料
spring.kafka.producer.buffer-memory=33554432
#procedure要求leader在考慮完成請求之前收到的确認數,用于控制發送記錄在服務端的持久化,其值可以為如下:
#acks = 0 如果設定為零,則生産者将不會等待來自伺服器的任何确認,該記錄将立即添加到套接字緩沖區并視為已發送。在這種情況下,無法保證伺服器已收到記錄,并且重試配置将不會生效(因為用戶端通常不會知道任何故障),為每條記錄傳回的偏移量始終設定為-1。
#acks = 1 這意味着leader會将記錄寫入其本地日志,但無需等待所有副本伺服器的完全确認即可做出回應,在這種情況下,如果leader在确認記錄後立即失敗,但在将資料複制到所有的副本伺服器之前,則記錄将會丢失。
#acks = all 這意味着leader将等待完整的同步副本集以确認記錄,這保證了隻要至少一個同步副本伺服器仍然存活,記錄就不會丢失,這是最強有力的保證,這相當于acks = -1的設定。
#可以設定的值為:all, -1, 0, 1
spring.kafka.producer.acks=1
# 指定消息key和消息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# 指定預設消費者group id --> 由于在kafka中,同一組中的consumer不會讀取到同一個消息,依靠groud.id設定組名
spring.kafka.consumer.group-id=testGroup
# smallest和largest才有效,如果smallest重新0開始讀取,如果是largest從logfile的offset讀取。一般情況下我們都是設定smallest
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit:true --> 設定自動送出offset
spring.kafka.consumer.enable-auto-commit=false
#如果'enable.auto.commit'為true,則消費者偏移自動送出給Kafka的頻率(以毫秒為機關),預設值為5000。
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息體的編解碼方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#=============== listener =======================
# 在偵聽器容器中運作的線程數。
spring.kafka.listener.concurrency=5
#listner負責ack,每調用一次,就立即commit
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.missing-topics-fatal=false
5.3、建立Producer
@Component
@Slf4j
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
private static final String TOPIC_NAME = "yibo_topic";
public void send(Object obj) {
String obj2String = JSONObject.toJSONString(obj);
log.info("準備發送消息為:{}", obj2String);
//發送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_NAME, obj);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
//發送失敗的處理
log.info(TOPIC_NAME + " - 生産者 發送消息失敗:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
//成功的處理
log.info(TOPIC_NAME + " - 生産者 發送消息成功:" + stringObjectSendResult.toString());
}
});
}
}