1,啟動流程
Producer如何感覺要發送消息的broker(brokerAddrTable中的值是怎麼獲得的)?
- producer本地集合中沒有,會根據指定topic到namesrv擷取TopicPublishInfo,并放入本地集合。
- 定時從namesrv更新topic路由資訊。
Producer與broker間的心跳
Producer定時發送心跳,将producer資訊(其實就是procduer的group)定時發送到broker(brokerAddrTable集合中列出的)。
master接收消息,slave拷貝master
Producer發送消息,隻發送到broker_master機器,通過broker的主從複制機制拷貝到broker_slave。
2,如何發送消息
producer輪詢某topic下的所有queue的方式 實作發送方的負載均衡。
Topic下的所有隊列如何了解?
Broker | Topic | queue | 注冊namesrv隊列(Topic_A) |
---|---|---|---|
broker1 | Topic_A | queue0 , queue1 | broker1_queue0 ,broker1_queue1 |
broker2 | queue0, queue2, queue3 | broker2_queue0,broker2_queue1,broker2_queue2 | |
broker3 | queue0 | broker3_queue0 |
Producer如何實作輪詢隊列?
Producer從namesrv獲得topic_A路由資訊TopicPublishInfo。
//Topic_A的所有的隊列
private List<MessageQueue> messageQueueList
//自增整型
private volatile ThreadLocalIndex sendWhichQueue
/**
*選擇一個發送隊列
*lastBrokerName不為空,代表上次選擇的queue失敗,本次避開同一個queue
*/
public MessageQueue selectOneMessageQueue(final String lastBrokerName){
//計算隊列下标
//sendWhichQueue.getAndIncrement()每次調用+1
Math.abs(sendWhichQueue.getAndIncrement()) % this.messageQueueList.size()
}
Producer發消息系統重試?
//消息發送失敗重試次數預設為2
private int retryTimesWhenSendFailed = 2;
//發送消息逾時時間
private int sendMsgTimeout = 3000;
發送失敗,換個隊列繼續發送所需條件:
1. 重試次數不到retryTimesWhenSendFailed (預設2次)
2. 發送此條消息花費時間還沒有到sendMsgTimeout (預設3000毫秒)