为了保证秒杀下单RocketMQ不丢失消息,可以采用以下方法:
- 消息持久化:在发送消息时,将消息标记为持久化,确保即使RocketMQ宕机,消息也不会丢失。
- 消息确认机制:在消费者消费消息后,向RocketMQ发送确认消息,告诉RocketMQ消息已经被消费。如果RocketMQ没有收到确认消息,它会认为消息没有被正确处理,并将其重新发送给其他消费者。
下面是一个简单的Java代码示例,演示如何在发送消息时将消息标记为持久化,并在消费消息后发送确认消息: 发送消息:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.producer.SendResult;
import java.nio.charset.StandardCharsets;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建生产者
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 指定NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动生产者
producer.start();
// 创建消息
Message message = new Message("seckill_order", "Hello, World!".getBytes(StandardCharsets.UTF_8));
// 将消息标记为持久化
message.setDelayTimeLevel(3);
// 发送消息
SendResult result = producer.send(message);
System.out.printf("Send message success. Message ID is: %s%n", result.getMsgId());
// 关闭生产者
producer.shutdown();
}
}
消费消息:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 指定NameServer地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅主题和标签
consumer.subscribe("seckill_order", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
// 消费消息
for (MessageExt message : messages) {
System.out.printf("Received message: %s%n", new String(message.getBody()));
}
// 发送确认消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
上述代码示例使用RocketMQ作为MQ实现。在发送消息时,我们使用 message.setDelayTimeLevel(3) 将消息标记为持久化。在消费消息后,我们使用 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 发送确认消息,告诉RocketMQ消息已经被消费。