天天看点

RocketMQ如何保证不丢失消息

作者:技术全栈12333

为了保证秒杀下单RocketMQ不丢失消息,可以采用以下方法:

  1. 消息持久化:在发送消息时,将消息标记为持久化,确保即使RocketMQ宕机,消息也不会丢失。
  2. 消息确认机制:在消费者消费消息后,向RocketMQ发送确认消息,告诉RocketMQ消息已经被消费。如果RocketMQ没有收到确认消息,它会认为消息没有被正确处理,并将其重新发送给其他消费者。

下面是一个简单的Java代码示例,演示如何在发送消息时将消息标记为持久化,并在消费消息后发送确认消息: 发送消息:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.producer.SendResult;
import java.nio.charset.StandardCharsets;
public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        // 指定NameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动生产者
        producer.start();
        // 创建消息
        Message message = new Message("seckill_order", "Hello, World!".getBytes(StandardCharsets.UTF_8));
        // 将消息标记为持久化
        message.setDelayTimeLevel(3);
        // 发送消息
        SendResult result = producer.send(message);
        System.out.printf("Send message success. Message ID is: %s%n", result.getMsgId());
        // 关闭生产者
        producer.shutdown();
    }
}           

消费消息:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
        // 指定NameServer地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 订阅主题和标签
        consumer.subscribe("seckill_order", "*");
        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                // 消费消息
                for (MessageExt message : messages) {
                    System.out.printf("Received message: %s%n", new String(message.getBody()));
                }
                // 发送确认消息
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
    }
}           

上述代码示例使用RocketMQ作为MQ实现。在发送消息时,我们使用 message.setDelayTimeLevel(3) 将消息标记为持久化。在消费消息后,我们使用 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 发送确认消息,告诉RocketMQ消息已经被消费。

RocketMQ如何保证不丢失消息

继续阅读