天天看點

kafka依賴_Spring Boot 系列 :關于Kafka消息發送的事兒

世界上最快的捷徑,就是腳踏實地,本文已收錄【架構技術專欄】關注這個喜歡分享的地方。

開源項目:

  • 分布式監控(Gitee GVP最有價值開源項目 ):https://gitee.com/sanjiankethree/cubic
  • 個人主站:http://www.jiagoujishu.com

0、簡述

Spring Boot 版本:2.3.4.RELEASE

随着大資料的發展,目前Kafka可以說在我們項目中的使用是越來越多了。其高性能的特點也是滿足了我們大部分的場景,是以對于學習Kafka的相容使用也是一件很重要的事情。

下面我們從幾個點來說:

  • 發送消息
  • 發送回調
  • 實作原理
  • 異步和同步

1、添加依賴

2、添加配置

在Spring Boot 中kafka的配置屬性都是

spring.kafka.*

 開頭的,最簡配置如下(application.properties中 )

3、發送代碼

Spring Boot kafka 依然沿用老套路 XXXTemplate,是以這裡發送自然就使用了

KafkaTemplate

4、參數說明

使用KafkaTemplate.send 會有很多不同的發送參數,這裡說明下:

  • topic : 填寫要發送的topic名稱
  • partition : 要發送的分區id,從0開始
  • timestamp:時間戳
  • key:消息的key
  • data:消息資料
  • ProducerRecord:消息的封裝類,包含了上面的參數
  • Message> :Spring自帶的Message封裝類,包含消息和消息頭

5、發送回調

Spring Boot KafkaAutoConfiguration 為我們提供了處理消息回調的handler,以供我們來處理結果。成功調用onSuccess,失敗調用onError,增加如下類:

6、實作原理

從上面來看,我們基本兩三行代碼就完成了kafka消息的發送,那他們到底是怎麼加載實作的呢。

熟悉Spring Boot 的小夥伴想必也能猜到,基于其擴充SPI的機制,

spring-boot-autoconfigure

包下一定會有一個

KafkaAutoConfiguration

配置類。

1、KafkaAutoConfiguration

通過

KafkaAutoConfiguration

我們可以看出幾件事情

1、

@ConditionalOnClass(KafkaTemplate.class)

 表示我們必須依賴了spring-kafka 包才會加載KafkaAutoConfiguration

2、此類配置了KafkaProperties屬性供我們使用

3、這裡在加載時幫我們引入了兩個類

KafkaAnnotationDrivenConfiguration

 提供消費注解支援 和

KafkaStreamsAnnotationDrivenConfiguration

 提供stream 注解支援

2、KafkaTemplate

這裡就不大段貼代碼了,可以去看完整的

KafkaAutoConfiguration

這裡摘取了KafkaTemplate 主要相關的方法,關鍵的幾個點如下:

  • 三個方法使用了@ConditionalOnMissingBean 注解,根本原因就是為了友善我們進行擴充而存在的
  • kafkaProducerListener 方法是為了在調用doSend 方法是建構Callback 使用的,友善我們來監控發送成功或失敗的資訊(KafkaTemplate 的305行buildCallback)
  • ProducerFactory 是真正用于建立producer的,如果配置了 transactionIdPrefix 那就代表開啟了producer 對于事物的支援。如果開啟了事物那就會先從本地ThreadLocal 中擷取producer,拿不到才去建立。(KafkaTemplate 的 341 行 getTheProducer)
  • 這裡如果配置了

    spring.kafka.producer.transaction-id-prefix

    還會建立一個

    KafkaTransactionManager

    事務管理器

加載流程:

1、因為我們加入

spring-kafka

 jar,是以在啟動的時候會通過SPI 機制加載到 

KafkaAutoConfiguration

2、這時配置類通過@ConditionalOnMissingBean 發現我們沒有獨立配置 KafkaTemplate時,會依次加載預設的

ProducerListener

ProducerFactory

來建構

KafkaTemplate

發送流程:

1、調用

KafkaTemplate.send(String topic, @Nullable V data)

方法

2、調用

KafkaTemplate

 内部 

doSend(ProducerRecord producerRecord)

方法

3、調用

KafkaTemplate

 内部 

getTheProducer()

 方法,如果是事物發送就從Threadlocal 擷取,否則建立一個Producer

4、構造

SettableListenableFuture

回調

5、調用最終的發送方法

KafkaProducer

 内的 

doSend(ProducerRecord record, Callback callback)

7、異步和同步發送

通過

KafkaTemplate

 的源碼我們可以發現,其實發送消息都是采用異步發送的。

KafkaTemplate

會把我們傳入的參數封裝成

ProducerRecord

,然後調用

doSend

方法,源碼如下:

doSend(producerRecord)

方法先檢查了下是否開啟事物,調用

this.getTheProducer()

擷取到 producer。

後面主要是構造了一個SettableListenableFuture 回調,最後在使用

KafkaProducer.send(ProducerRecord record, Callback callback)

 進行資料發送,傳回一個

Future

同步發送消息

因為在某些業務場景下我需要同步發送消息,實作其實也很簡單。因為傳回了一個

Future

 是以我們隻需要調用get方法就行了

往期推薦

Spring Boot 系列:日志動态配置詳解

Spring Boot 系列:最新版優雅停機詳解

(最新 9000字)  Spring Boot 配置特性解析

Spring Boot 知識清單(一)SpringApplication

何時用多線程?多線程需要加鎖嗎?線程數多少最合理?

kafka依賴_Spring Boot 系列 :關于Kafka消息發送的事兒