天天看點

kafka——Producer API

一、Kafka 核心 API

下圖是官方文檔中的一個圖,形象的描述了能與 Kafka內建的用戶端類型

kafka——Producer API

Kafka的五類用戶端API類型如下:

  • AdminClient API:允許管理和檢測Topic、broker以及其他Kafka執行個體,與Kafka自帶的腳本指令作用類似。
  • Producer API:釋出消息到1個或多個Topic,也就是生産者或者說釋出方需要用到的API。
  • Consumer API:訂閱1個或多個Topic,并處理産生的消息,也就是消費者或者說訂閱方需要用到的API。
  • Stream API:高效地将輸入流轉換到輸出流,通常應用在一些流處理場景。
  • Connector API:從一些源系統或應用程式拉取資料到Kafka,如上圖中的DB。

本文中,我們将主要介紹 Producer API。

二、生産者用戶端的基本架構圖

kafka——Producer API

由上圖可以看出:KafkaProducer有兩個基本線程。

Kafka 的 Producer 發送消息采用的是異步發送的方式。在消息發送的過程中,涉及到了

兩個線程——main 線程和 Sender 線程,以及一個線程共享變量——RecordAccumulator。

main 線程将消息發送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發送到 Kafka broker。

  • 主線程:負責消息建立,攔截器,序列化器,分區器等操作,并将消息追加到消息收集器RecoderAccumulator中(這裡可以看出攔截器确實在序列化和分區之前執行)。
    • 消息收集器主要的作用是緩存消息,讓發送線程可以批量發送,減少網絡傳輸資源消耗提升性能,緩存大小可以通過buffer.memory配置,預設值為32MB,如果生産者發送消息的速度超過發送到伺服器的速度,則send()方法要麼被阻塞,要麼抛出異常,取決于參數max.block.ms,預設值為60000ms
    • 主線程發送的消息被追加到消息累加器的一個雙端隊列中,消息收集器RecoderAccumulator為每個分區都維護了一個 Deque<ProducerBatch> 類型的雙端隊列,隊列中是ProducerBatch,包含多個ProducerRecord。
    • ProducerBatch 可以暫時了解為是 ProducerRecord 的集合,批量發送有利于提升吞吐量,降低網絡影響。
    • 由于生産者用戶端使用 java.io.ByteBuffer 在發送消息之前進行消息儲存,并維護了一個 BufferPool 實作 ByteBuffer 的複用;該緩存池隻針對特定大小( batch.size 指定)的 ByteBuffer進行管理,對于消息過大的緩存,不能做到重複利用。
    • 每次追加一條ProducerRecord消息,會尋找/建立對應的雙端隊列,從其尾部擷取一個ProducerBatch,判斷目前消息的大小是否可以寫入該批次中。若可以寫入則寫入;若不可以寫入,則建立一個ProducerBatch,判斷該消息大小是否超過用戶端參數配置 batch.size 的值,不超過,則以 batch.size建立新的ProducerBatch,這樣友善進行緩存重複利用;若超過,則以計算的消息大小建立對應的 ProducerBatch ,缺點就是該記憶體不能被複用了。
  • Sender線程:
    • 該線程從消息收集器擷取緩存的消息,将其處理為 <Node, List<ProducerBatch> 的形式, Node 表示叢集的broker節點。
    • 進一步将<Node, List<ProducerBatch>轉化為<Node, Request>形式,此時才可以向服務端發送資料。
    • 在發送之前,Sender線程将消息以 Map<NodeId, Deque<Request>> 的形式儲存到 InFlightRequests 中進行緩存,可以通過其擷取 leastLoadedNode ,即目前Node中負載壓力最小的一個,以實作消息的盡快發出。

相關參數:

  • batch.size:隻有資料積累到 batch.size 之後,sender 才會發送資料。
  • linger.ms:如果資料遲遲未達到 batch.size,sender 等待 linger.time 之後就會發送資料。

三、Producer API

3.1、導入相關依賴

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.6.0</version>
</dependency>
           

3.2、Producer異步發送示範

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Producer異步發送示範
 */
public static void producerSend(){
	Properties properties = new Properties();
	//kafka叢集
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
	//ack應答級别
	properties.put(ProducerConfig.ACKS_CONFIG,"all");
	//重試次數
	properties.put(ProducerConfig.RETRIES_CONFIG,"3");
	//批次大小
	properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
	//等待時間
	properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
	//RecoderAccumulator緩沖區大小
	properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
	//key,value的序列化類
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	//建立生産者對象
	Producer<String,String> producer = new KafkaProducer<>(properties);
	//消息對象 ProducerRecoder
	for (int i = 0; i < 10; i++) {
		ProducerRecord<String,String> record =
				new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
		producer.send(record);
	}
	producer.close();
}
           

3.3、Producer異步發送示範

回調函數會在 producer 收到 ack 時調用,為異步調用,該方法有兩個參數,分别是

RecordMetadata 和 Exception,如果 Exception 為 null,說明消息發送成功,如果

Exception 不為 null,說明消息發送失敗。

注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Producer異步發送帶回調函數示範
 */
public static void producerSendWithCallback(){
	Properties properties = new Properties();
	//kafka叢集
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
	//ack應答級别
	properties.put(ProducerConfig.ACKS_CONFIG,"all");
	//重試次數
	properties.put(ProducerConfig.RETRIES_CONFIG,"0");
	//批次大小
	properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
	//等待時間
	properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
	//RecoderAccumulator緩沖區大小
	properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
	//key,value的序列化類
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	//建立生産者對象
	Producer<String,String> producer = new KafkaProducer<>(properties);
	//消息對象 ProducerRecoder
	ProducerRecord<String,String> record =
			new ProducerRecord<>(TOPIC_NAME,"key-hello","value-hello world");
	producer.send(record, new Callback() {
		//回調函數,該方法會在 Producer 收到 ack 時調用,為異步調用
		@Override
		public void onCompletion(RecordMetadata recordMetadata, Exception e) {
			if (e == null) {
				log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
			} else {
				log.error("exception",e);
			}
		}
	});
	producer.close();
}
           

3.4、Producer異步發送帶回調函數和Partition負載均衡

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Producer異步發送帶回調函數和Partition負載均衡
 */
public static void producerSendWithCallbackAndPartition(){
	Properties properties = new Properties();
	//kafka叢集
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
	//ack應答級别
	properties.put(ProducerConfig.ACKS_CONFIG,"all");
	//重試次數
	properties.put(ProducerConfig.RETRIES_CONFIG,"0");
	//批次大小
	properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
	//等待時間
	properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
	//RecoderAccumulator緩沖區大小
	properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
	//key,value的序列化類
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	//Partition負載均衡
	properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.yibo.kafka.producer.SamplePartition");
	//建立生産者對象
	Producer<String,String> producer = new KafkaProducer<>(properties);
	//消息對象 ProducerRecoder
	ProducerRecord<String,String> record =
			new ProducerRecord<>(TOPIC_NAME,"key-hello","value-hello world");
	producer.send(record, new Callback() {
		//回調函數,該方法會在 Producer 收到 ack 時調用,為異步調用
		@Override
		public void onCompletion(RecordMetadata recordMetadata, Exception e) {
			if (e == null) {
				log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
			} else {
				log.error("exception",e);
			}
		}
	});
	producer.close();
}

/**
 * @Description: Partitioner分區接口,以實作自定義的消息分區
 *
 * 預設分區器DefaultPartitioner org.apache.kafka.clients.producer.internals.DefaultPartitioner
 *
 * 如果消息的key為null,此時producer會使用預設的partitioner分區器将消息随機分布到topic的可用partition中。
 * 如果key不為null,并且使用了預設的分區器,kafka會使用自己的hash算法對key取hash值,
 * 使用hash值與partition數量取模,進而确定發送到哪個分區。
 * 注意:此時key相同的消息會發送到相同的分區(隻要partition的數量不變化)
 */
public class SamplePartition implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        /**
         *由于我們按key分區,在這裡我們規定:key值不允許為null。在實際項目中,key為null的消息*,可以發送到同一個分區。
         */
        if(keyBytes == null) {
            throw new InvalidRecordException("key cannot be null");
        }
        if(((String)key).equals("1")) {
            return 1;
        }
        System.out.println("key: " + key);
        //如果消息的key值不為1,那麼使用hash值取模,确定分區。
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
           

3.5、Producer異步阻塞發送示範

由于 send 方法傳回的是一個 Future 對象,根據 Futrue 對象的特點,我們也可以實作同

步發送的效果,隻需在調用 Future 對象的 get 方發即可。

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Producer異步阻塞發送示範
 */
public static void producerSyncSend() throws Exception {
	Properties properties = new Properties();
	//kafka叢集
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
	//ack應答級别
	properties.put(ProducerConfig.ACKS_CONFIG,"all");
	//重試次數
	properties.put(ProducerConfig.RETRIES_CONFIG,"0");
	//批次大小
	properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
	//等待時間
	properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
	//RecoderAccumulator緩沖區大小
	properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
	//key,value的序列化類
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	//建立生産者對象
	Producer<String,String> producer = new KafkaProducer<>(properties);
	//消息對象 ProducerRecoder
	ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,"hello","hello world");
	Future<RecordMetadata> send = producer.send(record);
	RecordMetadata recordMetadata = send.get();
	log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
	producer.close();
}
           

四、自定義 Interceptor

4.1、攔截器原理

Producer 攔截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于實作 clients 端的定制化控制邏輯。

對于 producer 而言,interceptor 使得使用者在消息發送前以及 producer 回調邏輯前有機會對消息做一些定制化需求,比如修改消息等。同時,producer 允許使用者指定多個interceptor按序作用于同一條消息進而形成一個攔截鍊(interceptor chain)。Intercetpor 的實作接口是

org.apache.kafka.clients.producer.ProducerInterceptor

,其定義的方法包括:

  • 1、configure(configs):

    擷取配置資訊和初始化資料時調用。

  • 2、onSend(ProducerRecord):

    該方法封裝進 KafkaProducer.send 方法中,即它運作在使用者主線程中。Producer 確定在消息被序列化以及計算分區前調用該方法。使用者可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的 topic 和分區,否則會影響目标分區的計算。

  • 3、onAcknowledgement(RecordMetadata, Exception):

    該方法會在消息從 RecordAccumulator 成功發送到 Kafka Broker 之後,或者在發送過程

    中失敗時調用。并且通常都是在 producer 回調邏輯觸發之前。onAcknowledgement 運作在producer 的 IO 線程中,是以不要在該方法中放入很重的邏輯,否則會拖慢producer 的消息發送效率。

  • 4、close:

    關閉 interceptor,主要用于執行一些資源清理工作。

如前所述,interceptor 可能被運作在多個線程中,是以在具體實作時使用者需要自行確定

線程安全。另外倘若指定了多個 interceptor,則 producer 将按照指定順序調用它們,并僅僅是捕獲每個 interceptor 可能抛出的異常記錄到錯誤日志中而非在向上傳遞。這在使用過程中要特别留意。

4.2、 攔截器案例

  • 1、需求:

    實作一個簡單的雙 interceptor 組成的攔截鍊。第一個 interceptor 會在消息發送前将時間

    戳資訊加到消息 value 的最前部;第二個 interceptor 會在消息發送後更新成功發送消息數或失敗發送消息數。

kafka——Producer API
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public void configure(Map<String, ?> map) {

    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        // 建立一個新的 record,把時間戳寫入消息體的最前部
        return new ProducerRecord(producerRecord.topic(),
                producerRecord.partition(), producerRecord.timestamp(), producerRecord.key(),
                System.currentTimeMillis() + "," + producerRecord.value().toString());

    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    @Override
    public void close() {

    }
}
           
  • 2)統計發送消息成功和發送失敗消息數,并在 producer 關閉時列印這兩個計數器
public class CounterInterceptor implements ProducerInterceptor<String, String> {

    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public void configure(Map<String, ?> map) {

    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        return null;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        // 統計成功和失敗的次數
        if (e == null) {
            successCounter++;
        } else {
            errorCounter++;
        }

    }

    @Override
    public void close() {
        // 儲存結果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }
}
           
  • 3)producer 主程式
public class InterceptorProducer {

    private static final String TOPIC_NAME = "yibo_topic";

    public static void main(String[] args) {
        // 1 設定配置資訊
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2 建構攔截鍊
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.yibo.kafka.producer.TimeInterceptor");
        interceptors.add("com.yibo.kafka.producer.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        Producer<String, String> producer = new KafkaProducer<>(props);

        // 3 發送消息
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "message" + i);
            producer.send(record);
        }

        // 4 一定要關閉 producer,這樣才會調用 interceptor 的 close 方法
        producer.close();
    }
}
           

五、SpringBoot 內建 Kafka

5.1、添加maven依賴

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>2.6.1</version>
</dependency>
           

5.2、配置 application.properties

# 指定kafka server的位址,叢集配多個,中間,逗号隔開
spring.kafka.bootstrap-servers=192.168.174.128:9092

#=============== provider  =======================
# 寫入失敗時,重試次數。當leader節點失效,一個repli節點會替代成為leader節點,此時可能出現寫入失敗,
# 當retris為0時,produce不會重複。retirs重發,此時repli節點完全成為leader節點,不會産生消息丢失。
spring.kafka.producer.retries=0
# 每次批量發送消息的數量,produce積累到一定資料,一次發送
spring.kafka.producer.batch-size=16384
# produce積累資料一次發送,緩存大小達到buffer.memory就發送資料
spring.kafka.producer.buffer-memory=33554432

#procedure要求leader在考慮完成請求之前收到的确認數,用于控制發送記錄在服務端的持久化,其值可以為如下:
#acks = 0 如果設定為零,則生産者将不會等待來自伺服器的任何确認,該記錄将立即添加到套接字緩沖區并視為已發送。在這種情況下,無法保證伺服器已收到記錄,并且重試配置将不會生效(因為用戶端通常不會知道任何故障),為每條記錄傳回的偏移量始終設定為-1。
#acks = 1 這意味着leader會将記錄寫入其本地日志,但無需等待所有副本伺服器的完全确認即可做出回應,在這種情況下,如果leader在确認記錄後立即失敗,但在将資料複制到所有的副本伺服器之前,則記錄将會丢失。
#acks = all 這意味着leader将等待完整的同步副本集以确認記錄,這保證了隻要至少一個同步副本伺服器仍然存活,記錄就不會丢失,這是最強有力的保證,這相當于acks = -1的設定。
#可以設定的值為:all, -1, 0, 1
spring.kafka.producer.acks=1

# 指定消息key和消息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# 指定預設消費者group id --> 由于在kafka中,同一組中的consumer不會讀取到同一個消息,依靠groud.id設定組名
spring.kafka.consumer.group-id=testGroup
# smallest和largest才有效,如果smallest重新0開始讀取,如果是largest從logfile的offset讀取。一般情況下我們都是設定smallest
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit:true --> 設定自動送出offset
spring.kafka.consumer.enable-auto-commit=false
#如果'enable.auto.commit'為true,則消費者偏移自動送出給Kafka的頻率(以毫秒為機關),預設值為5000。
spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息體的編解碼方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#=============== listener  =======================
# 在偵聽器容器中運作的線程數。
spring.kafka.listener.concurrency=5
#listner負責ack,每調用一次,就立即commit
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.missing-topics-fatal=false
           

5.3、建立Producer

@Component
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    private static final String TOPIC_NAME = "yibo_topic";

    public void send(Object obj) {
        String obj2String = JSONObject.toJSONString(obj);
        log.info("準備發送消息為:{}", obj2String);
        //發送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_NAME, obj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                //發送失敗的處理
                log.info(TOPIC_NAME + " - 生産者 發送消息失敗:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //成功的處理
                log.info(TOPIC_NAME + " - 生産者 發送消息成功:" + stringObjectSendResult.toString());
            }
        });
    }
}