一、死信隊列
死信指的是無法被消費的消息,由于消息TTL過期、隊列達到最大長度、消息被拒絕等原因,導緻隊列中一些消息無法被消費,這樣的消息如果沒有進行後續的處理,就會變成死信。為了保證消息資料不丢失,需要使用死信隊列機制。
二、死信隊列的實作
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL5lFRNBza65UNNpHW4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL5YzNyETM0gDM0IjNwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
1、消息 TTL
過期
TTL
普通隊列消費者
Consumer01
public class Consumer01 {
//不同交換機
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交換機
public static final String DEAD_EXCHANGE = "dead_exchange";
//普通隊列
public static final String NORMAL_QUEUE = "normal_queue";
//死信隊列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//聲明普通交換機和死信交換機
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//聲明普通隊列
Map<String, Object> arguments = new HashMap<>();
//arguments.put("x-message-ttl","10000"); //設定TTL過期時間為10s
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //給普通隊列設定死信交換機
arguments.put("x-dead-letter-routing-key", "lisi"); //設定死信交換機的routingKey
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
//聲明死信隊列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
//綁定普通隊列和普通交換機
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
//綁定死信隊列和死信交換機
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
System.out.println("Consumer01等待接收消息......");
//聲明接收消息回調函數 和 取消消息消費時的回調函數
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("Consumer01接收消息:" + msg);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("取消消息消費");
};
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback);
}
}
死信隊列消費者
Consumer02
public class Consumer02 {
//死信隊列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("Consumer02等待接收消息......");
//聲明接收消息回調函數 和 取消消息消費時的回調函數
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("Consumer02接收消息:" + msg);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("取消消息消費");
};
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);
}
}
生産者設定
TTL
消息過期時間為10s
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//設定TTL消息過期時間
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 0; i <= 10; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes("UTF-8"));
}
}
}
在普通隊列挂掉了的情況下,由于設定TTL為10s,逾時之後消息會通過死信交換機被發送到死信隊列
死信隊列消費者會接收了死信隊列的消息
2、隊列達到最大長度
普通隊列長度設定為5,超過最大長度的消息會被放到死信隊列中
生産者一共發送了11條消息,5條消息在普通隊列,6條消息在死信隊列
3、消息被拒絕
如果普通消費者拒絕掉普通隊列裡面的消息,消息也會被放到死信隊列中
普通消費者拒絕的消息被轉發到死信隊列
拒絕的消息被死信消費者接收