天天看點

RocketMQ源碼——producer 啟動流程(擷取topic路由資訊)

本文我們來分析rocketMq producer 發送消息的流程.

producer發送消息的示例在org.apache.rocketmq.example.simple.Producer類中,代碼如下:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 1. 建立 DefaultMQProducer 對象
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

       
        producer.setNamesrvAddr("127.0.0.1:9876");
        /*
         * Launch the instance.
         */
        // todo 2. 啟動 producer
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {

              
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );


                // 3. 發送消息
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } 
            ...
        }
        producer.shutdown();
    }
}
複制代碼           

以上代碼分三步走:

  1. 建立 DefaultMQProducer 對象
  2. 啟動 producer
  3. 發送消息

接下來我們的分析也按這三步進行。

1. DefaultMQProducer構造方法

我們看下DefaultMQProducer 這個類有哪些功能:

  • 功能一:給消息生産者配置參數,調整參數就是調用這個類的api,比如上面我們設定nameserv位址producer.setNamesrvAddr("127.0.0.1:9876");,可以把它看作一個配置類,
  • 功能二:發送消息的功能,這裡它發送消息都是調用defaultMQProducerImpl 這個類,
  • 功能三:它實作MQAdmin 接口裡面關于topic與MessageQueue的操作。

我們來看下它裡面有哪些重要的參數:

字段 預設值 解釋
producerGroup null 組資訊,需要你自己指定
defaultTopicQueueNums 4 就是建立一個topic 預設設定4個MessageQueue
sendMsgTimeout 3000 發送消息的逾時時間 機關ms
compressMsgBodyOverHowmuch 1024 * 4 這個就是當發送的消息内容大于這個數的時候 進行壓縮,壓縮門檻值,預設是4k
maxMessageSize 1024 * 1024 * 4 允許發送消息最大大小 預設是4m
retryTimesWhenSendFailed 2 發送失敗的時候重試次數 預設是2次
retryTimesWhenSendAsyncFailed 2 異步發送失敗的時候重試次數 預設是2次
retryAnotherBrokerWhenNotStoreOK false 訓示是否在内部發送失敗時重試另一個broker

DefaultMQProducer這個類還繼承了一個ClientConfig ,這ClientConfig 不用看就知道是個用戶端配置類,我們看看它裡面重要字段的解釋:

字段 預設值 解釋
namesrvAddr 預設去jvm啟動參數 rocketmq.namesrv.addr ,系統環境變量 NAMESRV_ADDR 中找 nameserv的位址,這個最好是自己設定進去,你在DefaultMQProducer 類set 的就是指派給了它
clientIP 自己去找 本用戶端位址,他自己就會去找的
instanceName 預設是去jvm啟動參數rocketmq.client.name 中找,沒有設定DEFAULT,這個它會自己重新設定的 執行個體名稱
clientCallbackExecutorThreads cpu核心數 執行callback 線程池的線程核心數
pollNameServerInterval 1000 * 30 多久去nameserv 擷取topic 資訊,預設是30s
heartbeatBrokerInterval 1000 * 30 與broker心跳間隔時間,預設是30s,就是每隔30向broker發送心跳

DefaultMQProducer還實作一個MQProducer 接口,不用看,這個MQProducer 接口就是一堆send方法與start,shutdown方法,然後讓DefaultMQProducer去實作。DefaultMQProducer構造方法代碼如下:

public DefaultMQProducer(final String producerGroup) {
    // 繼續調用
    this(null, producerGroup, null);
}


/**
 * 最終調用的構造方法
 */
public DefaultMQProducer(final String namespace, 
        final String producerGroup, RPCHook rpcHook) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
複制代碼           

這個方法就是簡單地賦了值,然後建立了DefaultMQProducerImpl執行個體,我們繼續看DefaultMQProducerImpl的構造方法:

public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;

    // 5w大小的連結清單隊列,異步發送線程池隊列
    this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
    // 預設的異步發送線程池
    this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),// 核心線程池數,cpu核數
        Runtime.getRuntime().availableProcessors(), // 最大線程池數,cpu核數
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.asyncSenderThreadPoolQueue,
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
}
複制代碼           

這個構造方法依然還是處理指派操作,并沒做什麼實質性内容,就不繼續深究了。

2. DefaultMQProducer#start:啟動producer

接着我們來看看producer的啟動流程,進入DefaultMQProducer#start方法:

public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    // 調用 defaultMQProducerImpl 的 start() 方法
    this.defaultMQProducerImpl.start();
    // 消息軌迹相關,我們不關注
    if (null != traceDispatcher) {
        ...
    }
}
複制代碼           

這個方法先是調用了defaultMQProducerImpl#start方法,然後處理消息軌迹相關操作,關于rocketMq消息軌迹相關内容,本文就不過多探讨了,我們将目光聚集于DefaultMQProducerImpl#start(boolean)方法:

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        // 剛建立還沒有啟動
        case CREATE_JUST:
            // 設定為啟動失敗狀态
            this.serviceState = ServiceState.START_FAILED;

            // 檢查group配置
            this.checkConfig();

            // 隻要group組 不是CLIENT_INNER_PRODUCER, 就重新設定下執行個體名稱
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                // 改變生産者的instanceName 為程序id
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            // todo 建立MQClientInstance 執行個體(封裝了網絡處理API,消息生産者、消費者和Namesrv、broker打交道的網絡通道)
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            // todo 進行注冊
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                // 沒有注冊成功,設定狀态為建立沒有啟動,然後抛出之前已經注冊的異常
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            // topic --> topic info
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            // 是否啟動 client執行個體,預設是true
            if (startFactory) {
                // todo 核心
                mQClientFactory.start();
            }

            // 啟動生産者成功
            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            // 設定狀态running
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    // todo 發送心跳到所有broker
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

    // 定時掃描異步請求的傳回結果
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                RequestFutureTable.scanExpiredRequest();
            } catch (Throwable e) {
                log.error("scan RequestFutureTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
複制代碼           

這個方法并不複雜相關内容都已經作了注釋,這裡重點提出3個方法:

  1. mQClientFactory.start():執行方法為MQClientInstance#start,這個方法裡會啟動一些元件,我們稍後會分析。
  2. mQClientFactory.sendHeartbeatToAllBrokerWithLock():發送心跳到所有的broker,最終執行的方法為MQClientAPIImpl#sendHearbeat:
  3. public int sendHearbeat( final String addr, final HeartbeatData heartbeatData, final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { // request 的 code 為 HEART_BEAT RemotingCommand request = RemotingCommand .createRequestCommand(RequestCode.HEART_BEAT, null); request.setLanguage(clientConfig.getLanguage()); request.setBody(heartbeatData.encode()); // 異步調用 RemotingCommand response = this.remotingClient .invokeSync(addr, request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { return response.getVersion(); } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } 複制代碼
  4. 這裡是與broker通信,request 的 code 為 HEART_BEAT,後面的分析中我們會看到,producer也會同nameServer通信。
  5. 定時掃描異步請求的傳回結果:最終執行的方法為RequestFutureTable.scanExpiredRequest(),關于該方法的内容,我們在分析producer發送異步消息時再分析。

2.1 MQClientInstance#start:啟動MQClientInstance

接下來我們來看看MQClientInstance的啟動,方法為MQClientInstance#start,代碼如下:

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                // 先設定 失敗
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                // 判斷namesrv 是否為null
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // todo Start request-response channel
                // 啟動遠端服務,這個方法隻是裝配了netty用戶端相關配置
                // todo 注意:1. 這裡是netty用戶端,2. 這裡并沒有建立連接配接
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                // todo 開啟任務排程
                this.startScheduledTask();
                // Start pull service
                // todo 開啟 拉取服務
                this.pullMessageService.start();
                // Start rebalance service
                // todo 開啟平衡服務
                this.rebalanceService.start();
                // Start push service
                // todo 啟用内部的 producer
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                // 設定狀态為running
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
複制代碼           

這個方法進行的操作在注釋中已經說明得很清楚了,接下來我們對以上的部分操作做進一步分析。

2.1.1 mQClientAPIImpl.start():配置netty用戶端

在看MQClientAPIImpl 的啟動方法之前,我們需要看下它的構造:

public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
    final ClientRemotingProcessor clientRemotingProcessor,
    RPCHook rpcHook, final ClientConfig clientConfig) {
    // 用戶端配置
    this.clientConfig = clientConfig;
    topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
    // todo netty client
    this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
    this.clientRemotingProcessor = clientRemotingProcessor;

    // 注冊rpc hook
    this.remotingClient.registerRPCHook(rpcHook);
    // 注冊 processor CHECK_TRANSACTION_STATE 檢查事務狀态
    this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);

    // 通知消費者id已更改
    this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);

    // 重置消費者用戶端偏移量
    this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);

    // 從用戶端拉取消費者狀态
    this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);

    // 擷取消費者運作狀态
    this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);

    // 消費資訊
    this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);

    this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}
複制代碼           

clientConfig這個是關于client配置的一些資訊,然後往後建立了一個NettyRemotingClient對象,這個就是netty client,封裝了一些同步異步的發送,接着clientRemotingProcessor,這個就是netty收到消息時候處理類,然後下面這一堆注冊processor這個就不看了,其實就是當收到這些code的消息時候讓哪個processor來處理。

先來看下clientConfig 有哪些配置字段需要我們關注:

字段 預設值 解釋 調優
clientWorkerThreads 4 這個其實就是處理netty 那堆自定義handler的線程
clientCallbackExecutorThreads cpu核心線程數 callback線程數,這個用來執行你注冊的那些processor
clientOnewaySemaphoreValue 預設是65535 當發送單向消息的時候,信号量限流
clientAsyncSemaphoreValue 65535 當發送異步消息的時候,信号量限流
connectTimeoutMillis 3000 連接配接逾時時間 沒啥現實意義
clientSocketSndBufSize 65535 netty 用戶端 發送buffer
clientSocketRcvBufSize 65535 netty 用戶端 接收buffer
clientChannelMaxIdleTimeSeconds 120s 這個就是netty channel 最大空閑時間 也能調優

看下NettyRemotingClient 的構造方法:

public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
    final ChannelEventListener channelEventListener) {
    // 設定 異步與單向請求的信号量
    super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
    // netty配置檔案
    this.nettyClientConfig = nettyClientConfig;
    // listener
    this.channelEventListener = channelEventListener;

    // 這個是netty 用戶端回調線程數,預設是cpu核數
    int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
    // 如果小于等于0 預設為4
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }

    // 設定線程池
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    // selector 線程為1
    this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
        }
    });

    // 是否啟用tls
    if (nettyClientConfig.isUseTLS()) {
        try {
            sslContext = TlsHelper.buildSslContext(true);
            log.info("SSL enabled for client");
        } catch (IOException e) {
            log.error("Failed to create SSLContext", e);
        } catch (CertificateException e) {
            log.error("Failed to create SSLContext", e);
            throw new RuntimeException("Failed to create SSLContext", e);
        }
    }
}
複制代碼           

首先是調用父類的構造,建立2個信号量Semaphore,這兩個分别用來對單向發送,異步發送進行限流,預設是65535。之後就是建立public線程池,這個線程池主要是用來處理你注冊那堆processor,預設線程數是cpu核心數,再往後就是建立netty niogroup組就1個線程,這個主要是用來處理連接配接的,最後就是判斷是否使用ssl,使用的話建立ssl context。

再來看NettyRemotingClient#start方法,代碼如下:

@Override
public void start() {
    // 預設 4個線程
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyClientConfig.getClientWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
            }
        });

    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true) // 不使用tcp中的DELAY算法,就是有小包也要發送出去
        .option(ChannelOption.SO_KEEPALIVE, false) // keeplive false
            // 連結逾時 預設3s
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            //發送buffer 預設是65535
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            // 接收buffer 預設是65535
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // 是否啟用tls
                if (nettyClientConfig.isUseTLS()) {
                    if (null != sslContext) {
                        pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                        log.info("Prepend SSL handler");
                    } else {
                        log.warn("Connections are insecure as SSLContext is null!");
                    }
                }
                pipeline.addLast(
                        // 使用這個線程組處理下面這寫processor 預設是4個線程
                    defaultEventExecutorGroup,
                    new NettyEncoder(), // 編碼
                    new NettyDecoder(), // 解碼
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    new NettyConnectManageHandler(), // 連接配接管理
                    new NettyClientHandler());  // todo netty client handler 處理接收到的消息
            }
        });

    // todo 掃描消息擷取結果,每秒執行1次
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                // todo 掃描響應表
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
}
複制代碼           

對于這個方法,說明有兩個點:

  1. 方法裡使用的是Bootstrap而非ServerBootstrap,表示這是netty用戶端
  2. 整個方法中并沒有建立連接配接

2.1.2 startScheduledTask():啟動定時任務

啟動定時任務的方法為MQClientInstance#startScheduledTask,代碼如下:

private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        // todo 擷取namesrv位址, 2分鐘執行一次
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // todo 從namesrv上面更新topic的路由資訊,預設30s
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // todo 清除下線broker
                MQClientInstance.this.cleanOfflineBroker();
                // todo 發送心跳到所有broker上面
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // todo 持久化consumer offset 可以放在本地檔案,也可以推送到 broker
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // todo 調整線程池的線程數量,并沒有用上
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}
複制代碼           

這裡共有5個定時任務:

  1. 定時擷取 nameServer 的位址,MQClientInstance#start一開始會調用MQClientAPIImpl#fetchNameServerAddr擷取nameServer,這裡也調用了這個方法
  2. 定時更新topic的路由資訊,這裡會去nameServer擷取路由資訊,之後再分析
  3. 定時發送心跳資訊到nameServer,在DefaultMQProducerImpl#start(boolean)中,我們也提到了向nameServer發送心跳資訊,兩處調用的是同一個方法
  4. 持久化消費者的消費偏移量,這個僅對消費者consumer有效,後面分析消費者時再作分析
  5. 調整線程池的線程數量,不過追蹤到最後,發現這個并沒有生效,就不多說了

定時更新topic的路由資訊

這個就是它的定時任務,預設是30s執行一次的。我們看下updateTopicRouteInfoFromNameServer():

public void updateTopicRouteInfoFromNameServer() {
    // 從消費者端與生産者端 擷取topic集合
    Set<String> topicList = new HashSet<String>();

    // Consumer
    {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                Set<SubscriptionData> subList = impl.subscriptions();
                if (subList != null) {
                    for (SubscriptionData subData : subList) {
                        topicList.add(subData.getTopic());
                    }
                }
            }
        }
    }

    // Producer
    {
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                Set<String> lst = impl.getPublishTopicList();
                topicList.addAll(lst);
            }
        }
    }

    // 周遊topic集合,進行更新路由資訊
    for (String topic : topicList) {
        // todo
        this.updateTopicRouteInfoFromNameServer(topic);
    }
}
複制代碼           

它是将consumer 與producer 所有topic 放到一個topic集合中,然後周遊這個集合,一個一個topic請求更新。

接下來我們來看下這個updateTopicRouteInfoFromNameServer 方法,這方法有點長,我們一部分一部分的介紹,大體上分為2個部分吧,第一個部分是擷取對應topic的資訊,然後第二部分就是更新本地的topic table 緩存。

擷取部分代碼:

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                // 預設 并且 defaultMQProducer不為空,當topic不存在的時候會進入這個if中
                if (isDefault && defaultMQProducer != null) {
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                        1000 * 3);
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    // todo 擷取topicRoute資訊
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                ...
}
複制代碼           

if是true這段是你沒有topic 然後需要建立topic 的時候幹的事,它會向nameserv擷取topic 為TBW102的值,然後擷取它對應的那個topicRouteData。

fasle的時候進入else裡面,這個就是拿着topic去nameserv那擷取對應的topicRouteData。 這裡需要解釋下這個TopicRouteData,可以了解為裡面存了兩部分内容,一是broker 位址資訊,二是messagequeue 對應哪個broker上面。稍微看下:

/**
 * 包含兩部分内容,一是broker 位址資訊,二是messagequeue 對應哪個broker上面
 */
public class TopicRouteData extends RemotingSerializable {
    // 順序消息配置内容,來自kvConfig
    private String orderTopicConf;
    // topic隊列中繼資料
    private List<QueueData> queueDatas;
    // topic分布的broker中繼資料
    private List<BrokerData> brokerDatas;
    // broker上過濾伺服器的位址清單
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
複制代碼           

接着我們追蹤下從nameserv擷取topic資訊的代碼:

/**
 * 從namesrv擷取topic路由資訊
 * @param topic topic名稱
 * @param timeoutMillis 逾時時間,預設3s
 * @param allowTopicNotExist 允許這個topic不存在
 */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    // 擷取路由資訊的請求頭
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);

    // 建立RemotingCommand 建立請求實體
    // 發送請求的 code 為 GET_ROUTEINFO_BY_TOPIC
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);

    // todo 進行同步調用,擷取結果
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        // topic不存在
        case ResponseCode.TOPIC_NOT_EXIST: {
            // 允許topic不存在
            if (allowTopicNotExist) {
                log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
            }

            break;
        }
        // 成功
        case ResponseCode.SUCCESS: {
            // 擷取内容
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        default:
            break;
    }

    // 抛異常
    throw new MQClientException(response.getCode(), response.getRemark());
}
複制代碼           

我們可以看到,先是封裝了一個擷取路由的請求頭,然後将topic 設定到請求頭裡面,然後又将請求頭封裝成RemotingCommand,其實不管是requestHeader還是RemotingCommand,其實都是存儲資訊的一些實體,隻是代表的含義不一樣而已,你可以把RemotingCommand 看作是http協定的格式,想想http有請求行,請求頭,請求體,然後他這個requestHeader算是個請求頭,然後RemotingCommand裡面還有body屬性可以看作是請求體,需要注意的是,它設定了一個RequestCode是GET_ROUTEINTO_BY_TOPIC,其實nameserv就是根據這個請求code判斷出來你要幹什麼,隻有你把GET_ROUTEINTO_BY_TOPIC 告訴nameserv的時候,nameserv才能知道你要從我這裡擷取這個topic的路由資訊。

這裡發送向NameServer發送消息的code是GET_ROUTEINFO_BY_TOPIC,這點在前面分析nameServer的消息處理時也分析過了,并且還分析了當消息送達nameServer後,nameServer是如何傳回topic資料的,遺忘的小夥伴可以看下之前分析nameServer的文章。RocketMQ源碼3-NameServer 消息處理 第3節

接着就是調用remotingClient發送消息了,它這裡用的是同步發送的方式(這裡暫時先不說了,其實就是使用netty client 發送消息, 後面有提到),也就是阻塞等着響應過來,逾時時間預設是3s,再往下看如果這個響應code是success的話,就把body弄出來然後反序列化話成 TopicRouteData對象。

其實這裡有個問題,就是我們有多個nameserv ,比如說我們有3台nameserv ,那麼生産者是找誰擷取的呢?其實它是這個樣子的,你上次用的那個nameserv要是還ok的話,也就是連接配接沒斷的話,它會繼續使用你上次用的個,如果你是第一次發起這種請求,沒有與nameserv建立過連接配接或者是上次建立的那個連接配接不好使了,這個時候就會有個計數器,輪詢的使用 ,也就是計數器值+1%namserv位址個數的形式,如果不了解的同學可以找個輪詢算法看下,

其實都是使用計數器+1 % 清單的個數,這樣能夠選出來一個清單的位置來,再根據這個位置去清單中擷取一下具體的值,好了這個輪詢算法我們先解釋這麼多 ,如果能夠正常建立連接配接,直接使用這個連接配接了就,如果不能使用,也就是建立連接配接失敗,通路不通等等,這時候繼續循環使用這個輪詢算法擷取下一個位址,然後建立連接配接,如果能夠建立成功,傳回對應的channel就可以了,然後client可以往這個channel上發送請求了,這個channel的話可以看作兩端的通道,管道都可以。如果不成功繼續循環,他這裡循環次數是你配置nameserv位址的個數。

@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    // 開始時間
    long beginStartTime = System.currentTimeMillis();
    // todo 輪詢擷取namesrv位址Channel
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            // 執行開始之前的rpchook
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            // 判斷逾時 之前有擷取連結的操作,可能會出現逾時的情況
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            // todo 進行同步執行,擷取響應
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            // 執行之後的rpchook
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
            // 遠端發送請求異常
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            // 關閉channel
            this.closeChannel(addr, channel);
            throw e;
            // 逾時異常
        } catch (RemotingTimeoutException e) {
            // 如果逾時 就關閉cahnnel話,就關閉channel 預設是不關閉的
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}
複制代碼           

好了,這裡我們就把擷取路由資訊的部分看完了,接着就是解析這個TopicRouteData 然後放到生産者本地緩存起來了。

更新本地路由緩存

這塊内容也是比較長,我們一段一段看下:

if (topicRouteData != null) {
    // 之前的
    TopicRouteData old = this.topicRouteTable.get(topic);
    // 對比,看是否發生變化
    boolean changed = topicRouteDataIsChange(old, topicRouteData);
    // 沒有發生變化
    if (!changed) {
        // 調用isNeedUpdateTopicRouteInfo 再次判斷是否需要更新
        changed = this.isNeedUpdateTopicRouteInfo(topic);
    } else {
        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
    }
複制代碼           

他這裡先是從 topic table中擷取對應topic 的老資料,然後 拿老的 與新請求的進行對比,判斷一下有沒有變動,如果沒有變動的話,就調用isNeedUpdateTopicRouteInfo 方法再判斷一下需要更新,這個方法其實就是周遊所有的producer 或者是consumer,然後看看他們的topic table裡面是不是都有這個topic 沒有的話就需要更新下。

if (changed) {
    TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

    // 更新broker位址
    for (BrokerData bd : topicRouteData.getBrokerDatas()) {
        this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
    }

    // Update Pub info
    // 更新推送消息/就是更新生産者topic資訊
    {
        // todo 将topic 轉成topic publish
        TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
        publishInfo.setHaveTopicRouterInfo(true);
        // 調用所有的producer 更新對應的topic info
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                // 更新 topic publishinfo
                impl.updateTopicPublishInfo(topic, publishInfo);
            }
        }
    }

    // Update sub info
    // 更新訂閱資訊 更新消費者topic資訊
    {
        Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                impl.updateTopicSubscribeInfo(topic, subscribeInfo);
            }
        }
    }
    log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
    // 添加到route表中
    this.topicRouteTable.put(topic, cloneTopicRouteData);
    return true;
}
複制代碼           

這裡首先是更新了一下brokerAddrTable這個map ,這map裡面然後就是緩存着broker name-> broker位址的集合

接着就是将 topicRouteData轉成topicPublishInfo ,然後 haveTopicRouterInfo設定成true,就是說明它這個topicPublishInfo 裡面存着對應的topicRouteData資訊。

// 将topic 路由資訊轉成topic publish 資訊 提供給消息發送者發送消息使用
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
    TopicPublishInfo info = new TopicPublishInfo();
    info.setTopicRouteData(route);
    if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
        String[] brokers = route.getOrderTopicConf().split(";");
        for (String broker : brokers) {
            String[] item = broker.split(":");
            int nums = Integer.parseInt(item[1]);
            for (int i = 0; i < nums; i++) {
                MessageQueue mq = new MessageQueue(topic, item[0], i);
                info.getMessageQueueList().add(mq);
            }
        }

        info.setOrderTopic(true);
    } else {
        List<QueueData> qds = route.getQueueDatas();
        Collections.sort(qds);
        for (QueueData qd : qds) {
            if (PermName.isWriteable(qd.getPerm())) {
                BrokerData brokerData = null;
                for (BrokerData bd : route.getBrokerDatas()) {
                    if (bd.getBrokerName().equals(qd.getBrokerName())) {
                        brokerData = bd;
                        break;
                    }
                }

                if (null == brokerData) {
                    continue;
                }

                // 判斷有沒有master
                if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                    continue;
                }
                // 建立對應的 messageQueue
                for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    info.getMessageQueueList().add(mq);
                }
            }
        }

        info.setOrderTopic(false);
    }

    return info;
}
複制代碼           

這裡就是将傳回的topicRouteData 轉成對應的topicPublishInfo,這個topicPublishInfo其實裡面就是MessageQueue,比如說我topicRouteData 裡面傳回2個broker ,然後每個broker的writeQueueNums 個數是4個,這個時候它生成的MessageQueue就是8個,然後每個broker對應着4個MessageQueue。

接着就是周遊更新各個producer的topicPublishInfoTable 對應topic資訊。 好了,到這我們的更新topic資訊的解析就結束了。

2.2 建立topic

有這麼一個場景:

我發送某個消息的時候指定的那個topic不存在(就是之前沒有建立過)消息生産者是怎樣處理的,預設的話如果topic不存在的話,消息生産者會先去nameserv拉下topic資訊(從nameserv擷取topic資訊流程),要是還不存在的話,

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    /**
     * 第一次發送消息時,本地沒有緩存topic的路由資訊,查詢
     * NameServer嘗試擷取路由資訊,如果路由資訊未找到,再次嘗試用默
     * 認主題DefaultMQProducerImpl#createTopicKey去查詢
     */
    /**
     * 生産環境,不建議開啟自動建立主題
     * 原因如:https://mp.weixin.qq.com/s/GbSlS3hi8IE0kznTynV4ZQ
     */
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 首先,使用topic 從NameServer嘗試擷取路由資訊
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // isDefault 為true 其次,使用預設的topic從NameServer嘗試擷取路由資訊
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
複制代碼           

這裡就走else的代碼了,其實還是調用的updateTopicRouteInfoFromNameServer重載方法,這裡這個isDefault 是true了。這個時候就擷取一下預設topic的路由資訊,這個預設topic是TBW102,發送消息就選擇TBW102這個topic的broker發送,broker收到消息後會自動建立這個topic,這裡需要注意的是broker得支援自動建立,這裡是有個參數配置的autoCreateTopicEnable 設定成true就可以了。

topic我們一般是不會在producer中自動建立的,一般使用RocketMQ的可視化控制台,然後建立topic,指定對應的queue num,指定broker等等,類似下面這個東西

RocketMQ源碼——producer 啟動流程(擷取topic路由資訊)
原文連結:https://juejin.cn/post/7209510210422980667

繼續閱讀