順序消費服務ConsumeMessageConcurrentlyService建構的時候
建構一個線程池來接收消費請求ConsumeRequest
建構一個單線程的本地線程,用來稍後定時重新消費ConsumeRequest, 用來執行定時周期性(一秒)鐘鎖隊列任務
周期性鎖隊列lockMQPeriodically
擷取正在消費隊列清單ProcessQueueTable所有MesssageQueue, 建構根據broker歸類成MessageQueue集合Map<brokername,Set<MessageQueue>>
周遊Map<brokername,Set<MessageQueue>>的brokername, 擷取broker的master機器位址,将brokerName的Set<MessageQueue>發送到broker請求鎖定這些隊列。 在broker端鎖定隊列,其實就是在broker的queue中标記一下消費端,表示這個queue被某個client鎖定。 Broker會傳回成功鎖定隊列的集合, 根據成功鎖定的MessageQueue,設定對應的正在處理隊列ProccessQueue的locked屬性為true沒有鎖定設定為false
通過長輪詢拉取到消息後會送出到消息服務ConsumeMessageOrderlyService,
ConsumeMessageOrderlyService的submitConsumeRequest方法建構ConsumeRequest任務送出到線程池。ConsumeRequest是由ProcessQueue和Messagequeue組成。
ConsumeRequest任務的run方法
判斷proccessQueue是否被droped的, 廢棄直接傳回,不在消費消息
每個messagequeue都會生成一個隊列鎖來保證在目前consumer内,同一個隊列串行消費,
判斷processQueue的lock屬性是否為true,lock屬性是否過期,如果為false或者過期,放到本地線程稍後鎖定在消費。 如果lock為true且沒有過期,開始消費消息
計算任務執行的時間如果大于一分鐘且線程數小于隊列數情況下,将processqueue, messagequeue重新建構ConsumeRequest加到線程池10ms後在消費,這樣防止個别隊列被餓死
擷取用戶端的消費批次個數,預設一批次為一條
從proccessqueue擷取批次消息, processqueue.takeMessags(batchSize), 從msgTreeMap中移除消息放到臨時map中msgTreeMapTemp,這個臨時map用來復原消息和commit消息來實作事物消費
調回調接口消費消息,傳回狀态對象ConsumeOrderlyStatus
根據消費狀态,處理結果
1) 非事物方式,自動送出
消息消息狀态為success:調用processQueue.commit方法
擷取msgTreeMapTemp的最後一個key,表示送出的 offset
清空msgTreeMapTemp的消息,已經成功消費
2) 事物送出,由使用者來控制送出復原(精衛專用)
更新消費進度, 這裡的更新隻是一個記憶體offsetTable的更新,後面有定時任務定時更新到broker上去
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0NXYFhGd192UvwVe0lmdhJ3ZvwFM38CXlZHbvN3cpR2Lc1TPB10QGtWUCpEMJ9CXsxWam9CXwADNvwVZ6l2c052bm9CXUJDT1wkNhVzLcRnbvZ2Lc1TVYFGe1cVWvB3MYBnVyQmb1IjYvZFWjZXUYpVd1kmYr50MZV3YyI2cKJDT29GRjBjUIF2LcRHelR3LcJzLctmch1mclRXY39TO5UjN1cjMwATOwkDM0EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)