天天看點

RabbitMQ的死信隊列機制一、死信隊列二、死信隊列的實作

一、死信隊列

死信指的是無法被消費的消息,由于消息TTL過期、隊列達到最大長度、消息被拒絕等原因,導緻隊列中一些消息無法被消費,這樣的消息如果沒有進行後續的處理,就會變成死信。為了保證消息資料不丢失,需要使用死信隊列機制。

二、死信隊列的實作

RabbitMQ的死信隊列機制一、死信隊列二、死信隊列的實作

1、消息

TTL

過期

普通隊列消費者

Consumer01

public class Consumer01 {

    //不同交換機
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    //死信交換機
    public static final String DEAD_EXCHANGE = "dead_exchange";

    //普通隊列
    public static final String NORMAL_QUEUE = "normal_queue";

    //死信隊列
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //聲明普通交換機和死信交換機
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //聲明普通隊列
        Map<String, Object> arguments = new HashMap<>();
        //arguments.put("x-message-ttl","10000"); //設定TTL過期時間為10s
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);  //給普通隊列設定死信交換機
        arguments.put("x-dead-letter-routing-key", "lisi");  //設定死信交換機的routingKey
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        //聲明死信隊列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        //綁定普通隊列和普通交換機
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        //綁定死信隊列和死信交換機
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        System.out.println("Consumer01等待接收消息......");

        //聲明接收消息回調函數 和 取消消息消費時的回調函數
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("Consumer01接收消息:" + msg);
        };

        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("取消消息消費");
        };

        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback);
    }
}
           

死信隊列消費者

Consumer02

public class Consumer02 {

    //死信隊列
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        System.out.println("Consumer02等待接收消息......");

        //聲明接收消息回調函數 和 取消消息消費時的回調函數
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("Consumer02接收消息:" + msg);
        };

        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("取消消息消費");
        };

        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);
    }
}
           

生産者設定

TTL

消息過期時間為10s

public class Producer {
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //設定TTL消息過期時間
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for (int i = 0; i <= 10; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes("UTF-8"));
        }
    }
}
           

在普通隊列挂掉了的情況下,由于設定TTL為10s,逾時之後消息會通過死信交換機被發送到死信隊列

RabbitMQ的死信隊列機制一、死信隊列二、死信隊列的實作

死信隊列消費者會接收了死信隊列的消息

RabbitMQ的死信隊列機制一、死信隊列二、死信隊列的實作

2、隊列達到最大長度

普通隊列長度設定為5,超過最大長度的消息會被放到死信隊列中

RabbitMQ的死信隊列機制一、死信隊列二、死信隊列的實作

生産者一共發送了11條消息,5條消息在普通隊列,6條消息在死信隊列

RabbitMQ的死信隊列機制一、死信隊列二、死信隊列的實作

3、消息被拒絕

如果普通消費者拒絕掉普通隊列裡面的消息,消息也會被放到死信隊列中

RabbitMQ的死信隊列機制一、死信隊列二、死信隊列的實作

普通消費者拒絕的消息被轉發到死信隊列

RabbitMQ的死信隊列機制一、死信隊列二、死信隊列的實作
RabbitMQ的死信隊列機制一、死信隊列二、死信隊列的實作

拒絕的消息被死信消費者接收

RabbitMQ的死信隊列機制一、死信隊列二、死信隊列的實作