天天看點

RocketMQ原了解析-producer(轉)1,啟動流程2,如何發送消息3,如何發送順序消息4,發送分布式事物消息5,消息在broker落地之普通消息6,消息在broker落地之事物消息

1,啟動流程

RocketMQ原了解析-producer(轉)1,啟動流程2,如何發送消息3,如何發送順序消息4,發送分布式事物消息5,消息在broker落地之普通消息6,消息在broker落地之事物消息

Producer如何感覺要發送消息的broker(brokerAddrTable中的值是怎麼獲得的)?

  • producer本地集合中沒有,會根據指定topic到namesrv擷取TopicPublishInfo,并放入本地集合。
  • 定時從namesrv更新topic路由資訊。

Producer與broker間的心跳

Producer定時發送心跳,将producer資訊(其實就是procduer的group)定時發送到broker(brokerAddrTable集合中列出的)。

master接收消息,slave拷貝master

Producer發送消息,隻發送到broker_master機器,通過broker的主從複制機制拷貝到broker_slave。

2,如何發送消息

producer輪詢某topic下的所有queue的方式 實作發送方的負載均衡。

RocketMQ原了解析-producer(轉)1,啟動流程2,如何發送消息3,如何發送順序消息4,發送分布式事物消息5,消息在broker落地之普通消息6,消息在broker落地之事物消息

Topic下的所有隊列如何了解?

Broker Topic queue 注冊namesrv隊列(Topic_A)
broker1 Topic_A queue0 , queue1 broker1_queue0 ,broker1_queue1
broker2 queue0, queue2, queue3 broker2_queue0,broker2_queue1,broker2_queue2
broker3 queue0 broker3_queue0

Producer如何實作輪詢隊列?

Producer從namesrv獲得topic_A路由資訊TopicPublishInfo。

//Topic_A的所有的隊列
private List<MessageQueue> messageQueueList
//自增整型
private volatile ThreadLocalIndex sendWhichQueue
/**
  *選擇一個發送隊列
  *lastBrokerName不為空,代表上次選擇的queue失敗,本次避開同一個queue
  */
public MessageQueue selectOneMessageQueue(final String lastBrokerName){
  //計算隊列下标
  //sendWhichQueue.getAndIncrement()每次調用+1
  Math.abs(sendWhichQueue.getAndIncrement()) % this.messageQueueList.size()
}           

Producer發消息系統重試?

//消息發送失敗重試次數預設為2
private int retryTimesWhenSendFailed = 2;
//發送消息逾時時間
private int sendMsgTimeout = 3000;           

發送失敗,換個隊列繼續發送所需條件:

1. 重試次數不到retryTimesWhenSendFailed (預設2次)
2. 發送此條消息花費時間還沒有到sendMsgTimeout (預設3000毫秒)           

3,如何發送順序消息

4,發送分布式事物消息

5,消息在broker落地之普通消息

6,消息在broker落地之事物消息