RabbitMQ 進階特性 - 延遲隊列
- 延遲隊列,即消息進入隊列後不會立即被消費,隻有到達指定時間後,才會被消費。
- 需求:
- 下單後,30分鐘未支付,取消訂單,復原庫存。
- 新使用者注冊成功7天後,發送短信問候。
- 實作方式:
- 定時器
- 延遲隊列
- 很可惜,在RabbitMQ中并未提供延遲隊列功能。
- 但是可以使用:
組合實作延遲隊列的效果。TTL+死信隊列
- 步驟(基于RabbitMQ 進階特性 - 消息的可靠性投遞)
- 配置檔案(
)spring-rabbitmq-producer.xml
- 定義正常交換機(
)和隊列(order_exchange
)order_queue
<rabbit:queue id="order_queue" name="order_queue"> </rabbit:queue> <rabbit:topic-exchange name="order_exchange"> <rabbit:bindings> <rabbit:binding pattern="order.#" queue="order_queue"/> </rabbit:bindings> </rabbit:topic-exchange>
- 定義死信交換機(
)和隊列(order_exchange_dlx
)order_queue_dlx
<!--2. 定義死信交換機和隊列--> <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"/> <rabbit:topic-exchange name="order_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"/> </rabbit:bindings> </rabbit:topic-exchange>
- 設定正常隊列過期時間為30分鐘(
)和綁定死信交換機和隊列正常的隊列order_queue内
<rabbit:queue id="order_queue" name="order_queue"> <rabbit:queue-arguments> <!--3. 設定正常隊列過期時間為30分鐘--> <entry key="x-message-ttl" value="1000*60*30" value-type="java.lang.Integer"/> <!--4. 綁定死信交換機和隊列--> <entry key="x-dead-letter-exchange" value="order_exchange_dlx"/> <entry key="x-dead-letter-routing-key" value="dlx.order.xixi"/> </rabbit:queue-arguments> </rabbit:queue>
- 為了先把交換機和隊列建立出來随便執行以前的代碼(為了減少檢視時間,過期時間設定10秒)
- 準備發送的消息
@Test public void testDelay() throws InterruptedException { //1. 發送訂單消息,将來是在訂單系統中,下單成功後,發送消息 rabbitTemplate.convertAndSend("order_exchange","order.msg","訂單資訊:id=1,time=2021年4月5日01點42分"); //2.為了友善證明,列印倒計時 for (int i = 10; i > 0; i--) { System.out.println(i+"..."); Thread.sleep(1000); } }
- 準備消費者(基于RabbitMQ 進階特性 Consumer Ack)
- 建立監聽類
@Component public class OrderListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //1. 接受消息 System.out.println(new String(message.getBody(), StandardCharsets.UTF_8)); //2. 處理業務邏輯 列印一句話代表 System.out.println("處理業務邏輯"); System.out.println("根據訂單id查詢其狀态..."); System.out.println("判斷狀态是否為支付成功"); System.out.println("如果沒有支付,取消訂單,復原庫存...."); //3. 手動簽收 channel.basicAck(deliveryTag,true); } catch (IOException e) { System.out.println("出現異常,拒絕簽收"); //4. 拒絕簽收 channel.basicNack(deliveryTag,true,false); } } }
- 綁定監聽死信隊列(
)spring-rabbitmq-consumer.xml
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"/> </rabbit:listener-container>
- 建立監聽類
- 執行結果(消費者先開啟,然後開啟生産者)
- 正常隊列一開始存在
- 10秒(
)後消失我們将30分鐘改了
- 執行死信
- 配置檔案(
- 小結
- 延遲隊列 指消息進入隊列後,可以被延遲一定時間,再進行消費。
- RabbitMQ沒有提供延遲隊列功能,但是可以使用 : TTL + DLX 來實作延遲隊列效果。