RabbitMQ 高级特性 - 延迟队列
- 延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
- 需求:
- 下单后,30分钟未支付,取消订单,回滚库存。
- 新用户注册成功7天后,发送短信问候。
- 实现方式:
- 定时器
- 延迟队列
- 很可惜,在RabbitMQ中并未提供延迟队列功能。
- 但是可以使用:
组合实现延迟队列的效果。TTL+死信队列
- 步骤(基于RabbitMQ 高级特性 - 消息的可靠性投递)
- 配置文件(
)spring-rabbitmq-producer.xml
- 定义正常交换机(
)和队列(order_exchange
)order_queue
<rabbit:queue id="order_queue" name="order_queue"> </rabbit:queue> <rabbit:topic-exchange name="order_exchange"> <rabbit:bindings> <rabbit:binding pattern="order.#" queue="order_queue"/> </rabbit:bindings> </rabbit:topic-exchange>
- 定义死信交换机(
)和队列(order_exchange_dlx
)order_queue_dlx
<!--2. 定义死信交换机和队列--> <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"/> <rabbit:topic-exchange name="order_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"/> </rabbit:bindings> </rabbit:topic-exchange>
- 设置正常队列过期时间为30分钟(
)和绑定死信交换机和队列正常的队列order_queue内
<rabbit:queue id="order_queue" name="order_queue"> <rabbit:queue-arguments> <!--3. 设置正常队列过期时间为30分钟--> <entry key="x-message-ttl" value="1000*60*30" value-type="java.lang.Integer"/> <!--4. 绑定死信交换机和队列--> <entry key="x-dead-letter-exchange" value="order_exchange_dlx"/> <entry key="x-dead-letter-routing-key" value="dlx.order.xixi"/> </rabbit:queue-arguments> </rabbit:queue>
- 为了先把交换机和队列创建出来随便执行以前的代码(为了减少查看时间,过期时间设置10秒)
- 准备发送的消息
@Test public void testDelay() throws InterruptedException { //1. 发送订单消息,将来是在订单系统中,下单成功后,发送消息 rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2021年4月5日01点42分"); //2.为了方便证明,打印倒计时 for (int i = 10; i > 0; i--) { System.out.println(i+"..."); Thread.sleep(1000); } }
- 准备消费者(基于RabbitMQ 高级特性 Consumer Ack)
- 创建监听类
@Component public class OrderListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //1. 接受消息 System.out.println(new String(message.getBody(), StandardCharsets.UTF_8)); //2. 处理业务逻辑 打印一句话代表 System.out.println("处理业务逻辑"); System.out.println("根据订单id查询其状态..."); System.out.println("判断状态是否为支付成功"); System.out.println("如果没有支付,取消订单,回滚库存...."); //3. 手动签收 channel.basicAck(deliveryTag,true); } catch (IOException e) { System.out.println("出现异常,拒绝签收"); //4. 拒绝签收 channel.basicNack(deliveryTag,true,false); } } }
- 绑定监听死信队列(
)spring-rabbitmq-consumer.xml
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"/> </rabbit:listener-container>
- 创建监听类
- 执行结果(消费者先开启,然后开启生产者)
- 正常队列一开始存在
- 10秒(
)后消失我们将30分钟改了
- 执行死信
- 配置文件(
- 小结
- 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
- RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。