天天看点

阿里云Kafka幂等生产者与事务生产者

原理介绍

所谓的消息交付可靠性保障,是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:
  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

其中at most once 和 at least once在发送端通过是否接收发送的结果来实现。

对于exactly once的情况,目前主要通过幂等性和事务实现。

创建Topic

版本要求:开源版本为2.2.0的专业版实例、Local存储
阿里云Kafka幂等生产者与事务生产者

幂等性 Producer

指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture)
// 幂等参数设置
  props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 请求的最长等待时间
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3 * 1000);
        // 构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
        // 如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // 设置创建的Kafka
        String topic = "Local_Topic_Demo"; //消息所属的Topic,请在控制台申请之后,填写在这里
        
        try{
            ProducerRecord<String,String> record1 = new ProducerRecord<>(topic,"msg1");
            Future future1 = producer.send(record1);
            future1.get();//不关心是否发送成功,则不需要这行

            ProducerRecord<String,String> record2 = new ProducerRecord<>(topic,"msg2");
            producer.send(record2);
            Future future2 = producer.send(record1);
            future2.get();//不关心是否发送成功,则不需要这行

            ProducerRecord<String,String> record3 = new ProducerRecord<>(topic,"msg3");
            producer.send(record3);
            Future future3 = producer.send(record1);
            future3.get();//不关心是否发送成功,则不需要这行

        } catch(Exception e) {
            e.printStackTrace();//连接错误、No Leader错误都可以通过重试解决;消息太大这类错误kafkaProducer不会进行任何重试,直接抛出异常
        }           
当前的幂等性只能保证单分区上,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。

事务性Producer

可以通过事务(transaction)或者依赖事务型 Producer,实现多分区以及多会话上的消息无重复,这也是幂等性 Producer 和事务型 Producer 的最大区别!

设置事务型 Producer 的方法:

  • 和幂等性 Producer 一样,开启 enable.idempotence = true。
  • 设置 Producer 端参数 transactional. id。最好为其设置一个有意义的名字。
// 幂等参数设置
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // 事务支持设置
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"taro_transaction_id");
        // Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 请求的最长等待时间
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3 * 1000);
        // 构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
        // 如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        //消息所属的Topic,请在控制台申请之后,填写在这里
        String topic = "Local_Topic_Demo"; 

        // 开启事务
        producer.initTransactions();
        producer.beginTransaction();
        try{
            ProducerRecord<String,String> record1 = new ProducerRecord<>(topic,"msg1");
            producer.send(record1);

            ProducerRecord<String,String> record2 = new ProducerRecord<>(topic,"msg2");
            producer.send(record2);

            ProducerRecord<String,String> record3 = new ProducerRecord<>(topic,"msg3");
            producer.send(record3);

            producer.commitTransaction();
        } catch(Exception e) {
            producer.abortTransaction();
            e.printStackTrace();//连接错误、No Leader错误都可以通过重试解决;消息太大这类错误kafkaProducer不会进行任何重试,直接抛出异常
        }           

小结

幂等性 Producer 和事务型 Producer 都是 Kafka 社区力图为 Kafka 实现精确一次处理语义所提供的工具,只是它们的作用范围是不同的。幂等性 Producer 只能保证单分区、单会话上的消息幂等性;而事务能够保证跨分区、跨会话间的幂等性。从交付语义上来看,自然是事务型 Producer 能做的更多。没有免费的午餐,比起幂等性 Producer,事务型 Producer 的性能要更差,在实际使用过程中,我们需要仔细评估引入事务的开销,切不可无脑地启用事务。

参考链接

Kafka消息发送的三种模式 幂等生产者和事务生产者是一回事吗?

继续阅读