我們經常使用消息隊列進行系統之間的解耦,日志記錄等等。但是有時候我們在使用 RabbitMQ時,由于exchange、bindKey、routingKey沒有設定正确,導緻我們發送給交換器(exchange)的消息,由于沒有正确的RoutingKey可能會存在一個消息丢失的情況,如果我們希望知道那些消息經過exchange之後,沒有被正确的存入消息隊列,那麼應該如何進行處理。
方案一:使用 mandatory 參數配合 ReturnListener 來進行解決
方案二:使用備份交換器 (alternate exchange) 來進行解決
方案一介紹:
mandatory參數的含義:
true:表示當交換器無法根據自身的類型和路由鍵找到一個符合條件的隊列時,那麼RabbitMQ會調用 Basic.Return 指令将消息傳回給生産者。生産者使用ReturnListener 來監聽沒有被正确路由到消息隊列中的消息。
false:表示當交換器無法根據自身的類型和路由鍵找到一個服務條件的隊列時,那麼RabbitMQ會丢棄這個消息。
注意事項:
1、有時候發現即使 mandatory參數設定成 true,也沒有進入 ReturnListener,那麼這個可能是什麼原因呢?其實這個可能是受RabbitMQ配置的記憶體和磁盤告警限制。(http://www.rabbitmq.com/alarms.html)
2、這是一個RabbitMQ配置的磁盤告警導緻沒有進入ReturnListener的例子。(http://rabbitmq.1065348.n5.nabble.com/ReturnListener-is-not-invoked-td24549.html)
示例代碼:
/**
* RabbitMQ 生産者
* <pre>
* 1、ReturnListener 的使用。
* >> mandatory: 參數需要設定成 true , ReturnListener 才會生效。
* >> 用于擷取到沒有路由到消息隊列中的消息。
* 2、ReturnListener 的注意事項 http://www.rabbitmq.com/alarms.html
* >> 受到記憶體和磁盤的限制
* >> http://rabbitmq.1065348.n5.nabble.com/ReturnListener-is-not-invoked-td24549.html(一個RabbitMQ disk_free_limit 參數導緻ReturnListener沒有進入的例子)
*
* </pre>
*
* @author huan.fu
* @date 2018/8/21 - 15:23
*/
public class RabbitProducer {
private static final String EXCHANGE_NAME = "exchange_demo";
private static final String ROUTING_KEY = "missing_routing_key";
private static final String BINDING_KEY = "bingkey_demo";
private static final String QUEUE_NAME = "queue_demo";
private static final String IP_ADDRESS = "140.143.237.224";
private static final int PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(IP_ADDRESS);
connectionFactory.setPort(PORT);
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
try (
// 建立一個連接配接
Connection connection = connectionFactory.newConnection();
// 建立信道
Channel channel = connection.createChannel()
) {
// 建立一個 type="direct"持久化、非自動删除的交換器
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
// 建立一個 持久化、非排他的、非自動删除的交換器
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 将交換器與隊列通過路由鍵綁定 使用 bindingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, BINDING_KEY);
// 發送一條持久化消息
String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 沒有被正确路由到消息隊列的消息.mandatory參數設定成true";
try {
// 使用 routingKey
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
System.err.println("消息發送完成......");
} catch (IOException e) {
e.printStackTrace();
}
/**
* 處理生産者沒有正确路由到消息隊列的消息
* 這個可能不會生效:受到 rabbitmq 配置的記憶體和磁盤的限制 {@link http://www.rabbitmq.com/alarms.html}
*/
channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
System.out.println("replyCode:" + replyCode);
System.out.println("replyText:" + replyText);
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body, StandardCharsets.UTF_8));
});
}
}
}
方案二介紹:
使用方案一,我們需要自己寫ReturnListener,這樣業務代碼就變的複雜了,那麼有沒有一種簡單的方法呢?那就是使用 備份交換器(Alternate Exchange)
聲明交換器可以在channel.exchangeDeclare的時候 添加 alternate-exchange 參數來實作,交換器的類型建議聲明成 fanout 類型,因為消息被重新發送到備份交換器時的路由鍵和從生産者出發的路由鍵是一緻的。
示例代碼:
public class RabbitProducer {
private static final String EXCHANGE_NAME = "exchange_demo";
private static final String BINDING_KEY = "bingkey_demo";
private static final String QUEUE_NAME = "queue_demo";
private static final String IP_ADDRESS = "140.143.237.224";
private static final int PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(IP_ADDRESS);
connectionFactory.setPort(PORT);
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
try (
// 建立一個連接配接
Connection connection = connectionFactory.newConnection();
// 建立信道
Channel channel = connection.createChannel()
) {
Map<String, Object> arguments = new HashMap<>(16);
arguments.put("alternate-exchange", "backup-exchange");
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, arguments);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, BINDING_KEY);
// 聲明一個 fanout 類型的交換器,建議此處使用 fanout 類型的交換器
channel.exchangeDeclare("backup-exchange", "fanout", true, false, null);
// 消息沒有被路由的之後存入的隊列
channel.queueDeclare("unRoutingQueue", true, false, false, null);
channel.queueBind("unRoutingQueue", "backup-exchange", "");
// 發送一條持久化消息
String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 沒有被正确的路由到消息隊列,此時此消息會進入 unRoutingQueue";
try {
// 使用 routingKey
channel.basicPublish(EXCHANGE_NAME, "not-exists-routing-key", true, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
System.err.println("消息發送完成......");
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
上例圖解: