天天看点

用了这么久的RabbitMQ异步编程竟然都是错的!(下)

2.3 死信堵塞MQ之坑

始终无法处理的死信消息,可能会引发堵塞MQ。

若线程池的任务队列无上限,最终可能导致OOM,类似的MQ也要注意任务堆积问题。对于突发流量引起的MQ堆积,问题并不大,适当调整消费者的消费能力应该就可以解决。但在很多时候,消息队列的堆积堵塞,是因为有大量始终无法处理的消息。

2.3.1 案例

用户服务在用户注册后发出一条消息,会员服务监听到消息后给用户派发优惠券,但因用户并没有保存成功,会员服务处理消息始终失败,消息重新进入队列,然后还是处理失败。这种在MQ中回荡的同一条消息,就是死信。

随着MQ被越来越多的死信填满,消费者需花费大量时间反复处理死信,导致正常消息的消费受阻,最终MQ可能因数据量过大而崩溃。

定义一个队列、一个直接交换器,然后把队列绑定到交换器

用了这么久的RabbitMQ异步编程竟然都是错的!(下)

sendMessage发送消息到MQ,访问一次提交一条消息,使用自增标识作为消息内容

用了这么久的RabbitMQ异步编程竟然都是错的!(下)

收到消息后,直接NPE,模拟处理出错

用了这么久的RabbitMQ异步编程竟然都是错的!(下)

调用sendMessage接口发送两条消息,然后来到RabbitMQ管理台,可以看到这两条消息始终在队列,不断被重新投递,导致重新投递QPS达到1063。

用了这么久的RabbitMQ异步编程竟然都是错的!(下)

在日志中也可看到大量异常信息。

修复方案

  • 解决死信无限重复进入队列最简单方案

    在程序处理出错时,直接抛

    AmqpRejectAndDontRequeueException

    ,避免消息重新进入队列
throw new AmqpRejectAndDontRequeueException("error");      

但更希望对同一消息,能够先进行几次重试,解决因为网络问题导致的偶发消息处理失败,若依旧失败,再把消息投递到专门设置的DLX。对于来自DLX的数据,可能只是记录日志发送报警,即使出现异常也不会再重复投递。

  • 逻辑如下
  • 用了这么久的RabbitMQ异步编程竟然都是错的!(下)
  • 针对该问题,我们来看

Spring AMQP的简便解决方案

  1. 定义死信交换器、死信队列。其实都是普通交换器和队列,只不过专门用于处理死信消息
  2. 通过RetryInterceptorBuilder构建一个RetryOperationsInterceptor以处理失败时候的重试。策略是最多尝试5次(重试4次);并且采取指数退避重试,首次重试延迟1秒,第二次2秒,以此类推,最大延迟是10秒;如果第4次重试还是失败,则使用RepublishMessageRecoverer把消息重新投入一个DLX
  3. 定义死信队列的处理程序。本案例只记录日志

代码

用了这么久的RabbitMQ异步编程竟然都是错的!(下)

执行程序,发送两条消息,查看日志:

用了这么久的RabbitMQ异步编程竟然都是错的!(下)

msg2的4次重试间隔分别是1秒、2秒、4秒、8秒,再加上首次的失败,所以最大尝试次数是5

4次重试后,RepublishMessageRecoverer把消息发往DLX

死信处理程序输出了got dead message msg2。

虽然几乎同时发俩消息,但msg2在msg1四次重试全部结束后才开始处理,因为默认SimpleMessageListenerContainer只有一个消费线程。可通过增加消费线程避免性能问题:

直接设置concurrentConsumers参数为10,来增加到10个工作线程

用了这么久的RabbitMQ异步编程竟然都是错的!(下)
  • 也可设置

    maxConcurrentConsumers

    参数,让

    SimpleMessageListenerContainer

    动态调整消费者线程数。

小结

一般在遇到消息处理失败的时候,可设置重试。若重试还是不行,可把该消息扔到专门的死信队列处理,不要让死信影响到正常消息处理。