天天看點

RocketMQ-名詞和架構RocketMQ

RocketMQ

rocketMQ是做什麼的我就不用解釋了吧,以及他的背景。本文主要是為了讓大家明白RocketMQ的工作原理。

架構圖

RocketMQ-名詞和架構RocketMQ

上圖,雙箭頭代表是雙向通信,ProducerGroup和ConsumerGroup以及Broker叢集,NameServer叢集在互相通信的時候,是每個執行個體之間的通信。舉個例子:上圖中ProducerGroup和NameSevrer通信來說,是三台Producer執行個體分别與三台NameServer執行個體都會進行通信(當然前提是Producer預設注冊了三台Producer執行個體配置了三台NameServer的位址),但是三台NameServer之間不會進行通信,他們是多活的模式,不是主備的模式。

主要名詞解釋
  • ProducerGroup
    由一組Producer組成,如果隻是單純的發送普通消息,本身沒有什麼特别含義,發送分布式事務消息時,
    如果 Producer 中途意外當機,Broker會主動回調Producer Group内的任意一台機器來确認事務狀态。
               
  • ConsumerGroup
    辨別一類Consumer的集合名稱,這類Consumer通常消費一類消息,且消費邏輯一緻。同
    Consumer Group下的各個執行個體将共同消費topic的消息,起到負載均衡的作用。
    消費進度以Consumer Group為粒度管理,不同Consumer Group之間消費進度彼此不受影響,
    即消息A被Consumer Group1消費過,也會再給Consumer Group2消費。
    注: RocketMQ要求同一個Consumer Group的消費者必須要擁有相同的注冊資訊,
    即必須要聽一樣的topic(并且tag也一樣)。
               
  • Client
    RocketMQ裡面有Client這個概念,Consumer和Producer都是Client,可以這麼了解:
    生産者和消費者都是用戶端,且都具備一個Client應該有的屬性,因為RocketMQ對Client有一些限制和規定,
    是以在使用Consumer和Producer的時候也要注意這些規定和限制。對應的有個ClientConfig類。
               
參數 預設值 參數說明
NameServer 通過配置這個Client可以與NameServer通信獲得需要的Topic和Broker的對應關系,預設值:-D系統參數rocketmq.namesrv.addr或環境變量NAMESRV_ADDR,Springboot配置:rocketmq.nameServer:IP:端口,多個NameServer用分号分割
clientIP 本機IP 用戶端所在的伺服器的ip位址,某些機器會發生無法識别用戶端IP位址情況,可以在代碼中強制指定
instanceName “DEFAULT” 如果是DEFAULT得話,該字段又會被轉換成該client所在的程序id,在RocketMQ中區分用戶端是根據ClientID,[email protected],也就是說如果同一個IP下的不同生産者如果instanceName相同的話,那就會被識别為同一個MQClientInstance(負責與MQ進行通信,如保持心跳,拉取Topick資訊等),如果兩個生産者配置的叢集不同(不同NameServer),那麼就會導緻不同生産者的消息發往同一個叢集(NameServer)中去,如果是消費者的話,就會導緻多個消費者消費相同Queue裡面的資訊,導緻資訊混亂
clientCallbackExecutorThreads 4 通信層回調線程數量
pollNameServerInteval 30000 輪訓NameServer的時間周期,機關毫秒
heartbeatBrokerInterval 30000 向Broker發送心跳的周期,機關毫秒
persistConsumerOffsetInterval 5000 持久化消費者消費進度的周期,機關毫秒,RocketMQ采取的是定期批量ack的機制以持久化消費進度。也就是說每次消費消息結束後,并不會立刻ack,而是定期的集中的更新進度。 由于持久化不是立刻持久化的,是以如果消費執行個體突然退出(如斷電)或者觸發了負載均衡分consue queue重排,有可能會有已經消費過的消費進度沒有及時更新而導緻重新投遞。故本配置值越小,重複的機率越低,但同時也會增加網絡通信的負擔。
vipChannelEnabled -D com.rocketmq.sendMessageWithVIPChannel參數的值,若無則是true 是否啟用VIP通道發送資訊,broker的netty server會起兩個通信服務。兩個服務除了服務的端口号不一樣,其他都一樣。其中一個的端口(配置端口-2)作為vip通道,用戶端可以啟用本設定項把發送消息此vip通道。
  • Producer
    Producer是一個Client,用來生産消息,并發送到指定Topic,甚至Broker和Queue。
               
參數 預設值 參數說明
producerGroup DEFAULT_PRODUCER 對于非事務型的Producer,producer group僅起到辨別作用并沒有實際作用
createTopicKey TBW102 Producer第一次發送消息的時候,如果topic不存在,若想自動建立該topic,需要一個topickey,這個值即是topickey的值。自動建立該topic支援的前提是broker 的配置打開autoCreateTopicEnable=true,然後broker會建立一個TBW102的topic,這個就是我們講的預設的topickey.自動建構topic的過程:Producer發送的時候如果發現該Topic不存在,就會向配置有Producer配置的topickey的那個broker發送消息broker校驗用戶端的topic key是否在broker存在,且校驗其權限最後一位是否是1(topic權限總共有3位,按位存儲,分别是讀、寫、支援自動建立)若權限校驗通過,先在該broker把T建立,并且權限就是topickey除去最後一位的權限
defaultTopicQueueNums 4 自動建立新的Topic時,建立的對應的Queue的數量
sendMsgTimeout 10000 發送消息的逾時時間 機關毫秒
compressMsgBodyOverHowmuch 4096 發送消息過大時,進行消息壓縮的标準 機關Byte
retryAnotherBrokerWhenNotStoreOK false 如果發送消息傳回sendResult,發送的結果如果不是SEND_OK狀态,是否當作失敗處理而嘗試重發,此配置項隻對同步發送有效,異步、oneway無效
maxMessageSize 4M 用戶端驗證,允許發送的最大消息體大小,超過會報錯
還有一些事務相關的屬性這裡就不羅列了 需要的可以自己去了解一下
  • PushConsumer
    推送模式的消費者,即消息由MQ主動推送過來
               
參數 預設值 參數說明
consumerGroup 消費者組名稱,用來辨別一組消費者
messageModel MessageModel.CLUSTERING 消費的模式:有兩種BROADCASTING和CLUSTERING。
consumeFromWhere ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET 啟動後的首次消費的起始點,可選值有三個:CONSUME_FROM_LAST_OFFSET //隊列尾消費 CONSUME_FROM_FIRST_OFFSET //隊列頭消費CONSUME_FROM_TIMESTAMP //按照日期選擇某個位置消費,這個配置隻生效于新線上測consumer group,如果是老的已存在的consumer group,都降按照已經持久化的consume offset進行消費
consumeTimestamp 半個小時前 配合上面的配置使用,CONSUME_FROM_LAST_OFFSET的時候使用,從哪個時間點開始消費,格式為yyyyMMddhhmmss 如 20191123171201
allocateMessageQueueStrategy AllocateMessageQueueAveragely(取模平均配置設定) 負載均衡政策算法這個算法可以自行擴充以使用自定義的算法,目前内置的有以下算法可以使用AllocateMessageQueueAveragely //取模平均AllocateMessageQueueAveragelyByCircle //環形平均AllocateMessageQueueByConfig // 按照配置,傳入聽死的messageQueueListAllocateMessageQueueByMachineRoom //按機房,從源碼上看,必須和阿裡的某些broker命名一緻才行,也可以自己實作相應的接口,實作自己的政策
consumeThreadMin 20 PushConsumer内部擁有一個線程池進行消費消息,這裡是核心線程數
consumeThreadMax 64 PushConsumer内部擁有一個線程池進行消費消息,這裡是最大線程數
consumeConcurrentlyMaxSpan 2000 并發消費下,單條consume queue隊列允許的最大offset跨度,達到則觸發流控,隻對并發消費(ConsumeMessageConcurrentlyService)生效
pullThresholdForQueue 1000 consume queue流控的門檻值,每條consume queue的消息拉取下來後會緩存到本地,消費結束會删除。當累積達到一個門檻值後,會觸發該consume queue的流控
pullInterval 拉取消息的時間間隔,機關毫秒。由于RocketMQ采取的pull的方式進行消息投遞,每此會發起一個異步pull請求,得到請求後會再發起下次請求,這個間隔預設是0,表示立刻再發起。在間隔為0的場景下,消息投遞的及時性幾乎等同用Push實作的機制
pullBatchSize 32 一次拉取的最大消息條數
consumeMessageBatchMaxSize 1 單次消費最大消息條數,由于拉取到的一批消息會立刻拆分成N(取決于consumeMessageBatchMaxSize)批消費任務,是以集合中msgs的最大大小是consumeMessageBatchMaxSize和pullBatchSize的較小值
maxReconsumeTimes -1 一個消息如果消費失敗的話,最多重新消費多少次才投遞到死信隊列,注:這個值預設值雖然是-1,但是實際使用的時候預設并不是-1。按照消費是并行還是串行消費有所不同的預設值。并行:預設16次串行:預設無限大(Interge.MAX_VALUE)。由于順序消費的特性必須等待前面的消息成功消費才能消費後面的,預設無限大即一直不斷消費直到消費完成。
suspendCurrentQueueTimeMillis 1000 串行消費使用,如果傳回ROLLBACK或者SUSPEND_CURRENT_QUEUE_A_MOMENT,再次消費的時間間隔,機關毫秒
consumeTimeout 15 消費的最長逾時時間 機關分鐘,如果消費逾時,則按照消費失敗
  • PullConsumer
    拉取模式的消費者,自己控制消息的消費,包括消費量和進度等
               
參數 預設值 參數說明
consumerGroup 消費者組名稱,用來辨別一組消費者
messageModel MessageModel.CLUSTERING 消費的模式:有兩種BROADCASTING和CLUSTERING。
registerTopics 空集合 消費者監聽的Topic
allocateMessageQueueStrategy AllocateMessageQueueAveragely(取模平均配置設定) 負載均衡政策算法這個算法可以自行擴充以使用自定義的算法,目前内置的有以下算法可以使用AllocateMessageQueueAveragely //取模平均AllocateMessageQueueAveragelyByCircle //環形平均AllocateMessageQueueByConfig // 按照配置,傳入聽死的messageQueueListAllocateMessageQueueByMachineRoom //按機房,從源碼上看,必須和阿裡的某些broker命名一緻才行,也可以自己實作相應的接口,實作自己的政策
offsetStore null 消息消費進度存儲器,offsetStore 有兩個政策:LocalFileOffsetStore 和 RemoteBrokerOffsetStore。若沒有顯示設定的情況下,廣播模式将使用LocalFileOffsetStore,叢集模式将使用RemoteBrokerOffsetStore
maxReconsumeTimes 16 調用sendMessageBack的時候,如果發現重新消費超過這個配置的值,則投遞到死信隊列。由于PullConsumer沒有管理消費的線程池和管理器,需要使用者自己處理各種消費結果和拉取結果,故需要投遞到重試隊列或死信隊列的時候需要顯示調用sendMessageBack。回傳消息的時候會帶上maxReconsumeTimes的值,broker發現此消息已經消費超過此值,則投遞到死信隊列,否則投遞到重試隊列。此邏輯和DefaultPushConsumer是一緻的,隻是PushConsumer無需使用者顯示調用。
messageQueueListener 由于pull操作需要使用者自己去觸發,故如果負載均衡發生變化,要有方法告知使用者現在分到的新consume queue是什麼。使用方可以實作接口MessageQueueListener 以達到此目的
  • Topic
    主題,是一個虛拟的概念,是一類消息的抽象,消息的具體存放是放在Broker上面的,
    一個Topic可以存放在多個Broker的多個Queue上面。
               
  • Broker
    節點,就是伺服器,發送的消息真正存放的地方,也是真正叢集部署的基本機關,
               
參數 預設值 參數說明
consumerGroup Conusmer組名,多個Consumer如果屬于一個應用,訂閱同樣的消息,且消費邏輯一緻,則應該将它們歸為同一組
listenPort 10911 Broker對外監聽的端口
namesrvAddr NameServer的位址
brokerIP1 本機ip位址 有些網卡會識别失敗或者識别錯誤,這裡就需要手動填寫
brokerName 本機主機名 可以設定主機名,和主從有關,主從的brokerName必須一緻
brokerClusterName DefaultCluster Broker所屬哪個叢集
brokerId BrokerId,必須是大等于0的整數,0表示Master,>0表示Slave,一個Master可以挂多個Slave,Master和Slave通過BrokerName來配對
storePathCommitLog $HOME/store/commitlog commitLog的存儲路徑,該broker接收到的所有的消息(任何topic)都會被執行個體化到該檔案裡面,為了保證寫入效率,這裡寫的方式是順序寫的
storePathConsumeQueue $HOME/store/consumequeue 消費隊列的存儲路徑
storePathIndex $HOME/store/index 消息索引存儲路徑
deleteWhen 4 删除時間點,24小時制
fileReservedTime 48 檔案保留時間,機關小時
maxTransferBytesOnMessageInMemory 262144 單次拉取消息(記憶體)傳輸的最大位元組數,機關Byte
maxTransferCountOnMessageInMemory 32 單次拉取消息(記憶體)的最大條數
maxTransferBytesOnMessageInDisk 65535 單次拉取消息(硬碟)傳輸的最大位元組數,機關Byte
maxTransferCountOnMessageInDisk 8 單次拉取消息(硬碟)的最大條數
messageIndexEnable True 是否開啟索引功能
brokerRole ASYNC_MASTER Broker的角色 ASYNC_MASTER:異步複制Master。SYNC_MASTER:同步雙寫Master。 SLAVE:從
flushDiskType ASYNC_FLUSH 刷盤方式。ASYNC_FLUSH:異步刷盤;SYNC_FLUSH:同步刷盤
cleanFileForciblyEnable TRUE 磁盤滿,且無過期檔案情況下TRUE表示強制删除檔案,優先保證服務可用,FALSE标記服務不可用,檔案不删除
  • Tag

    tag是消息的标志,發送消息的時候可以指定,接收消息的時候也可以按照這個Tag來過濾消息。

  • Queue

    是消息存在也是生産者投放消息和消費者消費資訊的目的地。

繼續閱讀