本文我們來分析rocketMq producer 發送消息的流程.
producer發送消息的示例在org.apache.rocketmq.example.simple.Producer類中,代碼如下:
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 1. 建立 DefaultMQProducer 對象
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
/*
* Launch the instance.
*/
// todo 2. 啟動 producer
producer.start();
for (int i = 0; i < 1000; i++) {
try {
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 3. 發送消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
...
}
producer.shutdown();
}
}
複制代碼
以上代碼分三步走:
- 建立 DefaultMQProducer 對象
- 啟動 producer
- 發送消息
接下來我們的分析也按這三步進行。
1. DefaultMQProducer構造方法
我們看下DefaultMQProducer 這個類有哪些功能:
- 功能一:給消息生産者配置參數,調整參數就是調用這個類的api,比如上面我們設定nameserv位址producer.setNamesrvAddr("127.0.0.1:9876");,可以把它看作一個配置類,
- 功能二:發送消息的功能,這裡它發送消息都是調用defaultMQProducerImpl 這個類,
- 功能三:它實作MQAdmin 接口裡面關于topic與MessageQueue的操作。
我們來看下它裡面有哪些重要的參數:
字段 | 預設值 | 解釋 |
producerGroup | null | 組資訊,需要你自己指定 |
defaultTopicQueueNums | 4 | 就是建立一個topic 預設設定4個MessageQueue |
sendMsgTimeout | 3000 | 發送消息的逾時時間 機關ms |
compressMsgBodyOverHowmuch | 1024 * 4 | 這個就是當發送的消息内容大于這個數的時候 進行壓縮,壓縮門檻值,預設是4k |
maxMessageSize | 1024 * 1024 * 4 | 允許發送消息最大大小 預設是4m |
retryTimesWhenSendFailed | 2 | 發送失敗的時候重試次數 預設是2次 |
retryTimesWhenSendAsyncFailed | 2 | 異步發送失敗的時候重試次數 預設是2次 |
retryAnotherBrokerWhenNotStoreOK | false | 訓示是否在内部發送失敗時重試另一個broker |
DefaultMQProducer這個類還繼承了一個ClientConfig ,這ClientConfig 不用看就知道是個用戶端配置類,我們看看它裡面重要字段的解釋:
字段 | 預設值 | 解釋 |
namesrvAddr | 預設去jvm啟動參數 rocketmq.namesrv.addr ,系統環境變量 NAMESRV_ADDR 中找 | nameserv的位址,這個最好是自己設定進去,你在DefaultMQProducer 類set 的就是指派給了它 |
clientIP | 自己去找 | 本用戶端位址,他自己就會去找的 |
instanceName | 預設是去jvm啟動參數rocketmq.client.name 中找,沒有設定DEFAULT,這個它會自己重新設定的 | 執行個體名稱 |
clientCallbackExecutorThreads | cpu核心數 | 執行callback 線程池的線程核心數 |
pollNameServerInterval | 1000 * 30 | 多久去nameserv 擷取topic 資訊,預設是30s |
heartbeatBrokerInterval | 1000 * 30 | 與broker心跳間隔時間,預設是30s,就是每隔30向broker發送心跳 |
DefaultMQProducer還實作一個MQProducer 接口,不用看,這個MQProducer 接口就是一堆send方法與start,shutdown方法,然後讓DefaultMQProducer去實作。DefaultMQProducer構造方法代碼如下:
public DefaultMQProducer(final String producerGroup) {
// 繼續調用
this(null, producerGroup, null);
}
/**
* 最終調用的構造方法
*/
public DefaultMQProducer(final String namespace,
final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
複制代碼
這個方法就是簡單地賦了值,然後建立了DefaultMQProducerImpl執行個體,我們繼續看DefaultMQProducerImpl的構造方法:
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
// 5w大小的連結清單隊列,異步發送線程池隊列
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
// 預設的異步發送線程池
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),// 核心線程池數,cpu核數
Runtime.getRuntime().availableProcessors(), // 最大線程池數,cpu核數
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
}
複制代碼
這個構造方法依然還是處理指派操作,并沒做什麼實質性内容,就不繼續深究了。
2. DefaultMQProducer#start:啟動producer
接着我們來看看producer的啟動流程,進入DefaultMQProducer#start方法:
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
// 調用 defaultMQProducerImpl 的 start() 方法
this.defaultMQProducerImpl.start();
// 消息軌迹相關,我們不關注
if (null != traceDispatcher) {
...
}
}
複制代碼
這個方法先是調用了defaultMQProducerImpl#start方法,然後處理消息軌迹相關操作,關于rocketMq消息軌迹相關内容,本文就不過多探讨了,我們将目光聚集于DefaultMQProducerImpl#start(boolean)方法:
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
// 剛建立還沒有啟動
case CREATE_JUST:
// 設定為啟動失敗狀态
this.serviceState = ServiceState.START_FAILED;
// 檢查group配置
this.checkConfig();
// 隻要group組 不是CLIENT_INNER_PRODUCER, 就重新設定下執行個體名稱
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
// 改變生産者的instanceName 為程序id
this.defaultMQProducer.changeInstanceNameToPID();
}
// todo 建立MQClientInstance 執行個體(封裝了網絡處理API,消息生産者、消費者和Namesrv、broker打交道的網絡通道)
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// todo 進行注冊
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
// 沒有注冊成功,設定狀态為建立沒有啟動,然後抛出之前已經注冊的異常
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// topic --> topic info
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// 是否啟動 client執行個體,預設是true
if (startFactory) {
// todo 核心
mQClientFactory.start();
}
// 啟動生産者成功
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// 設定狀态running
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// todo 發送心跳到所有broker
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 定時掃描異步請求的傳回結果
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000);
}
複制代碼
這個方法并不複雜相關内容都已經作了注釋,這裡重點提出3個方法:
- mQClientFactory.start():執行方法為MQClientInstance#start,這個方法裡會啟動一些元件,我們稍後會分析。
- mQClientFactory.sendHeartbeatToAllBrokerWithLock():發送心跳到所有的broker,最終執行的方法為MQClientAPIImpl#sendHearbeat:
- public int sendHearbeat( final String addr, final HeartbeatData heartbeatData, final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { // request 的 code 為 HEART_BEAT RemotingCommand request = RemotingCommand .createRequestCommand(RequestCode.HEART_BEAT, null); request.setLanguage(clientConfig.getLanguage()); request.setBody(heartbeatData.encode()); // 異步調用 RemotingCommand response = this.remotingClient .invokeSync(addr, request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { return response.getVersion(); } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } 複制代碼
- 這裡是與broker通信,request 的 code 為 HEART_BEAT,後面的分析中我們會看到,producer也會同nameServer通信。
- 定時掃描異步請求的傳回結果:最終執行的方法為RequestFutureTable.scanExpiredRequest(),關于該方法的内容,我們在分析producer發送異步消息時再分析。
2.1 MQClientInstance#start:啟動MQClientInstance
接下來我們來看看MQClientInstance的啟動,方法為MQClientInstance#start,代碼如下:
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
// 先設定 失敗
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
// 判斷namesrv 是否為null
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// todo Start request-response channel
// 啟動遠端服務,這個方法隻是裝配了netty用戶端相關配置
// todo 注意:1. 這裡是netty用戶端,2. 這裡并沒有建立連接配接
this.mQClientAPIImpl.start();
// Start various schedule tasks
// todo 開啟任務排程
this.startScheduledTask();
// Start pull service
// todo 開啟 拉取服務
this.pullMessageService.start();
// Start rebalance service
// todo 開啟平衡服務
this.rebalanceService.start();
// Start push service
// todo 啟用内部的 producer
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
// 設定狀态為running
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
複制代碼
這個方法進行的操作在注釋中已經說明得很清楚了,接下來我們對以上的部分操作做進一步分析。
2.1.1 mQClientAPIImpl.start():配置netty用戶端
在看MQClientAPIImpl 的啟動方法之前,我們需要看下它的構造:
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig) {
// 用戶端配置
this.clientConfig = clientConfig;
topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
// todo netty client
this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
this.clientRemotingProcessor = clientRemotingProcessor;
// 注冊rpc hook
this.remotingClient.registerRPCHook(rpcHook);
// 注冊 processor CHECK_TRANSACTION_STATE 檢查事務狀态
this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
// 通知消費者id已更改
this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);
// 重置消費者用戶端偏移量
this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);
// 從用戶端拉取消費者狀态
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);
// 擷取消費者運作狀态
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
// 消費資訊
this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}
複制代碼
clientConfig這個是關于client配置的一些資訊,然後往後建立了一個NettyRemotingClient對象,這個就是netty client,封裝了一些同步異步的發送,接着clientRemotingProcessor,這個就是netty收到消息時候處理類,然後下面這一堆注冊processor這個就不看了,其實就是當收到這些code的消息時候讓哪個processor來處理。
先來看下clientConfig 有哪些配置字段需要我們關注:
字段 | 預設值 | 解釋 | 調優 |
clientWorkerThreads | 4 | 這個其實就是處理netty 那堆自定義handler的線程 | 是 |
clientCallbackExecutorThreads | cpu核心線程數 | callback線程數,這個用來執行你注冊的那些processor | 是 |
clientOnewaySemaphoreValue | 預設是65535 | 當發送單向消息的時候,信号量限流 | 是 |
clientAsyncSemaphoreValue | 65535 | 當發送異步消息的時候,信号量限流 | 是 |
connectTimeoutMillis | 3000 | 連接配接逾時時間 | 沒啥現實意義 |
clientSocketSndBufSize | 65535 | netty 用戶端 發送buffer | 是 |
clientSocketRcvBufSize | 65535 | netty 用戶端 接收buffer | 是 |
clientChannelMaxIdleTimeSeconds | 120s | 這個就是netty channel 最大空閑時間 | 也能調優 |
看下NettyRemotingClient 的構造方法:
public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
final ChannelEventListener channelEventListener) {
// 設定 異步與單向請求的信号量
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
// netty配置檔案
this.nettyClientConfig = nettyClientConfig;
// listener
this.channelEventListener = channelEventListener;
// 這個是netty 用戶端回調線程數,預設是cpu核數
int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
// 如果小于等于0 預設為4
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// 設定線程池
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// selector 線程為1
this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
}
});
// 是否啟用tls
if (nettyClientConfig.isUseTLS()) {
try {
sslContext = TlsHelper.buildSslContext(true);
log.info("SSL enabled for client");
} catch (IOException e) {
log.error("Failed to create SSLContext", e);
} catch (CertificateException e) {
log.error("Failed to create SSLContext", e);
throw new RuntimeException("Failed to create SSLContext", e);
}
}
}
複制代碼
首先是調用父類的構造,建立2個信号量Semaphore,這兩個分别用來對單向發送,異步發送進行限流,預設是65535。之後就是建立public線程池,這個線程池主要是用來處理你注冊那堆processor,預設線程數是cpu核心數,再往後就是建立netty niogroup組就1個線程,這個主要是用來處理連接配接的,最後就是判斷是否使用ssl,使用的話建立ssl context。
再來看NettyRemotingClient#start方法,代碼如下:
@Override
public void start() {
// 預設 4個線程
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true) // 不使用tcp中的DELAY算法,就是有小包也要發送出去
.option(ChannelOption.SO_KEEPALIVE, false) // keeplive false
// 連結逾時 預設3s
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
//發送buffer 預設是65535
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
// 接收buffer 預設是65535
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 是否啟用tls
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
// 使用這個線程組處理下面這寫processor 預設是4個線程
defaultEventExecutorGroup,
new NettyEncoder(), // 編碼
new NettyDecoder(), // 解碼
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(), // 連接配接管理
new NettyClientHandler()); // todo netty client handler 處理接收到的消息
}
});
// todo 掃描消息擷取結果,每秒執行1次
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// todo 掃描響應表
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
複制代碼
對于這個方法,說明有兩個點:
- 方法裡使用的是Bootstrap而非ServerBootstrap,表示這是netty用戶端
- 整個方法中并沒有建立連接配接
2.1.2 startScheduledTask():啟動定時任務
啟動定時任務的方法為MQClientInstance#startScheduledTask,代碼如下:
private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) {
// todo 擷取namesrv位址, 2分鐘執行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// todo 從namesrv上面更新topic的路由資訊,預設30s
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// todo 清除下線broker
MQClientInstance.this.cleanOfflineBroker();
// todo 發送心跳到所有broker上面
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// todo 持久化consumer offset 可以放在本地檔案,也可以推送到 broker
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// todo 調整線程池的線程數量,并沒有用上
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
複制代碼
這裡共有5個定時任務:
- 定時擷取 nameServer 的位址,MQClientInstance#start一開始會調用MQClientAPIImpl#fetchNameServerAddr擷取nameServer,這裡也調用了這個方法
- 定時更新topic的路由資訊,這裡會去nameServer擷取路由資訊,之後再分析
- 定時發送心跳資訊到nameServer,在DefaultMQProducerImpl#start(boolean)中,我們也提到了向nameServer發送心跳資訊,兩處調用的是同一個方法
- 持久化消費者的消費偏移量,這個僅對消費者consumer有效,後面分析消費者時再作分析
- 調整線程池的線程數量,不過追蹤到最後,發現這個并沒有生效,就不多說了
定時更新topic的路由資訊
這個就是它的定時任務,預設是30s執行一次的。我們看下updateTopicRouteInfoFromNameServer():
public void updateTopicRouteInfoFromNameServer() {
// 從消費者端與生産者端 擷取topic集合
Set<String> topicList = new HashSet<String>();
// Consumer
{
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
Set<SubscriptionData> subList = impl.subscriptions();
if (subList != null) {
for (SubscriptionData subData : subList) {
topicList.add(subData.getTopic());
}
}
}
}
}
// Producer
{
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
Set<String> lst = impl.getPublishTopicList();
topicList.addAll(lst);
}
}
}
// 周遊topic集合,進行更新路由資訊
for (String topic : topicList) {
// todo
this.updateTopicRouteInfoFromNameServer(topic);
}
}
複制代碼
它是将consumer 與producer 所有topic 放到一個topic集合中,然後周遊這個集合,一個一個topic請求更新。
接下來我們來看下這個updateTopicRouteInfoFromNameServer 方法,這方法有點長,我們一部分一部分的介紹,大體上分為2個部分吧,第一個部分是擷取對應topic的資訊,然後第二部分就是更新本地的topic table 緩存。
擷取部分代碼:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
// 預設 并且 defaultMQProducer不為空,當topic不存在的時候會進入這個if中
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
// todo 擷取topicRoute資訊
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
...
}
複制代碼
if是true這段是你沒有topic 然後需要建立topic 的時候幹的事,它會向nameserv擷取topic 為TBW102的值,然後擷取它對應的那個topicRouteData。
fasle的時候進入else裡面,這個就是拿着topic去nameserv那擷取對應的topicRouteData。 這裡需要解釋下這個TopicRouteData,可以了解為裡面存了兩部分内容,一是broker 位址資訊,二是messagequeue 對應哪個broker上面。稍微看下:
/**
* 包含兩部分内容,一是broker 位址資訊,二是messagequeue 對應哪個broker上面
*/
public class TopicRouteData extends RemotingSerializable {
// 順序消息配置内容,來自kvConfig
private String orderTopicConf;
// topic隊列中繼資料
private List<QueueData> queueDatas;
// topic分布的broker中繼資料
private List<BrokerData> brokerDatas;
// broker上過濾伺服器的位址清單
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
複制代碼
接着我們追蹤下從nameserv擷取topic資訊的代碼:
/**
* 從namesrv擷取topic路由資訊
* @param topic topic名稱
* @param timeoutMillis 逾時時間,預設3s
* @param allowTopicNotExist 允許這個topic不存在
*/
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
// 擷取路由資訊的請求頭
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
// 建立RemotingCommand 建立請求實體
// 發送請求的 code 為 GET_ROUTEINFO_BY_TOPIC
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
// todo 進行同步調用,擷取結果
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
// topic不存在
case ResponseCode.TOPIC_NOT_EXIST: {
// 允許topic不存在
if (allowTopicNotExist) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
}
break;
}
// 成功
case ResponseCode.SUCCESS: {
// 擷取内容
byte[] body = response.getBody();
if (body != null) {
return TopicRouteData.decode(body, TopicRouteData.class);
}
}
default:
break;
}
// 抛異常
throw new MQClientException(response.getCode(), response.getRemark());
}
複制代碼
我們可以看到,先是封裝了一個擷取路由的請求頭,然後将topic 設定到請求頭裡面,然後又将請求頭封裝成RemotingCommand,其實不管是requestHeader還是RemotingCommand,其實都是存儲資訊的一些實體,隻是代表的含義不一樣而已,你可以把RemotingCommand 看作是http協定的格式,想想http有請求行,請求頭,請求體,然後他這個requestHeader算是個請求頭,然後RemotingCommand裡面還有body屬性可以看作是請求體,需要注意的是,它設定了一個RequestCode是GET_ROUTEINTO_BY_TOPIC,其實nameserv就是根據這個請求code判斷出來你要幹什麼,隻有你把GET_ROUTEINTO_BY_TOPIC 告訴nameserv的時候,nameserv才能知道你要從我這裡擷取這個topic的路由資訊。
這裡發送向NameServer發送消息的code是GET_ROUTEINFO_BY_TOPIC,這點在前面分析nameServer的消息處理時也分析過了,并且還分析了當消息送達nameServer後,nameServer是如何傳回topic資料的,遺忘的小夥伴可以看下之前分析nameServer的文章。RocketMQ源碼3-NameServer 消息處理 第3節
接着就是調用remotingClient發送消息了,它這裡用的是同步發送的方式(這裡暫時先不說了,其實就是使用netty client 發送消息, 後面有提到),也就是阻塞等着響應過來,逾時時間預設是3s,再往下看如果這個響應code是success的話,就把body弄出來然後反序列化話成 TopicRouteData對象。
其實這裡有個問題,就是我們有多個nameserv ,比如說我們有3台nameserv ,那麼生産者是找誰擷取的呢?其實它是這個樣子的,你上次用的那個nameserv要是還ok的話,也就是連接配接沒斷的話,它會繼續使用你上次用的個,如果你是第一次發起這種請求,沒有與nameserv建立過連接配接或者是上次建立的那個連接配接不好使了,這個時候就會有個計數器,輪詢的使用 ,也就是計數器值+1%namserv位址個數的形式,如果不了解的同學可以找個輪詢算法看下,
其實都是使用計數器+1 % 清單的個數,這樣能夠選出來一個清單的位置來,再根據這個位置去清單中擷取一下具體的值,好了這個輪詢算法我們先解釋這麼多 ,如果能夠正常建立連接配接,直接使用這個連接配接了就,如果不能使用,也就是建立連接配接失敗,通路不通等等,這時候繼續循環使用這個輪詢算法擷取下一個位址,然後建立連接配接,如果能夠建立成功,傳回對應的channel就可以了,然後client可以往這個channel上發送請求了,這個channel的話可以看作兩端的通道,管道都可以。如果不成功繼續循環,他這裡循環次數是你配置nameserv位址的個數。
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
// 開始時間
long beginStartTime = System.currentTimeMillis();
// todo 輪詢擷取namesrv位址Channel
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
// 執行開始之前的rpchook
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
// 判斷逾時 之前有擷取連結的操作,可能會出現逾時的情況
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
// todo 進行同步執行,擷取響應
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
// 執行之後的rpchook
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
// 遠端發送請求異常
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
// 關閉channel
this.closeChannel(addr, channel);
throw e;
// 逾時異常
} catch (RemotingTimeoutException e) {
// 如果逾時 就關閉cahnnel話,就關閉channel 預設是不關閉的
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
複制代碼
好了,這裡我們就把擷取路由資訊的部分看完了,接着就是解析這個TopicRouteData 然後放到生産者本地緩存起來了。
更新本地路由緩存
這塊内容也是比較長,我們一段一段看下:
if (topicRouteData != null) {
// 之前的
TopicRouteData old = this.topicRouteTable.get(topic);
// 對比,看是否發生變化
boolean changed = topicRouteDataIsChange(old, topicRouteData);
// 沒有發生變化
if (!changed) {
// 調用isNeedUpdateTopicRouteInfo 再次判斷是否需要更新
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
複制代碼
他這裡先是從 topic table中擷取對應topic 的老資料,然後 拿老的 與新請求的進行對比,判斷一下有沒有變動,如果沒有變動的話,就調用isNeedUpdateTopicRouteInfo 方法再判斷一下需要更新,這個方法其實就是周遊所有的producer 或者是consumer,然後看看他們的topic table裡面是不是都有這個topic 沒有的話就需要更新下。
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
// 更新broker位址
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
// 更新推送消息/就是更新生産者topic資訊
{
// todo 将topic 轉成topic publish
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
// 調用所有的producer 更新對應的topic info
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
// 更新 topic publishinfo
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// Update sub info
// 更新訂閱資訊 更新消費者topic資訊
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
// 添加到route表中
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
複制代碼
這裡首先是更新了一下brokerAddrTable這個map ,這map裡面然後就是緩存着broker name-> broker位址的集合
接着就是将 topicRouteData轉成topicPublishInfo ,然後 haveTopicRouterInfo設定成true,就是說明它這個topicPublishInfo 裡面存着對應的topicRouteData資訊。
// 将topic 路由資訊轉成topic publish 資訊 提供給消息發送者發送消息使用
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
info.setTopicRouteData(route);
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
String[] brokers = route.getOrderTopicConf().split(";");
for (String broker : brokers) {
String[] item = broker.split(":");
int nums = Integer.parseInt(item[1]);
for (int i = 0; i < nums; i++) {
MessageQueue mq = new MessageQueue(topic, item[0], i);
info.getMessageQueueList().add(mq);
}
}
info.setOrderTopic(true);
} else {
List<QueueData> qds = route.getQueueDatas();
Collections.sort(qds);
for (QueueData qd : qds) {
if (PermName.isWriteable(qd.getPerm())) {
BrokerData brokerData = null;
for (BrokerData bd : route.getBrokerDatas()) {
if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;
break;
}
}
if (null == brokerData) {
continue;
}
// 判斷有沒有master
if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
continue;
}
// 建立對應的 messageQueue
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);
}
}
}
info.setOrderTopic(false);
}
return info;
}
複制代碼
這裡就是将傳回的topicRouteData 轉成對應的topicPublishInfo,這個topicPublishInfo其實裡面就是MessageQueue,比如說我topicRouteData 裡面傳回2個broker ,然後每個broker的writeQueueNums 個數是4個,這個時候它生成的MessageQueue就是8個,然後每個broker對應着4個MessageQueue。
接着就是周遊更新各個producer的topicPublishInfoTable 對應topic資訊。 好了,到這我們的更新topic資訊的解析就結束了。
2.2 建立topic
有這麼一個場景:
我發送某個消息的時候指定的那個topic不存在(就是之前沒有建立過)消息生産者是怎樣處理的,預設的話如果topic不存在的話,消息生産者會先去nameserv拉下topic資訊(從nameserv擷取topic資訊流程),要是還不存在的話,
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
/**
* 第一次發送消息時,本地沒有緩存topic的路由資訊,查詢
* NameServer嘗試擷取路由資訊,如果路由資訊未找到,再次嘗試用默
* 認主題DefaultMQProducerImpl#createTopicKey去查詢
*/
/**
* 生産環境,不建議開啟自動建立主題
* 原因如:https://mp.weixin.qq.com/s/GbSlS3hi8IE0kznTynV4ZQ
*/
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 首先,使用topic 從NameServer嘗試擷取路由資訊
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// isDefault 為true 其次,使用預設的topic從NameServer嘗試擷取路由資訊
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
複制代碼
這裡就走else的代碼了,其實還是調用的updateTopicRouteInfoFromNameServer重載方法,這裡這個isDefault 是true了。這個時候就擷取一下預設topic的路由資訊,這個預設topic是TBW102,發送消息就選擇TBW102這個topic的broker發送,broker收到消息後會自動建立這個topic,這裡需要注意的是broker得支援自動建立,這裡是有個參數配置的autoCreateTopicEnable 設定成true就可以了。
topic我們一般是不會在producer中自動建立的,一般使用RocketMQ的可視化控制台,然後建立topic,指定對應的queue num,指定broker等等,類似下面這個東西
原文連結:https://juejin.cn/post/7209510210422980667