天天看點

Rabbitmq的延遲消息隊列實作

第一部分:延遲消息的實作原理和知識點

使用RabbitMQ來實作延遲任務必須先了解RabbitMQ的兩個概念:消息的TTL和死信Exchange,通過這兩者的組合來實作上述需求。

消息的TTL(Time To Live)

消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分别設定TTL。對隊列設定就是隊列沒有消費者連着的保留時間,也可以對每一個單獨的消息做單獨的設定。超過了這個時間,我們認為這個消息就死了,稱之為死信。如果隊列設定了,消息也設定了,那麼會取小的。是以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設定)。這裡單講單個消息的TTL,因為它才是實作延遲任務的關鍵。

可以通過設定消息的expiration字段或者x-message-ttl屬性來設定時間,兩者是一樣的效果。隻是expiration字段是字元串參數,是以要寫個int類型的字元串:

Rabbitmq的延遲消息隊列實作

當上面的消息扔到隊列中後,過了3分鐘,如果沒有被消費,它就死了。不會被消費者消費到。這個消息後面的,沒有“死掉”的消息對頂上來,被消費者消費。死信在隊列中并不會被删除和釋放,它會被統計到隊列的消息數中去。單靠死信還不能實作延遲任務,還要靠Dead Letter Exchange。

Dead Letter Exchanges

Exchage的概念在這裡就不在贅述。一個消息在滿足如下條件下,會進死信路由,記住這裡是路由而不是隊列,一個路由可以對應很多隊列。

  1. 一個消息被Consumer拒收了,并且reject方法的參數裡requeue是false。也就是說不會被再次放在隊列裡,被其他消費者使用。
  2. 上面的消息的TTL到了,消息過期了。
  3. 隊列的長度限制滿了。排在前面的消息會被丢棄或者扔到死信路由上。

Dead Letter Exchange其實就是一種普通的exchange,和建立其他exchange沒有兩樣。隻是在某一個設定Dead Letter Exchange的隊列中有消息過期了,會自動觸發消息的轉發,發送到Dead Letter Exchange中去。

實作延遲隊列

延遲任務通過消息的TTL和Dead Letter Exchange來實作。我們需要建立2個隊列,一個用于發送消息,一個用于消息過期後的轉發目标隊列。

Rabbitmq的延遲消息隊列實作

生産者輸出消息到Queue1,并且這個消息是設定有有效時間的,比如3分鐘。消息會在Queue1中等待3分鐘,如果沒有消費者收掉的話,它就是被轉發到Queue2,Queue2有消費者,收到,處理延遲任務。

完成延遲任務的實作。

第二部分:具體實作例子

1、建立立消息隊列配置檔案rabbitmq.properties

#rabbitmq消息隊列的屬性配置檔案properties
 rabbitmq.study.host=192.168.56.101
 rabbitmq.study.username=duanml
 rabbitmq.study.password=1qaz@WSX
 rabbitmq.study.port=5672
 rabbitmq.study.vhost=studymq
 
 #Mail 消息隊列的相關變量值
 mail.exchange=mailExchange
 mail.exchange.key=mail_queue_key
 
 
 #Phone 消息隊列的相關變量值
 phone.topic.key=phone.one
 phone.topic.key.more=phone.one.more
 
 #delay 延遲消息隊列的相關變量值
 delay.directQueue.key=TradePayNotify_delay_2m
 delay.directMessage.key=TradePayNotify_delay_3m
           

2、建立立配置檔案,申明延遲隊列相關的配置資訊如:spring-rabbigmq-dlx.xml

<?xml version="1.0" encoding="UTF-8"?>
 <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
 
      <!--利用rabbitmq的TTL和延遲隊列,實作延遲通知任務的例子
         1、申明了一個訂單通知服務的隊列  queue_Notify
         2、申明了一個延遲隊列Notify_delay_15s,給整個隊列設定消息過期時間 為15秒  ——————》 queue ttl  例子
         3、申明了一個延遲隊列Notify_delay_30s  給發送到這個隊列的消息,消息本身設定過期時間 ————————》  message ttl  例子
         4、當消息發送到2、3隊列的時候,達到了過期時間,即轉發到訂單通知服務工作隊列 1、
         5、給隊列1 配置消費者服務工作監聽,即可完成延遲任務的結果。
     -->
 
     <!-- ################ 訂單通知服務消費者配置 ################ -->
     <!--隊列聲明-->
     <rabbit:queue id="queue_Notify" name="queue_Notify" durable="true" auto-delete="false" exclusive="false"/>
 
     <!-- 訂單通知服務消費者 exchange -->
     <rabbit:direct-exchange name="trade_direct" durable="true" auto-delete="false">
         <rabbit:bindings>
             <rabbit:binding queue="queue_Notify" key="TradePayNotify"/>
         </rabbit:bindings>
     </rabbit:direct-exchange>
 
     <!-- 訂單通知監聽處理器 -->
     <bean id="notifyConsumerListener" class="org.seckill.rabbitmqListener.notify.NotifyConsumerListener"/>
     <!--訂單消息隊列确認回調-->
     <bean id="notifyConfirmCallBackListener" class="org.seckill.rabbitmqListener.notify.NotifyConfirmCallBackListener"></bean>
     <!--訂單消息隊列消息發送失敗回調-->
     <bean id="notifyFailedCallBackListener" class="org.seckill.rabbitmqListener.notify.NotifyFailedCallBackListener"></bean>
 
     <!-- 監聽器acknowledge=manual表示手工确認消息已處理(異常時可以不确認消息),auto表示自動确認(隻要不抛出異常,消息就會被消費) -->
     <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
         <rabbit:listener queues="queue_Notify" ref="notifyConsumerListener"/>
     </rabbit:listener-container>
 
     <!--*****************************************分割線*********************************************************-->
 
     <!-- ################ 延遲隊列生産者配置 ################ -->
     <rabbit:template id="rabbitTemplateDelay" mandatory="true" exchange="trade_direct_delay"
                      connection-factory="connectionFactory"
                      confirm-callback="notifyConfirmCallBackListener"
                      return-callback="notifyFailedCallBackListener"
                      message-converter="jsonMessageConverter"/>
 
     <!--配置生産消息的延遲隊列操作主體類-->
     <bean id="delayMQProducerImpl" class="org.seckill.utils.rabbitmq.Impl.MQProducerImpl">
         <property name="rabbitTemplate" ref="rabbitTemplateDelay"></property>
     </bean>
 
     <!--申明一個延遲隊列,給整個隊列的消息設定消息過期時間 x-message-ttl 2分鐘
         當消息達到過期時間的時候,rabbitmq将會把消息重新定位轉發到其它的隊列中去,本例子轉發到
         exchange:trade_direct
         routing-key:TradePayNotify
         滿足如上兩點的隊列中去即為:queue_Notify
     -->
     <rabbit:queue id="Notify_delay_2m" name="Notify_delay_2m" durable="true" auto-delete="false"
                   exclusive="false">
         <rabbit:queue-arguments>
             <entry key="x-message-ttl" value="120000" value-type="java.lang.Long"/>
             <entry key="x-dead-letter-exchange" value="trade_direct"/>
             <entry key="x-dead-letter-routing-key" value="TradePayNotify"/>
         </rabbit:queue-arguments>
     </rabbit:queue>
 
     <!--申明一個延遲隊列,在發送消息的時候給消息設定過期時間 3分鐘
            當消息達到過期時間的時候,rabbitmq将會把消息重新定位轉發到其它的隊列中去,本例子轉發到
            exchange:trade_direct
            routing-key:TradePayNotify
            滿足如上兩點的隊列中去即為:queue_Notify
     -->
     <rabbit:queue id="Notify_delay_3m" name="Notify_delay_3m" durable="true" auto-delete="false"
                   exclusive="false">
         <rabbit:queue-arguments>
             <entry key="x-dead-letter-exchange" value="trade_direct"/>
             <entry key="x-dead-letter-routing-key" value="TradePayNotify"/>
         </rabbit:queue-arguments>
     </rabbit:queue>
 
     <!-- 延遲隊列工作的 exchange -->
     <rabbit:direct-exchange name="trade_direct_delay" durable="true" auto-delete="false">
         <rabbit:bindings>
             <rabbit:binding queue="Notify_delay_2m" key="TradePayNotify_delay_2m"/>
             <rabbit:binding queue="Notify_delay_3m" key="TradePayNotify_delay_3m"/>
         </rabbit:bindings>
     </rabbit:direct-exchange> 
 </beans>
           

3、建立立延遲隊列測試Controller

package org.seckill.web;
 
 import org.seckill.dto.SeckillResult;
 import org.seckill.entity.Seckill;
 import org.seckill.utils.rabbitmq.Impl.MQProducerImpl;
 import org.seckill.utils.rabbitmq.MQProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.amqp.core.Message;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
 import java.util.Date;
 
 /**
  * Description: 消息隊列測試
  */
 @Controller
 @RequestMapping("/rabbitmq")
 public class RabbitmqController {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
  
     @Value("${delay.directQueue.key}")
     private String delay_directQueue_key;
 
     @Value("${delay.directMessage.key}")
     private String delay_directMessage_key;
 
     @Autowired
     private MQProducerImpl delayMQProducerImpl;111 
     /**
      * @Description: 消息隊列
      * @Author:
      * @CreateTime:
      */
     @ResponseBody
     @RequestMapping("/sendDelayQueue")
     public SeckillResult<Long> testDelayQueue() {
         SeckillResult<Long> result = null;
         Date now = new Date();
         try {
             Seckill seckill = new Seckill();
        //第一種情況,給隊列設定消息ttl,詳情見配置檔案
             for (int i = 0; i < 2; i++) {
                 seckill.setSeckillId(1922339387 + i);
                 seckill.setName("delay_queue_ttl_" + i);
                 String msgId = delayMQProducerImpl.getMsgId();
                 Message message = delayMQProducerImpl.messageBuil(seckill,msgId);
                 delayMQProducerImpl.sendDataToRabbitMQ(delay_directQueue_key, message);
             }
         //第二種情況,給消息設定ttl
             for (int i = 0; i < 2; i++) {
                 seckill.setSeckillId(1922339287 + i);
                 seckill.setName("delay_message_ttl_" + i);
                 String msgId = delayMQProducerImpl.getMsgId();
                 Message message = delayMQProducerImpl.messageBuil(seckill,msgId);
                 if (message != null) {
                     //給消息設定過期時間ttl,為3分鐘
                     message.getMessageProperties().setExpiration("180000");
                     delayMQProducerImpl.sendDataToRabbitMQ(delay_directMessage_key, message);
                 }
             }
             result = new SeckillResult<Long>(true, now.getTime());
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
         }
         return result;
     }
 
 }
           

4、編寫延遲消息确認類和監聽類:

NotifyConfirmCallBackListener.java

package org.seckill.rabbitmqListener.notify;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
 import org.springframework.amqp.rabbit.support.CorrelationData;
 
 /**
  * Description: 延遲任務測試--->消息确認回調類
  */
 public class NotifyConfirmCallBackListener implements ConfirmCallback {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
     /**
      * Confirmation callback.
      *
      * @param correlationData correlation data for the callback.
      * @param ack             true for ack, false for nack
      * @param cause           An optional cause, for nack, when available, otherwise null.
      */
     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
         logger.info("延遲測試---确認消息完成-------->confirm--:correlationData:" + correlationData.getId() + ",ack:" + ack + ",cause:" + cause);
     }
 }
           

NotifyConsumerListener.java

package org.seckill.rabbitmqListener.notify;
 
 import com.alibaba.fastjson.JSONObject;
 import com.rabbitmq.client.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
 
 /**
  * Description: 訂單通知隊列監聽服務
  * 實作延遲任務的功能
  */
 public class NotifyConsumerListener implements ChannelAwareMessageListener {
 
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
     /**
      * Callback for processing a received Rabbit message.
      * <p>Implementors are supposed to process the given Message,
      * typically sending reply messages through the given Session.
      *
      * @param message the received AMQP message (never <code>null</code>)
      * @param channel the underlying Rabbit Channel (never <code>null</code>)
      * @throws Exception Any.
      */
     public void onMessage(Message message, Channel channel) throws Exception {
         try {
             //将位元組流對象轉換成Java對象
 //            Seckill seckill=(Seckill) new ObjectInputStream(new ByteArrayInputStream(message.getBody())).readObject();
 
             String returnStr = new String(message.getBody(),"UTF-8");
             JSONObject jsStr = JSONObject.parseObject(returnStr);
 
             logger.info("延遲測試--消費開始:名稱為--===>" + jsStr.getString("name") + "----->傳回消息:" + returnStr + "||||消息的Properties:--》" + message.getMessageProperties());
 
             //TODO 進行相關業務操作
 
             //成功處理業務,那麼傳回消息确認機制,這個消息成功處理OK
             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 
         } catch (Exception e) {
             if (message.getMessageProperties().getRedelivered()) {
                 //消息已經進行過一次輪詢操作,還是失敗,将拒絕再次接收本消息
                 logger.info("消息已重複處理失敗,拒絕再次接收...");
                 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒絕消息
 
                 //TODO 進行相關業務操作
 
             } else {
                 //消息第一次接收處理失敗後,将再此回到隊列中進行  再一次輪詢操作
                 logger.info("消息即将再次傳回隊列處理...");
                 //處理失敗,那麼傳回消息确認機制,這個消息沒有成功處理,傳回到隊列中
                 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
             }
         }
     }
 }
           

NotifyFailedCallBackListener.java

package org.seckill.rabbitmqListener.notify;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;

 /**
  * Description: 延遲任務測試----> 消息發送失敗回調類
  */
 public class NotifyFailedCallBackListener implements ReturnCallback {

     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
     /**
      * Returned message callback.
      *
      * @param message    the returned message.
      * @param replyCode  the reply code.
      * @param replyText  the reply text.
      * @param exchange   the exchange.
      * @param routingKey the routing key.
      */
     public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
         logger.info("延遲測試------------->return--message:" +
                 new String(message.getBody()) +
                 ",replyCode:" + replyCode + ",replyText:" + replyText +
                 ",exchange:" + exchange + ",routingKey:" + routingKey);
     }
 }
           
package org.seckill.utils.rabbitmq;
 
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.core.MessagePostProcessor;
 import org.springframework.amqp.rabbit.support.CorrelationData; 

/**
 * Description: No Description
 */
 public interface MQProducer {
 
     /**
      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      * @param message
      */
     void sendDataToRabbitMQ(java.lang.Object message);
 
     /**
      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      * @param message
      * @param messagePostProcessor
      */
     void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor);
 
     /**
      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      * @param message
      * @param messagePostProcessor
      * @param correlationData
      */
     void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
 
     /**
      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      * @param routingKey
      * @param message
       */
     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message);
 
     /**
      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      * @param routingKey
      * @param message
      * @param correlationData
      */
     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData);
 
     /**
      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      * @param routingKey
      * @param message
      * @param messagePostProcessor
      */
     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor);
 
     /**
      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      * @param routingKey
      * @param message
      * @param messagePostProcessor
      * @param correlationData
      */
     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
 
     /**
      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      * @param exchange
      * @param routingKey
      * @param message
      */
     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message);
 
     /**
      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      * @param exchange
      * @param routingKey
      * @param message
      * @param correlationData
      */
     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData);
 
     /**
      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      * @param exchange
      * @param routingKey
      * @param message
      * @param messagePostProcessor
      */
     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor);
 
     /**
      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      * @param exchange
      * @param routingKey
      * @param message
      * @param messagePostProcessor
      * @param correlationData
      */
     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
 
     Message messageBuil(Object handleObject, String msgId);
 
     String getMsgId();
 }
           
package org.seckill.utils.rabbitmq.Impl;
 
 import com.alibaba.fastjson.JSONObject;
 import org.seckill.utils.rabbitmq.MQProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.amqp.AmqpException;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.core.MessageBuilder;
 import org.springframework.amqp.core.MessagePostProcessor;
 import org.springframework.amqp.core.MessageProperties;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.amqp.rabbit.support.CorrelationData;
 import org.springframework.stereotype.Component;
 
 import java.io.UnsupportedEncodingException;
 import java.util.UUID;
 
 /**
  * Description: 消息生産者操作主體類
  */
 @Component
 public class MQProducerImpl implements MQProducer{
 
     private static final Logger logger = LoggerFactory.getLogger(MQProducerImpl.class);
 
     private RabbitTemplate rabbitTemplate;
 
     /**
      * Sets the rabbitTemplate.
      * <p>
      * <p>You can use getRabbitTemplate() to get the value of rabbitTemplate</p>
      *
      * @param rabbitTemplate rabbitTemplate
      */
     public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
         this.rabbitTemplate = rabbitTemplate;
     }
 
     /**
      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      *
      * @param message
      */
     public void sendDataToRabbitMQ(Object message) {
         try {
             if (message instanceof Message){
                 Message messageSend = (Message) message;
                 String msgId = messageSend.getMessageProperties().getCorrelationId();
                 CorrelationData correlationData = new CorrelationData(msgId);
                 rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,correlationData);
             }else {
                 rabbitTemplate.convertAndSend(message);
             }
         } catch (AmqpException e) {
             logger.error(e.getMessage(), e);
         }
     }
 
     /**
      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      *
      * @param message
      * @param messagePostProcessor
      */
     public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor) {
         try {
             if (message instanceof Message){
                 Message messageSend = (Message) message;
                 String msgId = messageSend.getMessageProperties().getCorrelationId();
                 CorrelationData correlationData = new CorrelationData(msgId);
                 rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,messagePostProcessor,correlationData);
             }else {
                 rabbitTemplate.convertAndSend(message, messagePostProcessor);
             }
         } catch (AmqpException e) {
             logger.error(e.getMessage(), e);
         }
     }

     /**
      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      *
      * @param message
      * @param messagePostProcessor
      * @param correlationData
      */
     public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
         try {
             rabbitTemplate.convertAndSend(message, messagePostProcessor, correlationData);
         } catch (AmqpException e) {
             logger.error(e.getMessage(), e);
         }
     }
 
     /**
      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      *
      * @param routingKey
      * @param message
      */
     public void sendDataToRabbitMQ(String routingKey, Object message) {
         try {
             if (message instanceof Message){
                 Message messageSend = (Message) message;
                 String msgId = messageSend.getMessageProperties().getCorrelationId();
                 CorrelationData correlationData = new CorrelationData(msgId);
                 rabbitTemplate.convertAndSend(routingKey,message,correlationData);
             }else {
                 rabbitTemplate.convertAndSend(routingKey, message);
             }
         } catch (AmqpException e) {
             logger.error(e.getMessage(), e);
         }
     }
 
     /**
      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      *
      * @param routingKey
      * @param message
      * @param correlationData
      */
     public void sendDataToRabbitMQ(String routingKey, Object message, CorrelationData correlationData) {
         try {
             rabbitTemplate.convertAndSend(routingKey, message, correlationData);
         } catch (AmqpException e) {
             logger.error(e.getMessage(), e);
         }
     }
 
     /**
      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      *
      * @param routingKey
      * @param message
      * @param messagePostProcessor
      */
     public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
         try {
             if (message instanceof Message){
                 Message messageSend = (Message) message;
                 String msgId = messageSend.getMessageProperties().getCorrelationId();
                 CorrelationData correlationData = new CorrelationData(msgId);
                 rabbitTemplate.convertAndSend(routingKey,message,messagePostProcessor,correlationData);
             }else {
                 rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor);
             }
         } catch (AmqpException e) {
             logger.error(e.getMessage(), e);
         }
     }
 
     /**
      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      *
      * @param routingKey
      * @param message
      * @param messagePostProcessor
      * @param correlationData
      */
     public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
         try {
             rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor, correlationData);
         } catch (AmqpException e) {
             logger.error(e.getMessage(), e);
         }
     }
 
     /**
      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      *
      * @param exchange
      * @param routingKey
      * @param message
      */
     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message) {
         try {
             if (message instanceof Message){
                 Message messageSend = (Message) message;
                 String msgId = messageSend.getMessageProperties().getCorrelationId();
                 CorrelationData correlationData = new CorrelationData(msgId);
                 rabbitTemplate.convertAndSend(routingKey,message,correlationData);
             }else {
                 rabbitTemplate.convertAndSend(exchange, routingKey, message);
             }
         } catch (AmqpException e) {
             logger.error(e.getMessage(), e);
         }
     }
 
     /**
      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      *
      * @param exchange
      * @param routingKey
      * @param message
      * @param correlationData
      */
     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, CorrelationData correlationData) {
         try {
             rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
         } catch (AmqpException e) {
             logger.error(e.getMessage(), e);
         }
     }
 
     /**
      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      *
      * @param exchange
      * @param routingKey
      * @param message
      * @param messagePostProcessor
      */
     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
         try {
             rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor);
         } catch (AmqpException e) {
             logger.error(e.getMessage(), e);
         }
     }
 
     /**
      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
      * 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
      *
      * @param exchange
      * @param routingKey
      * @param message
      * @param messagePostProcessor
      * @param correlationData
      */
     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
         try {
             rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);
         } catch (AmqpException e) {
             logger.error(e.getMessage(), e);
         }
     }
 
     /**
      * 建構Message對象,進行消息發送
      * @param handleObject
      * @param msgId
      * @return
      */
     public Message messageBuil(Object handleObject, String msgId) {
         try {
             //先轉成JSON
             String objectJSON = JSONObject.toJSONString(handleObject);
             //再建構Message對象
             Message messageBuil = MessageBuilder.withBody(objectJSON.getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                     .setCorrelationId(msgId).build();
             return messageBuil;
         } catch (UnsupportedEncodingException e) {
             logger.error("建構Message出錯:" + e.getMessage(),e);
             return null;
         }
     }
 
     /**
      * 生成唯一的消息操作id
      * @return
      */
     public String getMsgId() {
         return UUID.randomUUID().toString();
}
 
 }
           

參考:

rabbitmq的延遲消息隊列實作

RabbitMQ 延時消息隊列

使用spring-rabbit測試RabbitMQ消息确認(發送确認,接收确認)