天天看點

【RabbitMq04】延遲隊列

一、延遲隊列了解

1、延遲隊列概念

延時隊列,隊列内部是有序的,最重要的特性就展現在它的延時屬性上,延時隊列中的元素是希望在指定時間到了以後或之前取出和處理,簡單來說,延時隊列就是用來存放需要在指定時間被處理的元素的隊列

延遲隊列,就是死信隊列中,TTL消息過期的一種

舉例

如死信隊列文章中所講,消費者1啟動後,在rabbitmq控制台生成正常隊列和死信隊列後就又關閉了,然後開啟生産者給消費者1發送消息,因為消費者1中設定了TTL消息過期機制,是以等了10秒種消息就自動進入到死信隊列了,消息被消費者2接收,這對于生産者和消費者2來說,就是延遲了10秒鐘。

2、延遲隊列使用場景

1.訂單在十分鐘之内未支付則自動取消

2.新建立的店鋪,如果在十天内都沒有上傳過商品,則自動發送消息提醒。

3.使用者注冊成功後,如果三天内沒有登陸則進行短信提醒。

4.使用者發起退款,如果三天内沒有得到處理則通知相關營運人員。

5.預定會議後,需要在預定的時間點前十分鐘通知各個與會人員參加會議

6.未付款訂單,超過30分鐘未付款,自動取消訂單并釋放庫存

這些場景都有一個特點,需要在某個事件發生之後或者之前的指定時間點完成某一項任務,如:發生訂單生成事件,在十分鐘之後檢查該訂單支付狀态,然後将未支付的訂單進行關閉;看起來似乎使用定時任務,一直輪詢資料,每秒查一次,取出需要被處理的資料,然後處理不就完事了嗎?如果資料量比較少,确實可以這樣做,比如:對于“如果賬單一周内未支付則進行自動結算”這樣的需求,如果對于時間不是嚴格限制,而是寬松意義上的一周,那麼每天晚上跑個定時任務檢查一下所有未支付的賬單,确實也是一個可行的方案。但對于資料量比較大,并且時效性較強的場景,如:“訂單十分鐘内未支付則關閉“,短期内未支付的訂單資料可能會有很多,活動期間甚至會達到百萬甚至千萬級别,對這麼龐大的資料量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒内無法完成所有訂單的檢查,同時會給資料庫帶來很大壓力,無法滿足業務要求而且性能低下。

【RabbitMq04】延遲隊列

3、RabbitMQ 中的 TTL

TTL 是什麼呢?TTL 是 RabbitMQ 中一個消息或者隊列的屬性,表明一條消息或者該隊列中的所有消息的最大存活時間,機關是毫秒。換句話說,如果一條消息設定了 TTL 屬性或者進入了設定 TTL 屬性的隊列,那麼這條消息如果在 TTL 設定的時間内沒有被消費,則會成為"死信"。如果同時配置了隊列的 TTL 和消息的TTL,那麼較小的那個值将會被使用,有兩種方式設定 TTL。

消息設定 TTL

針對每條消息設定 TTL

【RabbitMq04】延遲隊列
AMQP.BasicProperties properties = new AMQP.BasicProperties()
                .builder().expiration("10000")
                .build();
           

隊列設定 TTL

建立隊列的時候設定隊列的“x-message-ttl”屬性

【RabbitMq04】延遲隊列

兩者的差別

如果設定了隊列的 TTL 屬性,那麼一旦消息過期,就會被隊列丢棄(如果配置了死信隊列被丢到死信隊列中),而第二種方式,消息即使過期,也不一定會被馬上丢棄,因為消息是否過期是在即将投遞到消費者之前判定的,如果目前隊列有嚴重的消息積壓情況,則已過期的消息也許還能存活較長時間;另外,還需要注意的一點是,如果不設定 TTL,表示消息永遠不會過期,如果将 TTL 設定為 0,則表示除非此時可以直接投遞該消息到消費者,否則該消息将會被丢棄。

前一小節我們介紹了

死信隊列

,剛剛又介紹了

TTL

,至此利用 RabbitMQ

實作延時隊列

的兩大要素已經集齊,接下來隻需要将它們進行融合,再加入一點點調味料,延時隊列就可以新鮮出爐了。想想看,延時隊列,不就是想要消息延遲多久被處理嗎,TTL 則剛好能讓消息在延遲多久之後成為死信,另一方面,成為死信的消息都會被投遞到死信隊列裡,這樣隻需要消費者一直消費死信隊列裡的消息就完事了,因為裡面的消息都是希望被立即處理的消息

二、延遲隊列實踐演練

1、整合 springboot

導入依賴

<!--RabbitMQ 依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--可以将 Java 對象轉換為 JSON 格式,當然它也可以将 JSON 字元串轉換為 Java 對象-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--RabbitMQ 測試依賴-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
           

修改配置檔案

spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
           

添加 Swagger 配置類

package com.lian.rabbitmq.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@EnableSwagger2
@Configuration
public class SwaggerConfig {
    @Bean
    public Docket webApiConfig() {
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    }

    private ApiInfo webApiInfo() {
        return new ApiInfoBuilder()
                .title("rabbitmq 接口文檔")
                .description("本文檔描述了 rabbitmq 微服務接口定義")
                .version("1.0")
                .contact(new Contact("muzhen", "http://lian.com",
                        "[email protected]"))
                .build();
    }
}
           

2、隊列 TTL

a、代碼架構圖

建立兩個正常隊列 QA 和 QB,兩者隊列 TTL 分别設定為 10S 和 40S,然後在建立一個正常交換機 X 和死信交換機 Y,它們的類型都是 direct,建立一個死信隊列 QD,它們的綁定關系如下:

【RabbitMq04】延遲隊列

死信交換機 Y 用 路由key為YD的routingKey 和正常隊列AQ和QB 綁定

2個交換機:X 正常交換機 ,Y死信交換機

3個隊列:QA正常隊列、QB正常隊列、QD死信隊列

綁定關系:X正常交換機和QA\QB綁定,Y死信交換機和QD綁定

b、配置檔案類代碼

package com.lian.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TtlQueueConfig {

    //普通交換機的名稱
    public final static String X_EXCHANGE = "X";
    //死信交換機的名稱
    public final static String Y_DEAD_LETTER_EXCHANGE = "Y";
    //普通隊列的名稱
    public final static String QUEUE_A = "QA";
    public final static String QUEUE_B = "QB";
    //死信隊列的名稱
    public final static String DEAD_LETTER_QUEUE = "QD";

    //聲明交換機 xExchange 别名
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }
    //聲明交換機 yExchange 别名
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //聲明普通隊列 TTL 為10s
    @Bean("queueA")
    public Queue queueA(){
        //正常隊列綁定死信交換機資訊,因為正常隊列消息會變為死信
        Map<String, Object> arguments = new HashMap<>();
        //正常隊列設定死信交換機 參數 key 是固定值
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //正常隊列設定死信 routing-key 參數 key 是固定值
        arguments.put("x-dead-letter-routing-key", "YD");
        //設定過期時間 10s,機關是ms,可以在消費者正常隊列處設定,也可以在生産者發送處設定
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }
    //聲明普通隊列 TTL 為40s
    @Bean("queueB")
    public Queue queueB(){
        //正常隊列綁定死信交換機資訊,因為正常隊列消息會變為死信
        Map<String, Object> arguments = new HashMap<>();
        //正常隊列設定死信交換機 參數 key 是固定值
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //正常隊列設定死信 routing-key 參數 key 是固定值
        arguments.put("x-dead-letter-routing-key", "YD");
        //設定過期時間 40s,機關是ms,可以在消費者正常隊列處設定,也可以在生産者發送處設定
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }
    //聲明死信隊列
    @Bean("queueD")
    public Queue queueD(){
        //沒有參數
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    //五大元件完畢,開始互相綁定
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        //将 X 正常交換機和 QA 隊列 綁定
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        //将 X 正常交換機和 QB 隊列 綁定
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    @Bean
    public Binding queueABindingY(@Qualifier("queueD") Queue queueD,
                                  @Qualifier("yExchange") DirectExchange yExchange){
        //将 Y 死信交換機和 QD 死信隊列 綁定
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}
           

c、生産者

package com.lian.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 發送延遲消息
 */
@Slf4j
@RequestMapping("/ttl")
@RestController
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("目前時間:{},發送一條資訊給兩個 TTL 隊列:{}", new Date(), message);
        rabbitTemplate.convertAndSend("X","XA","消息來自ttl為10s的隊列"+message);
        rabbitTemplate.convertAndSend("X","XB","消息來自ttl為40s的隊列"+message);
    }
}
           

d、消費者

package com.lian.rabbitmq.config;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


import java.util.Date;

@Slf4j
@Component
public class DeadLetterQueueConsumer {

    @RabbitListener(queues = "QD")
    public void received(Message message, Channel channel) throws Exception{
        String msg = new String(message.getBody());
        log.info("目前時間:{},收到死信隊列資訊{}", new Date().toString(), msg);
    }

}
           

測試

發起一個請求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻

傳回結果

2021-07-29 19:20:09.641  INFO 15732 --- [nio-8080-exec-1] c.l.rabbitmq.config.SendMsgController    : 目前時間:Thu Jul 29 19:20:09 CST 2021,發送一條資訊給兩個 TTL 隊列:嘻嘻嘻
2021-07-29 19:20:19.893  INFO 15732 --- [ntContainer#0-1] c.l.r.config.DeadLetterQueueConsumer     : 目前時間:Thu Jul 29 19:20:19 CST 2021,收到死信隊列資訊消息來自ttl為10s的隊列嘻嘻嘻
2021-07-29 19:20:49.756  INFO 15732 --- [ntContainer#0-1] c.l.r.config.DeadLetterQueueConsumer     : 目前時間:Thu Jul 29 19:20:49 CST 2021,收到死信隊列資訊消息來自ttl為40s的隊列嘻嘻嘻
           

第一條消息在 10S 後變成了死信消息,然後被消費者消費掉,第二條消息在 40S 之後變成了死信消息,然後被消費掉,這樣一個延時隊列就打造完成了。不過,如果這樣使用的話,豈不是每增加一個新的時間需求,就要新增一個隊列,這裡隻有 10S 和 40S兩個時間選項,如果需要一個小時後處理,那麼就需要增加 TTL 為一個小時的隊列,如果是預定會議室然後提前通知這樣的場景,豈不是要增加無數個隊列才能滿足需求?

3、延時隊列優化

在這裡新增了一個隊列 QC,綁定關系如下,該隊列不設定 TTL 時間

【RabbitMq04】延遲隊列

a、消費者

package com.lian.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TtlQueueConfig2 {

    //普通交換機的名稱
    public final static String X_EXCHANGE = "X";
    //死信交換機的名稱
    public final static String Y_DEAD_LETTER_EXCHANGE = "Y";
    //普通隊列的名稱
    public final static String QUEUE_A = "QA";
    public final static String QUEUE_B = "QB";
    public final static String QUEUE_C = "QC";
    //死信隊列的名稱
    public final static String DEAD_LETTER_QUEUE = "QD";

    //聲明交換機 xExchange 别名
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }
    //聲明交換機 yExchange 别名
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //聲明普通隊列 TTL 為10s
    @Bean("queueA")
    public Queue queueA(){
        //正常隊列綁定死信交換機資訊,因為正常隊列消息會變為死信
        Map<String, Object> arguments = new HashMap<>();
        //正常隊列設定死信交換機 參數 key 是固定值
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //正常隊列設定死信 routing-key 參數 key 是固定值
        arguments.put("x-dead-letter-routing-key", "YD");
        //設定過期時間 10s,機關是ms,可以在消費者正常隊列處設定,也可以在生産者發送處設定
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }
    //聲明普通隊列 TTL 為40s
    @Bean("queueB")
    public Queue queueB(){
        //正常隊列綁定死信交換機資訊,因為正常隊列消息會變為死信
        Map<String, Object> arguments = new HashMap<>();
        //正常隊列設定死信交換機 參數 key 是固定值
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //正常隊列設定死信 routing-key 參數 key 是固定值
        arguments.put("x-dead-letter-routing-key", "YD");
        //設定過期時間 40s,機關是ms,可以在消費者正常隊列處設定,也可以在生産者發送處設定
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }
    //聲明普通隊列 TTL 為40s
    @Bean("queueC")
    public Queue queueC(){
        //正常隊列綁定死信交換機資訊,因為正常隊列消息會變為死信
        Map<String, Object> arguments = new HashMap<>();
        //正常隊列設定死信交換機 參數 key 是固定值
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //正常隊列設定死信 routing-key 參數 key 是固定值
        arguments.put("x-dead-letter-routing-key", "YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
    }
    //聲明死信隊列
    @Bean("queueD")
    public Queue queueD(){
        //沒有參數
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    //五大元件完畢,開始互相綁定
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        //将 X 正常交換機和 QA 隊列 綁定
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        //将 X 正常交換機和 QB 隊列 綁定
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        //将 X 正常交換機和 QB 隊列 綁定
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
    @Bean
    public Binding queueABindingY(@Qualifier("queueD") Queue queueD,
                                  @Qualifier("yExchange") DirectExchange yExchange){
        //将 Y 死信交換機和 QD 死信隊列 綁定
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}
           

b、生産者

package com.lian.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 發送延遲消息
 */
@Slf4j
@RequestMapping("/ttl")
@RestController
public class SendMsgController2 {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message, @PathVariable String ttlTime){
        log.info("目前時間:{},發送一條時長{}毫秒ttl資訊給隊列", new Date().toString(), message, ttlTime);
        rabbitTemplate.convertAndSend("X","XA","消息來自ttl為10s的隊列"+message);
        rabbitTemplate.convertAndSend("X","XC",message,msg -> {
            //發送消息的時候,延遲時長
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }
}
           

c、測試

發起請求

http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000

http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000

按道理将應該是發送2秒的隊列消息先被消費,可是 20秒的隊列卻先被消費了,2秒隊列并沒有按時死亡

【RabbitMq04】延遲隊列

看起來似乎沒什麼問題,但是在最開始的時候,就介紹過如果使用在消息屬性上設定 TTL 的方式,消息可能并不會按時“死亡“,因為 RabbitMQ 隻會檢查第一個消息是否過期,如果過期則丢到死信隊列,如果第一個消息的延時時長很長,而第二個消息的延時時長很短,第二個消息并不會優先得到執行。

4、Rabbitmq 插件實作延遲隊列

上文中提到的問題,确實是一個問題,如果不能實作在消息粒度上的 TTL,并使其在設定的 TTL 時間及時死亡,就無法設計成一個通用的延時隊列。那如何解決呢,接下來我們就去解決該問題。

安裝延時隊列插件

在官網上下載下傳 https://www.rabbitmq.com/community-plugins.html,下載下傳

rabbitmq_delayed_message_exchange 插件,然後解壓放置到 RabbitMQ 的插件目錄。進入 RabbitMQ 的安裝目錄下的 plgins 目錄,執行下面指令讓該插件生效,然後重新開機 RabbitMQ

/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
           

配置檔案類代碼

在我們自定義的交換機中,這是一種新的交換類型,該類型消息支援延遲投遞機制 消息傳遞後并不會立即投遞到目标隊列中,而是存儲在 mnesia(一個分布式資料系統)表中,當達到投遞時間時,才投遞到目标隊列中

配置類

@Configuration
public class DelayedQueueConfig {
 public static final String DELAYED_QUEUE_NAME = "delayed.queue";
 public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
 public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
 @Bean
 public Queue delayedQueue() {
 return new Queue(DELAYED_QUEUE_NAME);
 }
 //自定義交換機 我們在這裡定義的是一個延遲交換機
 @Bean
 public CustomExchange delayedExchange() {
 Map<String, Object> args = new HashMap<>();
 //自定義交換機的類型
 args.put("x-delayed-type", "direct");
 return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, 
args);
 }
 @Bean
 public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
 @Qualifier("delayedExchange") CustomExchange 
delayedExchange) {
 return 
BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
 } }
           

消息生産者

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {
 rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, 
correlationData ->{
 correlationData.getMessageProperties().setDelay(delayTime);
 return correlationData;
 });
 log.info(" 當 前 時 間 : {}, 發送一條延遲 {} 毫秒的資訊給隊列 delayed.queue:{}", new 
Date(),delayTime, message);
}
           

消息消費者

public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){
 String msg = new String(message.getBody());
 log.info("目前時間:{},收到延時隊列的消息:{}", new Date().toString(), msg);
}
           

發起請求:

http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000

http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000

【RabbitMq04】延遲隊列

第二個消息被先消費掉了,符合預期

總結

延時隊列在需要延時處理的場景下非常有用,使用 RabbitMQ 來實作延時隊列可以很好的利用

RabbitMQ 的特性,如:消息可靠發送、消息可靠投遞、死信隊列來保障消息至少被消費一次以及未被正确處理的消息不會被丢棄。另外,通過 RabbitMQ 叢集的特性,可以很好的解決單點故障問題,不會因為單個節點挂掉導緻延時隊列不可用或者消息丢失。

當然,延時隊列還有很多其它選擇,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz

或者利用 kafka 的時間輪,這些方式各有特點,看需要适用的場景