第一部分:延遲消息的實作原理和知識點
使用RabbitMQ來實作延遲任務必須先了解RabbitMQ的兩個概念:消息的TTL和死信Exchange,通過這兩者的組合來實作上述需求。
消息的TTL(Time To Live)
消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分别設定TTL。對隊列設定就是隊列沒有消費者連着的保留時間,也可以對每一個單獨的消息做單獨的設定。超過了這個時間,我們認為這個消息就死了,稱之為死信。如果隊列設定了,消息也設定了,那麼會取小的。是以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設定)。這裡單講單個消息的TTL,因為它才是實作延遲任務的關鍵。
可以通過設定消息的expiration字段或者x-message-ttl屬性來設定時間,兩者是一樣的效果。隻是expiration字段是字元串參數,是以要寫個int類型的字元串:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsQTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5iM3gzM4YjZxQzYiRTY5YDNzYzXzIzM0gTMyEzLcBTMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
當上面的消息扔到隊列中後,過了3分鐘,如果沒有被消費,它就死了。不會被消費者消費到。這個消息後面的,沒有“死掉”的消息對頂上來,被消費者消費。死信在隊列中并不會被删除和釋放,它會被統計到隊列的消息數中去。單靠死信還不能實作延遲任務,還要靠Dead Letter Exchange。
Dead Letter Exchanges
Exchage的概念在這裡就不在贅述。一個消息在滿足如下條件下,會進死信路由,記住這裡是路由而不是隊列,一個路由可以對應很多隊列。
- 一個消息被Consumer拒收了,并且reject方法的參數裡requeue是false。也就是說不會被再次放在隊列裡,被其他消費者使用。
- 上面的消息的TTL到了,消息過期了。
- 隊列的長度限制滿了。排在前面的消息會被丢棄或者扔到死信路由上。
Dead Letter Exchange其實就是一種普通的exchange,和建立其他exchange沒有兩樣。隻是在某一個設定Dead Letter Exchange的隊列中有消息過期了,會自動觸發消息的轉發,發送到Dead Letter Exchange中去。
實作延遲隊列
延遲任務通過消息的TTL和Dead Letter Exchange來實作。我們需要建立2個隊列,一個用于發送消息,一個用于消息過期後的轉發目标隊列。
生産者輸出消息到Queue1,并且這個消息是設定有有效時間的,比如3分鐘。消息會在Queue1中等待3分鐘,如果沒有消費者收掉的話,它就是被轉發到Queue2,Queue2有消費者,收到,處理延遲任務。
完成延遲任務的實作。
第二部分:具體實作例子
1、建立立消息隊列配置檔案rabbitmq.properties
#rabbitmq消息隊列的屬性配置檔案properties
rabbitmq.study.host=192.168.56.101
rabbitmq.study.username=duanml
rabbitmq.study.password=1qaz@WSX
rabbitmq.study.port=5672
rabbitmq.study.vhost=studymq
#Mail 消息隊列的相關變量值
mail.exchange=mailExchange
mail.exchange.key=mail_queue_key
#Phone 消息隊列的相關變量值
phone.topic.key=phone.one
phone.topic.key.more=phone.one.more
#delay 延遲消息隊列的相關變量值
delay.directQueue.key=TradePayNotify_delay_2m
delay.directMessage.key=TradePayNotify_delay_3m
2、建立立配置檔案,申明延遲隊列相關的配置資訊如:spring-rabbigmq-dlx.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--利用rabbitmq的TTL和延遲隊列,實作延遲通知任務的例子
1、申明了一個訂單通知服務的隊列 queue_Notify
2、申明了一個延遲隊列Notify_delay_15s,給整個隊列設定消息過期時間 為15秒 ——————》 queue ttl 例子
3、申明了一個延遲隊列Notify_delay_30s 給發送到這個隊列的消息,消息本身設定過期時間 ————————》 message ttl 例子
4、當消息發送到2、3隊列的時候,達到了過期時間,即轉發到訂單通知服務工作隊列 1、
5、給隊列1 配置消費者服務工作監聽,即可完成延遲任務的結果。
-->
<!-- ################ 訂單通知服務消費者配置 ################ -->
<!--隊列聲明-->
<rabbit:queue id="queue_Notify" name="queue_Notify" durable="true" auto-delete="false" exclusive="false"/>
<!-- 訂單通知服務消費者 exchange -->
<rabbit:direct-exchange name="trade_direct" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="queue_Notify" key="TradePayNotify"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 訂單通知監聽處理器 -->
<bean id="notifyConsumerListener" class="org.seckill.rabbitmqListener.notify.NotifyConsumerListener"/>
<!--訂單消息隊列确認回調-->
<bean id="notifyConfirmCallBackListener" class="org.seckill.rabbitmqListener.notify.NotifyConfirmCallBackListener"></bean>
<!--訂單消息隊列消息發送失敗回調-->
<bean id="notifyFailedCallBackListener" class="org.seckill.rabbitmqListener.notify.NotifyFailedCallBackListener"></bean>
<!-- 監聽器acknowledge=manual表示手工确認消息已處理(異常時可以不确認消息),auto表示自動确認(隻要不抛出異常,消息就會被消費) -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener queues="queue_Notify" ref="notifyConsumerListener"/>
</rabbit:listener-container>
<!--*****************************************分割線*********************************************************-->
<!-- ################ 延遲隊列生産者配置 ################ -->
<rabbit:template id="rabbitTemplateDelay" mandatory="true" exchange="trade_direct_delay"
connection-factory="connectionFactory"
confirm-callback="notifyConfirmCallBackListener"
return-callback="notifyFailedCallBackListener"
message-converter="jsonMessageConverter"/>
<!--配置生産消息的延遲隊列操作主體類-->
<bean id="delayMQProducerImpl" class="org.seckill.utils.rabbitmq.Impl.MQProducerImpl">
<property name="rabbitTemplate" ref="rabbitTemplateDelay"></property>
</bean>
<!--申明一個延遲隊列,給整個隊列的消息設定消息過期時間 x-message-ttl 2分鐘
當消息達到過期時間的時候,rabbitmq将會把消息重新定位轉發到其它的隊列中去,本例子轉發到
exchange:trade_direct
routing-key:TradePayNotify
滿足如上兩點的隊列中去即為:queue_Notify
-->
<rabbit:queue id="Notify_delay_2m" name="Notify_delay_2m" durable="true" auto-delete="false"
exclusive="false">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="120000" value-type="java.lang.Long"/>
<entry key="x-dead-letter-exchange" value="trade_direct"/>
<entry key="x-dead-letter-routing-key" value="TradePayNotify"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!--申明一個延遲隊列,在發送消息的時候給消息設定過期時間 3分鐘
當消息達到過期時間的時候,rabbitmq将會把消息重新定位轉發到其它的隊列中去,本例子轉發到
exchange:trade_direct
routing-key:TradePayNotify
滿足如上兩點的隊列中去即為:queue_Notify
-->
<rabbit:queue id="Notify_delay_3m" name="Notify_delay_3m" durable="true" auto-delete="false"
exclusive="false">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="trade_direct"/>
<entry key="x-dead-letter-routing-key" value="TradePayNotify"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!-- 延遲隊列工作的 exchange -->
<rabbit:direct-exchange name="trade_direct_delay" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="Notify_delay_2m" key="TradePayNotify_delay_2m"/>
<rabbit:binding queue="Notify_delay_3m" key="TradePayNotify_delay_3m"/>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
3、建立立延遲隊列測試Controller
package org.seckill.web;
import org.seckill.dto.SeckillResult;
import org.seckill.entity.Seckill;
import org.seckill.utils.rabbitmq.Impl.MQProducerImpl;
import org.seckill.utils.rabbitmq.MQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.Date;
/**
* Description: 消息隊列測試
*/
@Controller
@RequestMapping("/rabbitmq")
public class RabbitmqController {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${delay.directQueue.key}")
private String delay_directQueue_key;
@Value("${delay.directMessage.key}")
private String delay_directMessage_key;
@Autowired
private MQProducerImpl delayMQProducerImpl;111
/**
* @Description: 消息隊列
* @Author:
* @CreateTime:
*/
@ResponseBody
@RequestMapping("/sendDelayQueue")
public SeckillResult<Long> testDelayQueue() {
SeckillResult<Long> result = null;
Date now = new Date();
try {
Seckill seckill = new Seckill();
//第一種情況,給隊列設定消息ttl,詳情見配置檔案
for (int i = 0; i < 2; i++) {
seckill.setSeckillId(1922339387 + i);
seckill.setName("delay_queue_ttl_" + i);
String msgId = delayMQProducerImpl.getMsgId();
Message message = delayMQProducerImpl.messageBuil(seckill,msgId);
delayMQProducerImpl.sendDataToRabbitMQ(delay_directQueue_key, message);
}
//第二種情況,給消息設定ttl
for (int i = 0; i < 2; i++) {
seckill.setSeckillId(1922339287 + i);
seckill.setName("delay_message_ttl_" + i);
String msgId = delayMQProducerImpl.getMsgId();
Message message = delayMQProducerImpl.messageBuil(seckill,msgId);
if (message != null) {
//給消息設定過期時間ttl,為3分鐘
message.getMessageProperties().setExpiration("180000");
delayMQProducerImpl.sendDataToRabbitMQ(delay_directMessage_key, message);
}
}
result = new SeckillResult<Long>(true, now.getTime());
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return result;
}
}
4、編寫延遲消息确認類和監聽類:
NotifyConfirmCallBackListener.java
package org.seckill.rabbitmqListener.notify;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
/**
* Description: 延遲任務測試--->消息确認回調類
*/
public class NotifyConfirmCallBackListener implements ConfirmCallback {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* Confirmation callback.
*
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info("延遲測試---确認消息完成-------->confirm--:correlationData:" + correlationData.getId() + ",ack:" + ack + ",cause:" + cause);
}
}
NotifyConsumerListener.java
package org.seckill.rabbitmqListener.notify;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
/**
* Description: 訂單通知隊列監聽服務
* 實作延遲任務的功能
*/
public class NotifyConsumerListener implements ChannelAwareMessageListener {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* Callback for processing a received Rabbit message.
* <p>Implementors are supposed to process the given Message,
* typically sending reply messages through the given Session.
*
* @param message the received AMQP message (never <code>null</code>)
* @param channel the underlying Rabbit Channel (never <code>null</code>)
* @throws Exception Any.
*/
public void onMessage(Message message, Channel channel) throws Exception {
try {
//将位元組流對象轉換成Java對象
// Seckill seckill=(Seckill) new ObjectInputStream(new ByteArrayInputStream(message.getBody())).readObject();
String returnStr = new String(message.getBody(),"UTF-8");
JSONObject jsStr = JSONObject.parseObject(returnStr);
logger.info("延遲測試--消費開始:名稱為--===>" + jsStr.getString("name") + "----->傳回消息:" + returnStr + "||||消息的Properties:--》" + message.getMessageProperties());
//TODO 進行相關業務操作
//成功處理業務,那麼傳回消息确認機制,這個消息成功處理OK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
//消息已經進行過一次輪詢操作,還是失敗,将拒絕再次接收本消息
logger.info("消息已重複處理失敗,拒絕再次接收...");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒絕消息
//TODO 進行相關業務操作
} else {
//消息第一次接收處理失敗後,将再此回到隊列中進行 再一次輪詢操作
logger.info("消息即将再次傳回隊列處理...");
//處理失敗,那麼傳回消息确認機制,這個消息沒有成功處理,傳回到隊列中
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
NotifyFailedCallBackListener.java
package org.seckill.rabbitmqListener.notify;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
/**
* Description: 延遲任務測試----> 消息發送失敗回調類
*/
public class NotifyFailedCallBackListener implements ReturnCallback {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* Returned message callback.
*
* @param message the returned message.
* @param replyCode the reply code.
* @param replyText the reply text.
* @param exchange the exchange.
* @param routingKey the routing key.
*/
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("延遲測試------------->return--message:" +
new String(message.getBody()) +
",replyCode:" + replyCode + ",replyText:" + replyText +
",exchange:" + exchange + ",routingKey:" + routingKey);
}
}
package org.seckill.utils.rabbitmq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.support.CorrelationData;
/**
* Description: No Description
*/
public interface MQProducer {
/**
* Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
* @param message
*/
void sendDataToRabbitMQ(java.lang.Object message);
/**
* Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
* @param message
* @param messagePostProcessor
*/
void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor);
/**
* Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
* @param message
* @param messagePostProcessor
* @param correlationData
*/
void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
/**
* Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
* @param routingKey
* @param message
*/
void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message);
/**
* Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
* @param routingKey
* @param message
* @param correlationData
*/
void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData);
/**
* Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
* @param routingKey
* @param message
* @param messagePostProcessor
*/
void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor);
/**
* Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
* @param routingKey
* @param message
* @param messagePostProcessor
* @param correlationData
*/
void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
/**
* Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
* @param exchange
* @param routingKey
* @param message
*/
void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message);
/**
* Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
* @param exchange
* @param routingKey
* @param message
* @param correlationData
*/
void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData);
/**
* Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
* @param exchange
* @param routingKey
* @param message
* @param messagePostProcessor
*/
void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor);
/**
* Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
* @param exchange
* @param routingKey
* @param message
* @param messagePostProcessor
* @param correlationData
*/
void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
Message messageBuil(Object handleObject, String msgId);
String getMsgId();
}
package org.seckill.utils.rabbitmq.Impl;
import com.alibaba.fastjson.JSONObject;
import org.seckill.utils.rabbitmq.MQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.util.UUID;
/**
* Description: 消息生産者操作主體類
*/
@Component
public class MQProducerImpl implements MQProducer{
private static final Logger logger = LoggerFactory.getLogger(MQProducerImpl.class);
private RabbitTemplate rabbitTemplate;
/**
* Sets the rabbitTemplate.
* <p>
* <p>You can use getRabbitTemplate() to get the value of rabbitTemplate</p>
*
* @param rabbitTemplate rabbitTemplate
*/
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
* Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
*
* @param message
*/
public void sendDataToRabbitMQ(Object message) {
try {
if (message instanceof Message){
Message messageSend = (Message) message;
String msgId = messageSend.getMessageProperties().getCorrelationId();
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,correlationData);
}else {
rabbitTemplate.convertAndSend(message);
}
} catch (AmqpException e) {
logger.error(e.getMessage(), e);
}
}
/**
* Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
*
* @param message
* @param messagePostProcessor
*/
public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor) {
try {
if (message instanceof Message){
Message messageSend = (Message) message;
String msgId = messageSend.getMessageProperties().getCorrelationId();
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,messagePostProcessor,correlationData);
}else {
rabbitTemplate.convertAndSend(message, messagePostProcessor);
}
} catch (AmqpException e) {
logger.error(e.getMessage(), e);
}
}
/**
* Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
*
* @param message
* @param messagePostProcessor
* @param correlationData
*/
public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
try {
rabbitTemplate.convertAndSend(message, messagePostProcessor, correlationData);
} catch (AmqpException e) {
logger.error(e.getMessage(), e);
}
}
/**
* Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
*
* @param routingKey
* @param message
*/
public void sendDataToRabbitMQ(String routingKey, Object message) {
try {
if (message instanceof Message){
Message messageSend = (Message) message;
String msgId = messageSend.getMessageProperties().getCorrelationId();
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(routingKey,message,correlationData);
}else {
rabbitTemplate.convertAndSend(routingKey, message);
}
} catch (AmqpException e) {
logger.error(e.getMessage(), e);
}
}
/**
* Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
*
* @param routingKey
* @param message
* @param correlationData
*/
public void sendDataToRabbitMQ(String routingKey, Object message, CorrelationData correlationData) {
try {
rabbitTemplate.convertAndSend(routingKey, message, correlationData);
} catch (AmqpException e) {
logger.error(e.getMessage(), e);
}
}
/**
* Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
*
* @param routingKey
* @param message
* @param messagePostProcessor
*/
public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
try {
if (message instanceof Message){
Message messageSend = (Message) message;
String msgId = messageSend.getMessageProperties().getCorrelationId();
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(routingKey,message,messagePostProcessor,correlationData);
}else {
rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor);
}
} catch (AmqpException e) {
logger.error(e.getMessage(), e);
}
}
/**
* Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
*
* @param routingKey
* @param message
* @param messagePostProcessor
* @param correlationData
*/
public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
try {
rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor, correlationData);
} catch (AmqpException e) {
logger.error(e.getMessage(), e);
}
}
/**
* Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
*
* @param exchange
* @param routingKey
* @param message
*/
public void sendDataToRabbitMQ(String exchange, String routingKey, Object message) {
try {
if (message instanceof Message){
Message messageSend = (Message) message;
String msgId = messageSend.getMessageProperties().getCorrelationId();
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(routingKey,message,correlationData);
}else {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
} catch (AmqpException e) {
logger.error(e.getMessage(), e);
}
}
/**
* Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
*
* @param exchange
* @param routingKey
* @param message
* @param correlationData
*/
public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, CorrelationData correlationData) {
try {
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
} catch (AmqpException e) {
logger.error(e.getMessage(), e);
}
}
/**
* Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
*
* @param exchange
* @param routingKey
* @param message
* @param messagePostProcessor
*/
public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
try {
rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor);
} catch (AmqpException e) {
logger.error(e.getMessage(), e);
}
}
/**
* Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
* 由于配置了JSON轉換,這裡是将Java對象轉換成JSON字元串的形式。
*
* @param exchange
* @param routingKey
* @param message
* @param messagePostProcessor
* @param correlationData
*/
public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
try {
rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);
} catch (AmqpException e) {
logger.error(e.getMessage(), e);
}
}
/**
* 建構Message對象,進行消息發送
* @param handleObject
* @param msgId
* @return
*/
public Message messageBuil(Object handleObject, String msgId) {
try {
//先轉成JSON
String objectJSON = JSONObject.toJSONString(handleObject);
//再建構Message對象
Message messageBuil = MessageBuilder.withBody(objectJSON.getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setCorrelationId(msgId).build();
return messageBuil;
} catch (UnsupportedEncodingException e) {
logger.error("建構Message出錯:" + e.getMessage(),e);
return null;
}
}
/**
* 生成唯一的消息操作id
* @return
*/
public String getMsgId() {
return UUID.randomUUID().toString();
}
}