天天看點

RocketMQ原了解析-consumer 5.push消費-順序消費消息

順序消費服務ConsumeMessageConcurrentlyService建構的時候

                   建構一個線程池來接收消費請求ConsumeRequest

                   建構一個單線程的本地線程,用來稍後定時重新消費ConsumeRequest, 用來執行定時周期性(一秒)鐘鎖隊列任務

         周期性鎖隊列lockMQPeriodically

                   擷取正在消費隊列清單ProcessQueueTable所有MesssageQueue, 建構根據broker歸類成MessageQueue集合Map<brokername,Set<MessageQueue>>

                   周遊Map<brokername,Set<MessageQueue>>的brokername, 擷取broker的master機器位址,将brokerName的Set<MessageQueue>發送到broker請求鎖定這些隊列。 在broker端鎖定隊列,其實就是在broker的queue中标記一下消費端,表示這個queue被某個client鎖定。 Broker會傳回成功鎖定隊列的集合, 根據成功鎖定的MessageQueue,設定對應的正在處理隊列ProccessQueue的locked屬性為true沒有鎖定設定為false

通過長輪詢拉取到消息後會送出到消息服務ConsumeMessageOrderlyService,

ConsumeMessageOrderlyService的submitConsumeRequest方法建構ConsumeRequest任務送出到線程池。ConsumeRequest是由ProcessQueue和Messagequeue組成。

ConsumeRequest任務的run方法

         判斷proccessQueue是否被droped的, 廢棄直接傳回,不在消費消息

         每個messagequeue都會生成一個隊列鎖來保證在目前consumer内,同一個隊列串行消費,

         判斷processQueue的lock屬性是否為true,lock屬性是否過期,如果為false或者過期,放到本地線程稍後鎖定在消費。 如果lock為true且沒有過期,開始消費消息

         計算任務執行的時間如果大于一分鐘且線程數小于隊列數情況下,将processqueue, messagequeue重新建構ConsumeRequest加到線程池10ms後在消費,這樣防止個别隊列被餓死

         擷取用戶端的消費批次個數,預設一批次為一條

         從proccessqueue擷取批次消息, processqueue.takeMessags(batchSize), 從msgTreeMap中移除消息放到臨時map中msgTreeMapTemp,這個臨時map用來復原消息和commit消息來實作事物消費

         調回調接口消費消息,傳回狀态對象ConsumeOrderlyStatus

         根據消費狀态,處理結果

1)   非事物方式,自動送出

消息消息狀态為success:調用processQueue.commit方法

                   擷取msgTreeMapTemp的最後一個key,表示送出的 offset

                   清空msgTreeMapTemp的消息,已經成功消費

2)   事物送出,由使用者來控制送出復原(精衛專用)

     更新消費進度, 這裡的更新隻是一個記憶體offsetTable的更新,後面有定時任務定時更新到broker上去

RocketMQ原了解析-consumer 5.push消費-順序消費消息