天天看點

阿裡雲Kafka幂等生産者與事務生産者

原理介紹

所謂的消息傳遞可靠性保障,是指 Kafka 對 Producer 和 Consumer 要處理的消息提供什麼樣的承諾。常見的承諾有以下三種:
  • 最多一次(at most once):消息可能會丢失,但絕不會被重複發送。
  • 至少一次(at least once):消息不會丢失,但有可能被重複發送。
  • 精确一次(exactly once):消息不會丢失,也不會被重複發送。

其中at most once 和 at least once在發送端通過是否接收發送的結果來實作。

對于exactly once的情況,目前主要通過幂等性和事務實作。

建立Topic

版本要求:開源版本為2.2.0的專業版執行個體、Local存儲
阿裡雲Kafka幂等生産者與事務生産者

幂等性 Producer

指定 Producer 幂等性的方法很簡單,僅需要設定一個參數即可,即 props.put(“enable.idempotence”, ture)
// 幂等參數設定
  props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 請求的最長等待時間
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3 * 1000);
        // 構造Producer對象,注意,該對象是線程安全的,一般來說,一個程序内一個Producer對象即可;
        // 如果想提高性能,可以多構造幾個對象,但不要太多,最好不要超過5個
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // 設定建立的Kafka
        String topic = "Local_Topic_Demo"; //消息所屬的Topic,請在控制台申請之後,填寫在這裡
        
        try{
            ProducerRecord<String,String> record1 = new ProducerRecord<>(topic,"msg1");
            Future future1 = producer.send(record1);
            future1.get();//不關心是否發送成功,則不需要這行

            ProducerRecord<String,String> record2 = new ProducerRecord<>(topic,"msg2");
            producer.send(record2);
            Future future2 = producer.send(record1);
            future2.get();//不關心是否發送成功,則不需要這行

            ProducerRecord<String,String> record3 = new ProducerRecord<>(topic,"msg3");
            producer.send(record3);
            Future future3 = producer.send(record1);
            future3.get();//不關心是否發送成功,則不需要這行

        } catch(Exception e) {
            e.printStackTrace();//連接配接錯誤、No Leader錯誤都可以通過重試解決;消息太大這類錯誤kafkaProducer不會進行任何重試,直接抛出異常
        }           
目前的幂等性隻能保證單分區上,即一個幂等性 Producer 能夠保證某個主題的一個分區上不出現重複消息,它無法實作多個分區的幂等性。其次,它隻能實作單會話上的幂等性,不能實作跨會話的幂等性。這裡的會話,你可以了解為 Producer 程序的一次運作。當你重新開機了 Producer 程序之後,這種幂等性保證就喪失了。

事務性Producer

可以通過事務(transaction)或者依賴事務型 Producer,實作多分區以及多會話上的消息無重複,這也是幂等性 Producer 和事務型 Producer 的最大差別!

設定事務型 Producer 的方法:

  • 和幂等性 Producer 一樣,開啟 enable.idempotence = true。
  • 設定 Producer 端參數 transactional. id。最好為其設定一個有意義的名字。
// 幂等參數設定
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // 事務支援設定
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"taro_transaction_id");
        // Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 請求的最長等待時間
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3 * 1000);
        // 構造Producer對象,注意,該對象是線程安全的,一般來說,一個程序内一個Producer對象即可;
        // 如果想提高性能,可以多構造幾個對象,但不要太多,最好不要超過5個
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        //消息所屬的Topic,請在控制台申請之後,填寫在這裡
        String topic = "Local_Topic_Demo"; 

        // 開啟事務
        producer.initTransactions();
        producer.beginTransaction();
        try{
            ProducerRecord<String,String> record1 = new ProducerRecord<>(topic,"msg1");
            producer.send(record1);

            ProducerRecord<String,String> record2 = new ProducerRecord<>(topic,"msg2");
            producer.send(record2);

            ProducerRecord<String,String> record3 = new ProducerRecord<>(topic,"msg3");
            producer.send(record3);

            producer.commitTransaction();
        } catch(Exception e) {
            producer.abortTransaction();
            e.printStackTrace();//連接配接錯誤、No Leader錯誤都可以通過重試解決;消息太大這類錯誤kafkaProducer不會進行任何重試,直接抛出異常
        }           

小結

幂等性 Producer 和事務型 Producer 都是 Kafka 社群力圖為 Kafka 實作精确一次處理語義所提供的工具,隻是它們的作用範圍是不同的。幂等性 Producer 隻能保證單分區、單會話上的消息幂等性;而事務能夠保證跨分區、跨會話間的幂等性。從傳遞語義上來看,自然是事務型 Producer 能做的更多。沒有免費的午餐,比起幂等性 Producer,事務型 Producer 的性能要更差,在實際使用過程中,我們需要仔細評估引入事務的開銷,切不可無腦地啟用事務。

參考連結

Kafka消息發送的三種模式 幂等生産者和事務生産者是一回事嗎?

繼續閱讀