天天看點

RocketMQ概念詳細之Consumer

消費者組和訂閱

首先你應該關心的是不同的消費者群組可以獨立的消費相同的主題,并且每個組都擁有自己的消費偏移量。

請確定相同消費者内的每個消費者訂閱一樣的主題。

消息監聽器

串行

消費者将鎖定每個消息隊列以確定它被順序消費。這會引起性能丢失,但當你關心消息順序的實時這是很有用的。不推薦抛異常,你可以用傳回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 替代。

并行

正如名字所說,消息者将會并行消費消息。為了好的性能推薦使用。不推薦抛異常,你可以使用傳回ConsumeConcurrentlyStatus.RECONSUME_LATER 代替。

消費狀态

對于MessageListenerConcurrently,你可以傳回 RECONSUME_LATER 來告訴消費者,你現在不能馬上消費并且想在稍後重新消費。然後你可以繼續消費其他消息。

對于MessageListenerOrderly,因為你關心順序,你不能跳過消息,但你可以傳回SUSPEND_CURRENT_QUEUE_A_MOMENT 來告訴消費者等待一會。

阻塞

不建議阻塞監聽器,因為它會阻塞線程池,最後使消費程序停止。

線程數量

消費者内部使用ThreadPoolExecutor來處理消費,是以你通過設定setConsumeThreadMin 或setConsumeThreadMax 可以改變它。

從哪開始消費

當建立一個新的消費者組,需要決定它是否需要消費在broker中已經存在的曆史消息。

CONSUME_FROM_LAST_OFFSET 将會忽略曆史消息,消費所有的之後産生的消息。

CONSUME_FROM_FIRST_OFFSET 将會消費Broker中存在的每一個消息。

CONSUME_FROM_TIMESTAMP ,消費指定時間戳之後産生的消息。

重複

很多情況下都會引起重複,例如:

  • Producer重發消息(FLUSH_SLAVE_TIMEOUT情況)
  • Consumer 關閉,一些偏移量未及時更新到Broker

如果你的應用不能容忍重複的話,你可能需要做一些額外的工作來處理。例如,你可以檢查DB的唯一主鍵。