版權聲明:本文為部落客原創文章,未經部落客允許不得轉載。 https://blog.csdn.net/u010046908/article/details/57079566
在前面寫過一篇
dubbo2.5-spring4-mybastis3.2-springmvc4-mongodb3.4-redis3.2整合(七)RabbitMQ工作原理和Spring的內建 ,今天在進一步使用一下RabbitMQ的延遲隊列的實作。1. 簡介
RabbitMQ如何實作延遲隊列:延遲隊列存儲的對象肯定是對應的延遲消息,所謂”延遲消息”是指當消息被發送以後,并不想讓消費者立即拿到消息,而是等待指定時間後,消費者才拿到這個消息進行消費。
2. RabbitMQ的延遲隊列使用場景
場景一:在訂單系統中,一個使用者下單之後通常有30分鐘的時間進行支付,如果30分鐘之内沒有支付成功,那麼這個訂單将進行一場處理。這是就可以使用延遲隊列将訂單資訊發送到延遲隊列。
場景二:使用者希望通過手機遠端遙控家裡的智能裝置在指定的時間進行工作。這時候就可以将使用者指令發送到延遲隊列,當指令設定的時間到了再将指令推送到隻能裝置。
3.RabbitMQ實作延遲隊列
AMQP協定,以及RabbitMQ本身沒有直接支援延遲隊列的功能,但是可以通過TTL和DLX模拟出延遲隊列的功能。
3.1 TTL(Time To Live)
RabbitMQ可以針對Queue和Message設定 x-message-tt,來控制消息的生存時間,如果逾時,則消息變為dead letter
RabbitMQ針對隊列中的消息過期時間有兩種方法可以設定。
- 通過隊列屬性設定,隊列中所有消息都有相同的過期時間。
- 對消息進行單獨設定,每條消息TTL可以不同。
如果同時使用,則消息的過期時間以兩者之間TTL較小的那個數值為準。消息在隊列的生存時間一旦超過設定的TTL值,就成為dead letter
詳細可以參考:RabbitMQ之TTL(Time-To-Live 過期時間)
3.2 DLX (Dead-Letter-Exchange)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數,如果隊列内出現了dead letter,則按照這兩個參數重新路由。
x-dead-letter-exchange:出現dead letter之後将dead letter重新發送到指定exchange x-dead-letter-routing-key:指定routing-key發送隊列出現dead letter的情況有:消息或者隊列的TTL過期 隊列達到最大長度 消息被消費端拒絕(basic.reject or basic.nack)并且requeue=false,利DLX,當消息在一個隊列中變成死信後,它能被重新publish到另一個Exchange。這時候消息就可以重新被消費。
4.案例的實作
4.1 rabbit.properties
rabbit_username=lidong1665
rabbit_password=123456
rabbit_host=192.168.0.107
rabbit_port=5672
4.2 spring-rabbitmq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd">
<!--配置connection-factory,指定連接配接rabbit server參數 -->
<rabbit:connection-factory id="rabbitConnectionFactory"
username="${rabbit_username}"
password="${rabbit_password}"
host="${rabbit_host}"
port="${rabbit_port}"/>
<!--通過指定下面的admin資訊,目前producer中的exchange和queue會在rabbitmq伺服器上自動生成 -->
<rabbit:admin id="connectAdmin" connection-factory="rabbitConnectionFactory" />
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="3"/>
<property name="maxPoolSize" value="5"/>
<property name="queueCapacity" value="15"/>
</bean>
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<rabbit:topic-exchange name="delayChangeTest"
declared-by="connectAdmin" delayed="true">
<rabbit:bindings>
<rabbit:binding queue="delay_queue"
pattern="order.delay.notify"
/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定義queue 配置延遲隊列的資訊-->
<rabbit:queue name="delay_queue"
durable="true"
auto-declare="true"
auto-delete="false"
declared-by="connectAdmin">
</rabbit:queue>
<rabbit:template id="rabbitTemplate2" connection-factory="rabbitConnectionFactory"
exchange="delayChangeTest"/>
<bean id="orderConsumer" class="com.lidong.dubbo.core.util.customer.OrderConsumer"></bean>
<rabbit:listener-container
connection-factory="rabbitConnectionFactory"
acknowledge="manual"
channel-transacted="false"
message-converter="jsonMessageConverter">
<rabbit:listener queues="queueTest"
ref="messageReceiver" method="onMessage"/>
</rabbit:listener-container>
</beans>
4.3 建立生産者
package com.lidong.dubbo.core.spittle.service;
import com.lidong.dubbo.api.spittle.service.IMessageProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @項目名稱:lidong-dubbo
* @類名:MessageProducerImp
* @類的描述:
* @作者:lidong
* @建立時間:2017/2/4 上午10:01
* @公司:chni
* @QQ:1561281670
* @郵箱:[email protected]
*/
@Service
public class MessageProducerServiceImp implements IMessageProducer {
private Logger logger = LoggerFactory.getLogger(MessageProducerServiceImp.class);
@Resource
private RabbitTemplate rabbitTemplate2;
@Override
public void sendMessage(Object message) {
logger.info("to send message:{}",message);
final int xdelay= 300*1000;
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//發送延遲消息
rabbitTemplate2.convertAndSend("order.delay.notify", message,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message)
throws AmqpException {
//設定消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//設定延遲時間(5分鐘後執行)
message.getMessageProperties().setDelay(xdelay);
logger.info("----"+sf.format(new Date()) + " Delay sent.");
return message;
}
});
}
}
4.4 建立消費者
package com.lidong.dubbo.core.util.customer;
import com.rabbitmq.client.Channel;
import org.activiti.engine.impl.util.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
/**
* @項目名稱:lidong-dubbo
* @類名:OrderConsumer
* @類的描述:
* @作者:lidong
* @建立時間:2017/2/25 下午12:59
* @公司:chni
* @QQ:1561281670
* @郵箱:[email protected]
*/
public class OrderConsumer implements ChannelAwareMessageListener {
private Logger logger = LoggerFactory.getLogger(OrderConsumer.class);
@Override
public void onMessage(Message message, Channel channel) throws Exception {
logger.info("[延時消息]" + message.getMessageProperties());
if (message != null) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
logger.debug("deliveryTag= "+deliveryTag);
//手動确認
channel.basicAck(deliveryTag,false);
}
}
}
發送消息之後。消費5分鐘之後接受到消息,開始處理。
代碼位址