消費者組和訂閱
首先你應該關心的是不同的消費者群組可以獨立的消費相同的主題,并且每個組都擁有自己的消費偏移量。
請確定相同消費者内的每個消費者訂閱一樣的主題。
消息監聽器
串行
消費者将鎖定每個消息隊列以確定它被順序消費。這會引起性能丢失,但當你關心消息順序的實時這是很有用的。不推薦抛異常,你可以用傳回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 替代。
并行
正如名字所說,消息者将會并行消費消息。為了好的性能推薦使用。不推薦抛異常,你可以使用傳回ConsumeConcurrentlyStatus.RECONSUME_LATER 代替。
消費狀态
對于MessageListenerConcurrently,你可以傳回 RECONSUME_LATER 來告訴消費者,你現在不能馬上消費并且想在稍後重新消費。然後你可以繼續消費其他消息。
對于MessageListenerOrderly,因為你關心順序,你不能跳過消息,但你可以傳回SUSPEND_CURRENT_QUEUE_A_MOMENT 來告訴消費者等待一會。
阻塞
不建議阻塞監聽器,因為它會阻塞線程池,最後使消費程序停止。
線程數量
消費者内部使用ThreadPoolExecutor來處理消費,是以你通過設定setConsumeThreadMin 或setConsumeThreadMax 可以改變它。
從哪開始消費
當建立一個新的消費者組,需要決定它是否需要消費在broker中已經存在的曆史消息。
CONSUME_FROM_LAST_OFFSET 将會忽略曆史消息,消費所有的之後産生的消息。
CONSUME_FROM_FIRST_OFFSET 将會消費Broker中存在的每一個消息。
CONSUME_FROM_TIMESTAMP ,消費指定時間戳之後産生的消息。
重複
很多情況下都會引起重複,例如:
- Producer重發消息(FLUSH_SLAVE_TIMEOUT情況)
- Consumer 關閉,一些偏移量未及時更新到Broker
如果你的應用不能容忍重複的話,你可能需要做一些額外的工作來處理。例如,你可以檢查DB的唯一主鍵。