對消息服務需要了解的朋友,可以移步:
本章讨論主題
- 如何確定消息至少消費一次,確定消費者最大程度消費成功
消費者消費消息有2中方式:
-
push方式
消息服務接收到消息之後,主動将消息推送給消費者消費
-
pull方式
消費者定時從消息服務中拉取消息進行消費
下面我們将讨論2中方式中如何確定消息至少被消費一次。
push模式
消費的過程:
- 消息服務查詢待消費的消息清單
- 輪詢待消息清單
- 調用消費者
- 消費者收到消費請求,執行業務處理,将處理結果傳回給消息服務
- 消息服務接收到消費成功的資訊,将消息狀态置為消費成功狀态
- 繼續消費下一條消息
探讨一下上面需要考慮的問題:
若消息一直消費失敗如何處理?
先說一下影響:
-
消息被阻塞
消息如果一直消費失敗,消息服務會不斷調用消費者進行消費,會阻塞其他消息的消費,直接影響到業務的正常進行.
消費失敗的原因:
-
代碼問題
這種情況不管嘗試多少次,消息都會消費失敗,需要人工介入修複bug,這個可以依靠監控系統發現bug,同時開發進行修複。
- 系統運作異常如調用逾時、網絡問題等一些不可控的因素。産生這種錯誤,繼續重試,最終會處理成功。
此處咱們隻用讨論消息服務中重試機制如何設計?
系統異常情況下,可能過一段時間,系統恢複了,此時去重試,消費也就成功了。
是以我們對于消費失敗的消息采用延遲處理的方式,可以這麼實作:
消息中增加幾個字段用于重試:next_dispose_time【下次處理時間】、max_failure【最大允許失敗次數】、failure【目前失敗次數】,消息入庫時:next_dispose_time=需消費的時間,max_failure = 運作最大失敗次數, failure=0;
當消費失敗時,處理過程:
計算下次處理時間(next_dispose_time),可以在目前時間上面做指數遞增,比如根據失敗次數依次在目前時間上遞增2的failure次方秒,如:
第1次失敗:目前時間 + 2秒
第2次失敗:目前時間 + 4秒
第3次失敗:目前時間 + 8秒
第4次失敗:目前時間 + 16秒
.......
第n次失敗:目前時間 + 2的n次方秒
failure++
消息服務查詢待消費的消息也需要做調整:
select * from 消息表 where next_dispose_time<=目前時間 and failure此時能夠最大程度保證消息最少消費成功一次。
這種會複雜一些,為何會複雜一些,咱們先看一下正常的流程:
- 消費者從消息服務中拉取消息
- 本地進行處理
- 從消息服務中删除此消息
- 繼續拉取下一條進行處理
如果本地一直處理失敗,那麼後面拉取到的都是同一條消息,這條消息直接阻塞後續消息的消費,這種情況如何解?
咱們先分析一下出現這種問題的後果及原因:
- 後果:消息被阻塞,業務無法正常運作
- 原因:代碼問題或其他異常
- 確定代碼沒問題,可以解決上面問題,及時性不夠高,線上要考慮系統的容錯能力。
遇到這種問題還是挺嚴重了,業務方都是無法接受的,一條消息消費失敗,會影響到其他所有消息的消費,這個我們還是得想辦法解決,可以這樣:
- 消費者拉取消息
- 落地到本地
- 異步去消費本地落地的消息
消息先落地,然後異步處理,本地需要有個補償的job,去處理本地消費失敗的消息,這個可以參考push方式消費的過程。
對消息服務感興趣的,可以加群讨論
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLzYzMzYDZ0gzN4I2NjlDOihDO5gjNlNTY3UDOkFzNygjN3MjNwgjNj9CXt92Yu4GZjlGbh5SZslmZxl3Lc9CX6MHc0RHaiojIsJye.png)