天天看點

rabbitMq延遲隊列實作

前言

我們要實作延遲消息隊列效果,在rabbtimq中可以通過TTL+死信的方式,把過期消息轉移到死信exchange中,然後再死信exchange綁定的隊列中去消費完成後期的業務邏輯。

rabbitMq延遲隊列實作

但是這裡有一個前提就是,我們TTL隊列中的過期時間都是一樣的,如果不一樣就會如下圖

rabbitMq延遲隊列實作

因為消息是排隊出隊的,如果前面的消息TTL時長大于後面的就會一直阻塞出隊口,造成隊列阻塞,而後面的消息即便是過期了也依然無法出隊釋放空間。

是以私信隻适合統一過期時長的消息依次入隊,如果是不同過期時長的消息并且不是過期時間短的排前面就會造成阻塞或者隊列爆滿,進而影響實際業務和隊列性能。

是以根據不同的過期時長需要建立不同的私信隊列,進而實作單一死信隻處理單一時長過期的消息。但是這樣也會造成死信隊列建立備援,業務累贅的問題。

參考:死信隊列

https://blog.csdn.net/zjcjava/article/details/79410137

rabbitMq延遲隊列

目前Kakfa,RockMq因為mq協定也沒有定義這種隊列标準,是以都不支援延遲隊列。RabbitMq為了解決這個情況,增加了一個延遲隊列插件abbitmq_delayed_message_exchange

rabbitMq延遲隊列實作

它的工作模式是消息發給延遲消息交換機,交換機Exchange根據消息頭中的TTL判斷如果消息過期了,它就把消息轉發給綁定相應的KEY隊列,然後該隊列的消費者去做業務邏輯處理。

這裡,延遲交換機是一個非阻塞結構的計數器,它會判斷每個消息的過期時長,達到過期時長的消除才會觸發轉發。沒有達到的就一直待在交換機中,這樣不用關注你的消息過期時長如何多變和前後不一緻,都可以通過它來完成。

預設情況下,它是沒有安裝這個插件的,需要手工安裝插件。

安裝插件abbitmq_delayed_message_exchange

官網下載下傳

https://www.rabbitmq.com/community-plugins.html

下載下傳相應版本的檔案解壓後它是以ez格式結尾的檔案

rabbitmq_delayed_message_exchange-3.8.0.ez

在安裝的環境執行指令

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
           

完成後,在管理界面可以看到建立exchange時多了一個Type是x-delayed-message類型

rabbitMq延遲隊列實作

如果在面闆中建立延遲消息交換機,如上圖,需要加上Arguments參數,而且是必須加,這樣,當延遲交換機才知道消息到期後是以哪種類型發給綁定的隊列。

rabbitMq延遲隊列實作

而,隊列則普通隊列綁定即可,這裡就不做示範了。建立完成我們在Exchange頁籤下面向該延遲消息交換機發送一條延遲消息,上圖我建立了一個5000毫秒的消息,路由參數Lazy.bill

點選發送,用戶端監聽該綁定的消息隊列,會發現 它不會立刻接受到消息,等一下才會接受到

代碼實作

/**
* rabbitmq配置
 * 實作RabbitListenerConfigurer 主要是為了消息類型轉換配置實作MappingJackson2MessageConverter 
 *  
 */

@Configuration
public class RabbitConfig implements RabbitListenerConfigurer {


    /**
     * 基礎配置
     * @param connectionFactory
     * @return
     */

    @Bean
    @Primary
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
//        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
//        factory.setMessageConverter(new Jackson2JsonMessageConverter());
//        factory.setErrorHandler(new RabbitListenerClassCastFailErrorHandler());
        return factory;
    }


    /**
     * 監聽器消費者消息類型轉換配置,如果字元串類型這裡可以不用配置
     * @return
     */
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

    @Bean
    MessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
        return messageHandlerMethodFactory;
    }

    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }



    /***********延遲隊列消息配置*****************/
    public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
    public static final String LAZY_QUEUE = "Qu.LazyQueue";
    public static final String LAZY_KEY = "Lazy.#";
    public static final String LAZY_KEY_BILL = "Lazy.bill";



    @Bean
    public TopicExchange lazyExchange(){
        Map<String, Object> pros = new HashMap<>();
        //設定交換機支援延遲消息推送
        pros.put("x-delayed-type", "direct");
        TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false,pros);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Queue lazyQueue(){
        return new Queue(LAZY_QUEUE, true);
    }

    @Bean
    public Binding lazyBinding(){
        return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
    }



}
           

消息監聽消費端配置

@Slf4j
@Component
public class RabbitmqListener {
 

    /**
     * 延遲隊列消息處理
     * @param msg
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "Qu.LazyQueue")
    @RabbitHandler
    public void onLazyMessage(Message msg, Channel channel) throws IOException {
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, true);
        log.info("lazy receive ================================================");
        log.info("lazy receive ={}" + new String(msg.getBody()));
    }



}
           

延遲消息發送

@Resources
RabbitTemplate rabbitTemplate;

//confirmCallback returnCallback 代碼省略,請參照上一篇
    public void sendLazy(BillInfo  billInfo ){
        rabbitTemplate.setMandatory(true);
//        rabbitTemplate.setConfirmCallback(confirmCallback);
//        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 時間戳 全局唯一
        CorrelationData correlationData = new CorrelationData("12345678909"+new Date());

        String  msgJson = JSON.toJSONString(billInfo);
        //發送消息時指定 header 延遲時間
        rabbitTemplate.convertAndSend(RabbitConfig.LAZY_EXCHANGE, RabbitConfig.LAZY_KEY_BILL, msgJson,
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        //設定消息持久化,過期時長6000,這裡可以根據業務類型自定義每個消息不同的過期時長,比如根據租賃合同結束時間配置不同的過期時長
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        message.getMessageProperties().setDelay(6000);
                        return message;
                    }
                }, correlationData);
    }