原理介紹
所謂的消息傳遞可靠性保障,是指 Kafka 對 Producer 和 Consumer 要處理的消息提供什麼樣的承諾。常見的承諾有以下三種:
- 最多一次(at most once):消息可能會丢失,但絕不會被重複發送。
- 至少一次(at least once):消息不會丢失,但有可能被重複發送。
- 精确一次(exactly once):消息不會丢失,也不會被重複發送。
其中at most once 和 at least once在發送端通過是否接收發送的結果來實作。
對于exactly once的情況,目前主要通過幂等性和事務實作。
建立Topic
版本要求:開源版本為2.2.0的專業版執行個體、Local存儲
幂等性 Producer
指定 Producer 幂等性的方法很簡單,僅需要設定一個參數即可,即 props.put(“enable.idempotence”, ture)
// 幂等參數設定
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Kafka消息的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 請求的最長等待時間
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3 * 1000);
// 構造Producer對象,注意,該對象是線程安全的,一般來說,一個程序内一個Producer對象即可;
// 如果想提高性能,可以多構造幾個對象,但不要太多,最好不要超過5個
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// 設定建立的Kafka
String topic = "Local_Topic_Demo"; //消息所屬的Topic,請在控制台申請之後,填寫在這裡
try{
ProducerRecord<String,String> record1 = new ProducerRecord<>(topic,"msg1");
Future future1 = producer.send(record1);
future1.get();//不關心是否發送成功,則不需要這行
ProducerRecord<String,String> record2 = new ProducerRecord<>(topic,"msg2");
producer.send(record2);
Future future2 = producer.send(record1);
future2.get();//不關心是否發送成功,則不需要這行
ProducerRecord<String,String> record3 = new ProducerRecord<>(topic,"msg3");
producer.send(record3);
Future future3 = producer.send(record1);
future3.get();//不關心是否發送成功,則不需要這行
} catch(Exception e) {
e.printStackTrace();//連接配接錯誤、No Leader錯誤都可以通過重試解決;消息太大這類錯誤kafkaProducer不會進行任何重試,直接抛出異常
}
目前的幂等性隻能保證單分區上,即一個幂等性 Producer 能夠保證某個主題的一個分區上不出現重複消息,它無法實作多個分區的幂等性。其次,它隻能實作單會話上的幂等性,不能實作跨會話的幂等性。這裡的會話,你可以了解為 Producer 程序的一次運作。當你重新開機了 Producer 程序之後,這種幂等性保證就喪失了。
事務性Producer
可以通過事務(transaction)或者依賴事務型 Producer,實作多分區以及多會話上的消息無重複,這也是幂等性 Producer 和事務型 Producer 的最大差別!
設定事務型 Producer 的方法:
- 和幂等性 Producer 一樣,開啟 enable.idempotence = true。
- 設定 Producer 端參數 transactional. id。最好為其設定一個有意義的名字。
// 幂等參數設定
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 事務支援設定
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"taro_transaction_id");
// Kafka消息的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 請求的最長等待時間
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3 * 1000);
// 構造Producer對象,注意,該對象是線程安全的,一般來說,一個程序内一個Producer對象即可;
// 如果想提高性能,可以多構造幾個對象,但不要太多,最好不要超過5個
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
//消息所屬的Topic,請在控制台申請之後,填寫在這裡
String topic = "Local_Topic_Demo";
// 開啟事務
producer.initTransactions();
producer.beginTransaction();
try{
ProducerRecord<String,String> record1 = new ProducerRecord<>(topic,"msg1");
producer.send(record1);
ProducerRecord<String,String> record2 = new ProducerRecord<>(topic,"msg2");
producer.send(record2);
ProducerRecord<String,String> record3 = new ProducerRecord<>(topic,"msg3");
producer.send(record3);
producer.commitTransaction();
} catch(Exception e) {
producer.abortTransaction();
e.printStackTrace();//連接配接錯誤、No Leader錯誤都可以通過重試解決;消息太大這類錯誤kafkaProducer不會進行任何重試,直接抛出異常
}
小結
幂等性 Producer 和事務型 Producer 都是 Kafka 社群力圖為 Kafka 實作精确一次處理語義所提供的工具,隻是它們的作用範圍是不同的。幂等性 Producer 隻能保證單分區、單會話上的消息幂等性;而事務能夠保證跨分區、跨會話間的幂等性。從傳遞語義上來看,自然是事務型 Producer 能做的更多。沒有免費的午餐,比起幂等性 Producer,事務型 Producer 的性能要更差,在實際使用過程中,我們需要仔細評估引入事務的開銷,切不可無腦地啟用事務。