天天看點

RabbitMQ學習筆記 - 備份交換器

參考:<<RabbitMQ實戰指南>>

備份交換器(Alternate Exchange,簡稱AE),實際上和普通交換器沒有多大差別,同樣可以用來處理未被路由的消息。上一篇文章使用mandatory參數來解決,但是生産者代碼邏輯變得複雜,是以我們可以使用備份交換器将這些未被路由的消息存儲起來,之後有需要的時候再去處理。

實作方法:在聲明交換器的時候添加alternate-exchange參數。

1. 用戶端api

exchangeDeclare有多個重載方法,這些重載方法是由下面這個方法中預設的某些參數構成:

Exchange.DeclareOk exchangeDeclare(String exchange,
        BuiltinExchangeType type,
        boolean durable,
        boolean autoDelete,
        boolean internal,
        Map<String, Object> arguments) throws IOException;
           

方法傳回Exchange.DeclareOK,用來辨別成功聲明了一個交換器。各個參數詳細說明如下:

  • exchange:交換器名稱
  • type:交換器類型。BuiltinExchangeType枚舉類,有以下4中類型交換器:DIRECT(“direct”), FANOUT(“fanout”), TOPIC(“topic”), HEADERS(“headers”)
  • durable:設定是否持久化。true:持久化,false:非持久化。持久化可以将交換器存盤,在伺服器重新開機時不會丢失相關消息。
  • autoDelete:設定是否自動删除。true:自動删除,false:不自動删除。自動删除的前提是至少有一個隊列或交換器與這個交換器綁定,之後所有與這個交換器綁定的隊列或交換器都與此交換器解綁。
  • internal:設定是否内置的。true:内置交換器,false:非内置交換器。内置交換器,用戶端無法直接發送消息到這個交換器中,隻能通過交換器路由到交換器這種方式。
  • arguments:其他一些結構化參數。如備份交換器:alternate-exchange

2. 示例

2.1 原生api

聲明交換器時,通過alternate-exchange指定備份交換器。備份交換器建議設定成fanout類型,也可以設定成direct或topic的類型。

不過需要注意:消息被重新路由到備份交換器時的路由鍵和從生産者發出的路由鍵是一樣的。

// ...
// 聲明交換器
String exchangeName = "direct.exchange.test.ae-normal";
String queueName = "direct.queue.test.ae-normal";
String routingKey = "direct.routing-key.test.ae-normal";

String aeExchangeName = "fanout.exchange.test.ae";
String aeQueueName = "fanout.queue.test.ae";

// 聲明普通交換器,指定備份交換器,綁定隊列
Map<String, Object> args = new HashMap<>();
args.put("alternate-exchange", aeExchangeName);
AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclare(exchangeName, "direct", true, false, args);
String queue = channel.queueDeclare(queueName, true, false, false, null).getQueue();
channel.queueBind(queue, exchangeName, routingKey);

// 聲明備份交換器、綁定隊列
AMQP.Exchange.DeclareOk fanoutDeclareOK = channel.exchangeDeclare(aeExchangeName, "fanout", true, false, null);
String aeQueue = channel.queueDeclare(aeQueueName, true, false, false, null).getQueue();
channel.queueBind(aeQueue, aeExchangeName, "");

// 正常路由,進入隊列
byte[] content = ("Test Msg " + System.currentTimeMillis()).getBytes("UTF-8");
channel.basicPublish(exchangeName, routingKey, false, null, content);

// 不可路由,進入備份交換器
byte[] content2 = ("Test Msg2 " + System.currentTimeMillis()).getBytes("UTF-8");
channel.basicPublish(exchangeName, routingKey + "2", false, null, content2);
           

2.2 springboot

(1)添加rabbitmq的starter

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
           

(2)application.yml

spring:
  rabbitmq:
    host: dev.tss.com
    port: 5672
    username: admin
    password: njittss

rabbitmq:
  test:
    ae:
      normalExchange: direct.exchange.test.ae-normal
      normalQueue: direct.queue.test.ae-normal
      normalRoutingKey: direct.routing-key.test.ae-normal
      aeExchange: fanout.exchange.test.ae
      aeQueue: fanout.queue.test.ae
           

(3)聲明交換器、隊列,設定備份交換器

和原生api一樣,在聲明普通交換器時,通過alternate-exchange設定備份交換器。

@Configuration
public class RabbitAlternateExchangeConfig {

    @Value("${rabbitmq.test.ae.normalExchange}")
    private String normalExchangeName;
    @Value("${rabbitmq.test.ae.normalQueue}")
    private String normalQueueName;
    @Value("${rabbitmq.test.ae.normalRoutingKey}")
    private String normalRoutingKey;

    @Value("${rabbitmq.test.ae.aeExchange}")
    private String aeExchangeName;
    @Value("${rabbitmq.test.ae.aeQueue}")
    private String aeQueueName;

    @Bean
    public DirectExchange aeNormalExchange() {
        // 配置備份交換器,交換器類型為Fanout
        Map<String, Object> args = new HashMap<>(4);
        args.put("alternate-exchange", aeExchangeName);
        return new DirectExchange(normalExchangeName, true, false, args);
    }
    @Bean
    public Queue aeNormalQueue() {
        return new Queue(normalQueueName, true);
    }
    @Bean
    Binding bindingAeNormalQueue(Queue aeNormalQueue, DirectExchange aeNormalExchange) {
        return BindingBuilder.bind(aeNormalQueue).to(aeNormalExchange).with(normalRoutingKey);
    }


    @Bean
    public FanoutExchange aeExchange() {
        return new FanoutExchange(aeExchangeName);
    }
    @Bean
    public Queue aeQueue() {
        return new Queue(aeQueueName, true);
    }
    @Bean
    Binding bindingAeQueue(Queue aeQueue, FanoutExchange aeExchange) {
        return BindingBuilder.bind(aeQueue).to(aeExchange);
    }
}
           

(4)測試發送可以正常路由、不可路由消息

@Value("${rabbitmq.test.ae.normalExchange}")
private String normalExchangeName;
@Value("${rabbitmq.test.ae.normalRoutingKey}")
private String normalRoutingKey;

@Autowired
private RabbitTemplate rabbitTemplate;
// 測試發送可以正常路由資訊
public boolean sendNormalMessage() {
    String message = "test normal message";
    this.rabbitTemplate.convertAndSend(normalExchangeName, normalRoutingKey, message);
    return true;
}
// 測試發送不可路由資訊
public boolean sendAbnormalMessage() {
    String message = "test abnormal message";
    this.rabbitTemplate.convertAndSend(normalExchangeName, normalRoutingKey + "2", message);
    return true;
}
           

觀察rabbitmq web控制台,可以正常路由的消息已經進入隊列(direct.queue.test.ae-normal),不可路由的消息已經轉發到備份交換器,路由到與之綁定的隊列上(fanout.queue.test.ae)

RabbitMQ學習筆記 - 備份交換器
RabbitMQ學習筆記 - 備份交換器

3.備注

(1)假如備份交換器的類型被設定為direct,與其綁定的隊列的路由鍵是key1,當某條路由鍵為key2的消息被轉發到這個備份交換器的時候,備份交換器沒有比對到合适的隊列,則消息丢失;如果消息的路由鍵是key1則可以存儲到隊列中。

建議将備份交換器設定為fanout類型。

(2)對于備份交換器,有以下幾種特殊情況:

  • 如果設定的備份交換器不存在,用戶端和RabbitMQ伺服器都不會有異常出現,此時消息會丢失。
  • 如果備份交換器沒有綁定任何隊列,用戶端和RabbitMQ服務端都不會有異常出現,此時消息會丢失。
  • 如果備份交換器沒有任何比對的隊列,用戶端和RabbitMQ服務端都不會有異常出現,此時消息會丢失。
  • 如果備份交換器和mandatory參數一起使用,那麼mandatory參數無效。

springboot-rabbitmq-demo測試代碼:https://github.com/mytt-10566/springboot-rabbitmq-demo

繼續閱讀