天天看點

RocketMQ 消息消費

引言

前面我們已經簡單地介紹了 RocketMQ 的整體設計思路,本文着重其中消息消費部分的實作細節,更多關于 RocketMQ 的文章均收錄于

<RocketMQ系列文章>

;

消息消費

消息消費以組的模式開展,一個消費組内可以包含多個消費者,每一個消費組可訂閱多個主題,消費組之間有叢集模式與廣播模式兩種消費模式。叢集模式,主題下的同一條消息隻允許被其中一個消費者消費。廣播模式,主題下的同一條消息将被叢集内的所有消費者消費一次。消息伺服器與消費者之間的消息傳送也有兩種方式:推模式、拉模式。所謂的拉模式,是消費端主動發起拉消息請求,而推模式是消息到達消息伺服器後,推送給消息消費者。RocketMQ 消息推模式的實作基于拉模式,在拉模式上包裝一層,一個拉取任務完成後開始下一個拉取任務。

消息隊列負載機制遵循一個通用的思想: 一個消息隊列同一時間隻允許被一個消費者消費,一個消費者可以消費多個消息隊列。

RocketMQ 支援局部順序消息消費,也就是保證同一個消息隊列上的消息順序消費。不支援消息全局順序消費,如果要實作某一主題的全局順序消息消費,可以将該主題的隊列數設定為 1,犧牲高可用性。

RocketMQ 支援兩種消息過濾模式:表達式(TAG、SQL92)與類過濾模式。

消費者啟動

  1. 建構主題訂閱資訊
    • 訂閱目标topic
    • 訂閱重試主題消息。RocketMQ消息重試是以消費組為機關,而不是主題,消息重試主題名為 %RETRY% + 消費組名。消費者在啟動的時候會自動訂閱該主題,參與該主題的消息隊列負載。
  2. 初始化消息進度。如果消息消費是叢集模式,那麼消息進度儲存在 Broker 上; 如果是廣播模式,那麼消息消費進度存儲在消費端。
  3. 根據是否是順序消費,建立消費端消費線程服務。ConsumeMessageService 主要負責消息消費,内部維護一個線程池。

消息拉取

我們會基于 PUSH 模型來介紹拉取機制,因為其内部包括了 PULL 模型。消息消費有兩種模式:廣播模式與叢集模式,廣播模式比較簡單,每一個消費者需要去拉取訂閱主題下所有消費隊列的消息,接下來主要基于叢集模式介紹。在叢集模式下,同一個消費組内有多個消息消費者,同一個主題存在多個消費隊列,消息隊列負載,通常的做法是一個消息隊列在同一時間隻允許被一個消息消費者消費,一個消息消費者可以同時消費多個消息隊列。

RocketMQ 使用一個單獨的線程 PullMessageService 來負責消息的拉取。

public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}           

PullMessageService 從服務端拉取到消息後,會根據消息對應的消費組,轉給該組對應的 ProcessQueue,而 ProcessQueue 是 MessageQueue 在消費端的重制、快照。 PullMessageService 從消息伺服器預設每次拉取 32 條消息,按消息的隊列偏移量順序存放在 ProcessQueue 中,PullMessageService 然後将消息送出到消費者消費線程池,消息成功消費後從 ProcessQueue 中移除。

消息拉取分為 3 個主要步驟。

  1. 消息拉取用戶端消息拉取請求封裝。
  2. 消息伺服器查找并傳回消息。
  3. 消息拉取用戶端處理傳回的消息。

發送拉取請求

  1. 判斷隊列狀态,如果不需要拉取則退出
  2. 進行消息拉取流控
    • 消息處理總數
    • 消息偏移量跨度
  3. 查詢路由表,找到要發送的目标 Broker 伺服器,如果沒找到就更新路由資訊
  4. 如果消息過濾模式為類過濾,則需要根據主題名稱、broker位址找到注冊在 Broker上的 FilterServer 位址,從 FilterServer 上拉取消息,否則從 Broker 上拉取消息
  5. 發送消息

Broker組裝消息

  1. 根據訂閱資訊,建構消息過濾器
    • tag 過濾器隻會過濾 tag 的 hashcode,為了追求高效率
    • SQL 過濾為了避免每次執行 SQL表達式,建構了 BloomFilter,在 Redis 防止緩存擊穿那裡我們也用過它
  2. 根據主題名稱與隊列編号擷取消息消費隊列
  3. 根據拉取消息偏移量,進行校對,如何偏移量不合法,則傳回相應的錯誤碼
  4. 如果待拉取偏移量大于 minOffset 并且小于 maxOffs 時,從目前 offset 處嘗試拉取 32 條消息,根據消息隊列偏移量(ConsumeQueue)從 CommitLog 檔案中查找消息
  5. 根據 PullResult 填充 responseHeader 的 nextBeginOffset、 minOffset、 maxOffset
  6. 如果主 Broker 工作繁忙,會設定 flag 建議消費者下次從 Slave 節點拉取消息
  7. 如果 CommitLog 标記可用并且目前節點為主節點,則更新消息消費進度

Bloom Filter是一種空間效率很高的随機資料結構,它的原理是,當一個元素被加入集合時,通過K個Hash函數将這個元素映射成一個位陣列(Bit array)中的K個點,把它們置為1。檢索時,我們隻要看看這些點是不是都是1就(大約)知道集合中有沒有它了:如果這些點有任何一個0,則被檢索元素一定不在;如果都是1,則被檢索元素很可能在。這就是布隆過濾器的基本思想。

但Bloom Filter的這種高效是有一定代價的:在判斷一個元素是否屬于某個集合時,有可能會把不屬于這個集合的元素誤認為屬于這個集合(false positive)。是以,Bloom Filter不适合那些“零錯誤”的應用場合。而在能容忍低錯誤率的應用場合下,Bloom Filter通過極少的錯誤換取了存儲空間的極大節省。

用戶端處理消息

  1. 解碼成消息清單,并進行消息過濾
    • 這裡之是以還要進行過濾,是因為 Broker 為了追求效率隻會根據 tag 的 hashcode 進行過濾,真實 key string 的對比,下放到 Consumer 上進行
  2. 更新 PullRequest 的下一次拉取偏移量,如果過濾後沒有一條消息的話,則立即觸發下次拉取
  3. 首先将拉取到的消息存入 ProcessQueue,然後将拉取到的消息送出到 ConsumeMessageService 中供消費者消費,該方法是一個異步方法,也就是 PullCallBack 将消息送出到 ConsumeMessageService 中就會立即傳回
  4. 根據拉取延時,适時進行下一次拉取
RocketMQ 消息消費

RocketMQ 并沒有真正實作推模式,而是消費者主動向消息伺服器拉取消息,RocketMQ 推模式是循環向消息服務端發送消息拉取請求,如果消息消費者向 RocketMQ 發送消息拉取時,消息并未到達消費隊列,會根據配置産生不同效果:

  • 不啟用長輪詢機制:在服務端等待 shortPollingTimeMills=1s 時間後(挂起)再去判斷消息是否已到達消息隊列,如果消息未到達則提示消息拉取用戶端 PULL_NOT_FOUND (消息不存在)
  • 開啟長輪詢模式: RocketMQ 一方面會每 5s 輪詢檢查一次消息是否存在,同時一有新消息到達後立馬通知挂起線程再次驗證新消息是否是自己感興趣的消息,如果是, 則從 CommitLog 檔案提取消息傳回給消息拉取用戶端,否則等到挂起逾時,逾時時間由消息拉取方在消息拉取時封裝在請求參數中,PUSH 模式預設為 15s

當新消息達到 CommitLog 時,ReputMessageService 線程負責将消息轉發給 ConsumeQueue、IndexFile,如果 Broker 端開啟了長輪詢模式并且角色主節點,則最終将調用 PullRequestHoldService 線程的 notifyMessageArriving 方法喚醒挂起線程,判斷目前消費隊列最大偏移量是否大于待拉取偏移量,如果大于則拉取消息。長輪詢模式使得消息拉取能實作準實時。

隊列負載均衡

在 RocketMQ 中,Consumer 端的兩種消費模式(Push/Pull)都是基于拉模式來擷取消息的,而在 Push 模式隻是對 Pull 模式的一種封裝,其本質實作為消息拉取線程在從伺服器拉取到一批消息後,然後送出到消息消費線程池後,又“馬不停蹄”的繼續向伺服器再次嘗試拉取消息。如果未拉取到消息,則延遲一下又繼續拉取。在兩種基于拉模式的消費方式(Push/Pull)中,均需要 Consumer 端在知道從 Broker 端的哪一個消息隊列—隊列中去擷取消息。是以,有必要在 Consumer 端來做負載均衡,即 Broker 端中多個 MessageQueue 配置設定給同一個 ConsumerGroup 中的哪些 Consumer 消費。

在 Consumer 啟動後,它就會通過定時任務不斷地向 RocketMQ 叢集中的所有 Broker 執行個體發送心跳包(其中包含了,消息消費分組名稱、訂閱關系集合、消息通信模式和用戶端id的值等資訊)。Broker 端在收到 Consumer 的心跳消息後,會将它維護在 ConsumerManager 的本地緩存變量 ConsumerTable,同時并将封裝後的用戶端網絡通道資訊儲存在本地緩存變量 ChannelInfoTable 中,為之後做 Consumer 端的負載均衡提供可以依據的中繼資料資訊。

Consumer 的 RebalanceService 會每隔20s執行一次負載均衡。它會根據消費者通信類型為“廣播模式”還是“叢集模式”做不同的邏輯處理。因為廣播模式,每個 Consumer 都會訂閱所有隊列的内容,實作很簡單,是以這裡主要來看下叢集模式下的主要處理流程:

  1. 從本地緩存變量 TopicSubscribeInfoTable 中,擷取該 Topic 主題下的消息消費隊列集合
  2. 向各個 Broker 端發送擷取該消費組下消費者Id清單
  3. 先對 Topic 下的消息消費隊列、消費者 Id 排序,然後用消息隊列配置設定政策算法(預設為:消息隊列的平均配置設定算法),計算出待拉取的消息隊列
  4. 根據計算出來的新負載均衡結果,更新本地的隊列消費任務
    • 删除已經不由自己負責的隊列消費任務
    • 添加新的由自己負責的隊列消費任務,從 Broker 中讀取該隊列的消費偏移 Offset,然後開始消費任務

RocketMQ 的負載均衡過程并沒有通過選主配置設定的過程進行,而是各個節點自行計算,我覺得主要是為了實作友善,而且 RocketMQ 也不追求一個消息隻被消費一次,如果負載均衡的結果出現了短暫沖突(最終應該會趨于一緻),也可以靠 Consumer 實作幂等性解決。

消息消費過程

  1. 當 Consumer 拉取收新的消息時,會将這些消息以 32 個為一組,送出給消息消費者線程池
  2. 線程池進行實際消費時,會确認目前消息隊列是否仍然歸自己管轄(重新負載均衡時,将該隊列配置設定給了别的消費者)
  3. 恢複延時消息主題名
    • RocketMQ 将消息存入 CommitLog 檔案時,如果發現消息是延時消息,會首先将原主題存入在消息的屬性中,然後設定主題名稱為 SCHEDULE_TOPIC,以便時間到後重新參與消息消費。
  4. 執行具體的消息消費函數,最終将傳回 CONSUME_SUCCESS (消費成功)或 RECONSUME_LATER (需要重新消費)
  5. 如果業務代碼傳回 RECONSUME_LATER,根據模式作出不同的處理
    • 廣播模式:什麼都不處理,隻列印log
    • 叢集模式:回發消費失敗的消息,進行重新消費,如果發送失敗,則再次嘗試自己消費
  6. 根據消費成功的消息,計算消費者線程池的剩餘消息數量和大小,然後更新offset
    • 由于可能會出現一組消息隻有後半段被消費成功的情況,是以最終的 offset 為剩餘消息池中最小的 offset,這就勢必會出現重複消費

重試消息

RocketMQ 提供了幾個重試消息的延時級别: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 1Om 20m 30m 1h 2h,同時也有消息最大重新消費次數,如果超過了最大重新消費次數則會被單獨存儲起來,等待人工處理。

重試消息會被存入名為"%RETRY%+消費組名稱"的主題中,原始主題會存入屬性中。然後會基于定時任務機制,在到期時将任務再次拉取出來。

消費進度管理

  • 廣播模式: 同一個消費組的所有消息消費者都需要消費主題下的所有消息,也就是同組内的消費者的消息消費行為是對立的,互相不影響,故消息進度需要獨立存儲,最理想的存儲地方應該是與消費者綁定。這些資料最終會存儲在 Consumer 節點的磁盤檔案中,采用周期性刷盤的形式存儲。
  • 叢集模式: 同一個消費組内的所有消息消費者共享消息主題下的所有消息,同一條消息(同一個消息消費隊列)在同一時間隻會被消費組内的一個消費者消費,并且随着消費隊列的動态變化重新負載,是以消費進度需要儲存在一個每個消費者都能通路到的地方————Broker,在需要更新 Offset 時,會以網絡請求的形式更新 Broker 中存儲的 Offset。
RocketMQ 消息消費

消費者線程池每處理完一個消息消費任務(ConsumeRequest)時會從 ProcessQueue 中移除本批消費的消息,并傳回 ProcessQueue 中最小的偏移量,用該偏移量更新消息隊列消費進度,如果 ProcessQueue 中的消息 Offset 分别為 [10,30,40,50],這時候消費了30,40,最後的 Offset 仍然為 10。隻有當 Offset = 10 的消息被消費後,Offset 才會變為 50。正因為如此,RocketMQ 才會有根據消息 Offset 跨度進行流量控制的功能。

此外,值得一提的是,當發生重新負載均衡後,如果某一隊列被配置設定給了其他消費者,那麼該隊列對應的 Offset 也會從本機中消除。

順序消息

RocketMQ 支援局部消息順序消費,可以確定同一個消息消費隊列中的消息被順序消費,如果需要做到全局順序消費則可以将主題配置成一個隊列。

消息隊列負載

如果經過消息隊列重新負載(配置設定)後,配置設定到新的消息隊列時,首先需要嘗試向 Broker 發起鎖定該消息隊列的請求,如果傳回加鎖成功則建立該消息隊列的拉取任務,否則将跳過,等待其他消費者釋放該消息隊列的鎖,然後在下一次隊列重新負載時再嘗試加鎖。如果重新配置設定後,發現某一隊列已不由自己負責,會主動的釋放該隊列的鎖。除此之外,鎖的最大存活時間是 60s,如果超過 60s 未續鎖,則自動釋放。

順序消息消費與并發消息消費的第一個關鍵差別: 順序消息在建立消息隊列拉取任務時需要在 Broker 伺服器鎖定該消息隊列。

消息拉取方式

消息拉取過程中,先會判斷該消息隊列是否被鎖定,如果未被自己鎖定,則會延遲一段時間後,再進行拉取任務。

消息消費方式

如果消費模式為叢集模式,啟動定時任務,預設每隔 20s 鎖定一次配置設定給自己的消息消費隊列(鎖的保活)。

在 ConsumeMessageOrderlyService 消費消息時,先會擷取記憶體中的隊列鎖。也就是說,一個消息消費隊列同一時刻隻會被一個消費線程池中一個線程消費。除此之外,其他過程基本和并發消費的過程一緻。

文章說明

更多有價值的文章均收錄于

貝貝貓的文章目錄
RocketMQ 消息消費

版權聲明: 本部落格所有文章除特别聲明外,均采用 BY-NC-SA 許可協定。轉載請注明出處!

創作聲明: 本文基于下列所有參考内容進行創作,其中可能涉及複制、修改或者轉換,圖檔均來自網絡,如有侵權請聯系我,我會第一時間進行删除。

參考内容

[1]《RocketMQ技術内幕》

[2]《RocketMQ實戰與原了解析》

[3]

老生常談——利用消息隊列處理分布式事務

[4]

RocketMQ架構解析

[5]

MappedByteBuffer VS FileChannel 孰強孰弱?

[6]

檔案 IO 操作的一些最佳實踐

[7]

海量資料處理之Bloom Filter詳解

[8]

rocketmq GitHub Wiki