世界上最快的捷徑,就是腳踏實地,本文已收錄【架構技術專欄】關注這個喜歡分享的地方。
開源項目:
- 分布式監控(Gitee GVP最有價值開源項目 ):https://gitee.com/sanjiankethree/cubic
- 個人主站:http://www.jiagoujishu.com
0、簡述
Spring Boot 版本:2.3.4.RELEASE
随着大資料的發展,目前Kafka可以說在我們項目中的使用是越來越多了。其高性能的特點也是滿足了我們大部分的場景,是以對于學習Kafka的相容使用也是一件很重要的事情。
下面我們從幾個點來說:
- 發送消息
- 發送回調
- 實作原理
- 異步和同步
1、添加依賴
2、添加配置
在Spring Boot 中kafka的配置屬性都是
spring.kafka.*
開頭的,最簡配置如下(application.properties中 )
3、發送代碼
Spring Boot kafka 依然沿用老套路 XXXTemplate,是以這裡發送自然就使用了
KafkaTemplate
4、參數說明
使用KafkaTemplate.send 會有很多不同的發送參數,這裡說明下:
- topic : 填寫要發送的topic名稱
- partition : 要發送的分區id,從0開始
- timestamp:時間戳
- key:消息的key
- data:消息資料
- ProducerRecord:消息的封裝類,包含了上面的參數
- Message> :Spring自帶的Message封裝類,包含消息和消息頭
5、發送回調
Spring Boot KafkaAutoConfiguration 為我們提供了處理消息回調的handler,以供我們來處理結果。成功調用onSuccess,失敗調用onError,增加如下類:
6、實作原理
從上面來看,我們基本兩三行代碼就完成了kafka消息的發送,那他們到底是怎麼加載實作的呢。
熟悉Spring Boot 的小夥伴想必也能猜到,基于其擴充SPI的機制,
spring-boot-autoconfigure
包下一定會有一個
KafkaAutoConfiguration
配置類。
1、KafkaAutoConfiguration
通過
KafkaAutoConfiguration
我們可以看出幾件事情
1、
@ConditionalOnClass(KafkaTemplate.class)
表示我們必須依賴了spring-kafka 包才會加載KafkaAutoConfiguration
2、此類配置了KafkaProperties屬性供我們使用
3、這裡在加載時幫我們引入了兩個類
KafkaAnnotationDrivenConfiguration
提供消費注解支援 和
KafkaStreamsAnnotationDrivenConfiguration
提供stream 注解支援
2、KafkaTemplate
這裡就不大段貼代碼了,可以去看完整的
KafkaAutoConfiguration
。
這裡摘取了KafkaTemplate 主要相關的方法,關鍵的幾個點如下:
- 三個方法使用了@ConditionalOnMissingBean 注解,根本原因就是為了友善我們進行擴充而存在的
- kafkaProducerListener 方法是為了在調用doSend 方法是建構Callback 使用的,友善我們來監控發送成功或失敗的資訊(KafkaTemplate 的305行buildCallback)
- ProducerFactory 是真正用于建立producer的,如果配置了 transactionIdPrefix 那就代表開啟了producer 對于事物的支援。如果開啟了事物那就會先從本地ThreadLocal 中擷取producer,拿不到才去建立。(KafkaTemplate 的 341 行 getTheProducer)
- 這裡如果配置了
還會建立一個spring.kafka.producer.transaction-id-prefix
事務管理器KafkaTransactionManager
加載流程:
1、因為我們加入
spring-kafka
jar,是以在啟動的時候會通過SPI 機制加載到
KafkaAutoConfiguration
2、這時配置類通過@ConditionalOnMissingBean 發現我們沒有獨立配置 KafkaTemplate時,會依次加載預設的
ProducerListener
和
ProducerFactory
來建構
KafkaTemplate
發送流程:
1、調用
KafkaTemplate.send(String topic, @Nullable V data)
方法
2、調用
KafkaTemplate
内部
doSend(ProducerRecord producerRecord)
方法
3、調用
KafkaTemplate
内部
getTheProducer()
方法,如果是事物發送就從Threadlocal 擷取,否則建立一個Producer
4、構造
SettableListenableFuture
回調
5、調用最終的發送方法
KafkaProducer
内的
doSend(ProducerRecord record, Callback callback)
7、異步和同步發送
通過
KafkaTemplate
的源碼我們可以發現,其實發送消息都是采用異步發送的。
KafkaTemplate
會把我們傳入的參數封裝成
ProducerRecord
,然後調用
doSend
方法,源碼如下:
而
doSend(producerRecord)
方法先檢查了下是否開啟事物,調用
this.getTheProducer()
擷取到 producer。
後面主要是構造了一個SettableListenableFuture 回調,最後在使用
KafkaProducer.send(ProducerRecord record, Callback callback)
進行資料發送,傳回一個
Future
同步發送消息
因為在某些業務場景下我需要同步發送消息,實作其實也很簡單。因為傳回了一個
Future
是以我們隻需要調用get方法就行了
往期推薦
Spring Boot 系列:日志動态配置詳解
Spring Boot 系列:最新版優雅停機詳解
(最新 9000字) Spring Boot 配置特性解析
Spring Boot 知識清單(一)SpringApplication
何時用多線程?多線程需要加鎖嗎?線程數多少最合理?
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5yN0E2M3QmZxQzN5MGOiVGO1QTM4MTO1QmY4MDOlZzYz8CX0JXZ252bj91Ztl2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)