天天看点

RocketMQ源码——producer 启动流程(获取topic路由信息)

本文我们来分析rocketMq producer 发送消息的流程.

producer发送消息的示例在org.apache.rocketmq.example.simple.Producer类中,代码如下:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 1. 创建 DefaultMQProducer 对象
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

       
        producer.setNamesrvAddr("127.0.0.1:9876");
        /*
         * Launch the instance.
         */
        // todo 2. 启动 producer
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {

              
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );


                // 3. 发送消息
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } 
            ...
        }
        producer.shutdown();
    }
}
复制代码           

以上代码分三步走:

  1. 创建 DefaultMQProducer 对象
  2. 启动 producer
  3. 发送消息

接下来我们的分析也按这三步进行。

1. DefaultMQProducer构造方法

我们看下DefaultMQProducer 这个类有哪些功能:

  • 功能一:给消息生产者配置参数,调整参数就是调用这个类的api,比如上面我们设置nameserv地址producer.setNamesrvAddr("127.0.0.1:9876");,可以把它看作一个配置类,
  • 功能二:发送消息的功能,这里它发送消息都是调用defaultMQProducerImpl 这个类,
  • 功能三:它实现MQAdmin 接口里面关于topic与MessageQueue的操作。

我们来看下它里面有哪些重要的参数:

字段 默认值 解释
producerGroup null 组信息,需要你自己指定
defaultTopicQueueNums 4 就是新建一个topic 默认设置4个MessageQueue
sendMsgTimeout 3000 发送消息的超时时间 单位ms
compressMsgBodyOverHowmuch 1024 * 4 这个就是当发送的消息内容大于这个数的时候 进行压缩,压缩阈值,默认是4k
maxMessageSize 1024 * 1024 * 4 允许发送消息最大大小 默认是4m
retryTimesWhenSendFailed 2 发送失败的时候重试次数 默认是2次
retryTimesWhenSendAsyncFailed 2 异步发送失败的时候重试次数 默认是2次
retryAnotherBrokerWhenNotStoreOK false 指示是否在内部发送失败时重试另一个broker

DefaultMQProducer这个类还继承了一个ClientConfig ,这ClientConfig 不用看就知道是个客户端配置类,我们看看它里面重要字段的解释:

字段 默认值 解释
namesrvAddr 默认去jvm启动参数 rocketmq.namesrv.addr ,系统环境变量 NAMESRV_ADDR 中找 nameserv的地址,这个最好是自己设置进去,你在DefaultMQProducer 类set 的就是赋值给了它
clientIP 自己去找 本客户端地址,他自己就会去找的
instanceName 默认是去jvm启动参数rocketmq.client.name 中找,没有设置DEFAULT,这个它会自己重新设置的 实例名称
clientCallbackExecutorThreads cpu核心数 执行callback 线程池的线程核心数
pollNameServerInterval 1000 * 30 多久去nameserv 获取topic 信息,默认是30s
heartbeatBrokerInterval 1000 * 30 与broker心跳间隔时间,默认是30s,就是每隔30向broker发送心跳

DefaultMQProducer还实现一个MQProducer 接口,不用看,这个MQProducer 接口就是一堆send方法与start,shutdown方法,然后让DefaultMQProducer去实现。DefaultMQProducer构造方法代码如下:

public DefaultMQProducer(final String producerGroup) {
    // 继续调用
    this(null, producerGroup, null);
}


/**
 * 最终调用的构造方法
 */
public DefaultMQProducer(final String namespace, 
        final String producerGroup, RPCHook rpcHook) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
复制代码           

这个方法就是简单地赋了值,然后创建了DefaultMQProducerImpl实例,我们继续看DefaultMQProducerImpl的构造方法:

public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;

    // 5w大小的链表队列,异步发送线程池队列
    this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
    // 默认的异步发送线程池
    this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),// 核心线程池数,cpu核数
        Runtime.getRuntime().availableProcessors(), // 最大线程池数,cpu核数
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.asyncSenderThreadPoolQueue,
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
}
复制代码           

这个构造方法依然还是处理赋值操作,并没做什么实质性内容,就不继续深究了。

2. DefaultMQProducer#start:启动producer

接着我们来看看producer的启动流程,进入DefaultMQProducer#start方法:

public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    // 调用 defaultMQProducerImpl 的 start() 方法
    this.defaultMQProducerImpl.start();
    // 消息轨迹相关,我们不关注
    if (null != traceDispatcher) {
        ...
    }
}
复制代码           

这个方法先是调用了defaultMQProducerImpl#start方法,然后处理消息轨迹相关操作,关于rocketMq消息轨迹相关内容,本文就不过多探讨了,我们将目光聚集于DefaultMQProducerImpl#start(boolean)方法:

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        // 刚创建还没有启动
        case CREATE_JUST:
            // 设置为启动失败状态
            this.serviceState = ServiceState.START_FAILED;

            // 检查group配置
            this.checkConfig();

            // 只要group组 不是CLIENT_INNER_PRODUCER, 就重新设置下实例名称
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                // 改变生产者的instanceName 为进程id
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            // todo 创建MQClientInstance 实例(封装了网络处理API,消息生产者、消费者和Namesrv、broker打交道的网络通道)
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            // todo 进行注册
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                // 没有注册成功,设置状态为创建没有启动,然后抛出之前已经注册的异常
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            // topic --> topic info
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            // 是否启动 client实例,默认是true
            if (startFactory) {
                // todo 核心
                mQClientFactory.start();
            }

            // 启动生产者成功
            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            // 设置状态running
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    // todo 发送心跳到所有broker
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

    // 定时扫描异步请求的返回结果
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                RequestFutureTable.scanExpiredRequest();
            } catch (Throwable e) {
                log.error("scan RequestFutureTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
复制代码           

这个方法并不复杂相关内容都已经作了注释,这里重点提出3个方法:

  1. mQClientFactory.start():执行方法为MQClientInstance#start,这个方法里会启动一些组件,我们稍后会分析。
  2. mQClientFactory.sendHeartbeatToAllBrokerWithLock():发送心跳到所有的broker,最终执行的方法为MQClientAPIImpl#sendHearbeat:
  3. public int sendHearbeat( final String addr, final HeartbeatData heartbeatData, final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { // request 的 code 为 HEART_BEAT RemotingCommand request = RemotingCommand .createRequestCommand(RequestCode.HEART_BEAT, null); request.setLanguage(clientConfig.getLanguage()); request.setBody(heartbeatData.encode()); // 异步调用 RemotingCommand response = this.remotingClient .invokeSync(addr, request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { return response.getVersion(); } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } 复制代码
  4. 这里是与broker通信,request 的 code 为 HEART_BEAT,后面的分析中我们会看到,producer也会同nameServer通信。
  5. 定时扫描异步请求的返回结果:最终执行的方法为RequestFutureTable.scanExpiredRequest(),关于该方法的内容,我们在分析producer发送异步消息时再分析。

2.1 MQClientInstance#start:启动MQClientInstance

接下来我们来看看MQClientInstance的启动,方法为MQClientInstance#start,代码如下:

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                // 先设置 失败
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                // 判断namesrv 是否为null
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // todo Start request-response channel
                // 启动远程服务,这个方法只是装配了netty客户端相关配置
                // todo 注意:1. 这里是netty客户端,2. 这里并没有创建连接
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                // todo 开启任务调度
                this.startScheduledTask();
                // Start pull service
                // todo 开启 拉取服务
                this.pullMessageService.start();
                // Start rebalance service
                // todo 开启平衡服务
                this.rebalanceService.start();
                // Start push service
                // todo 启用内部的 producer
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                // 设置状态为running
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
复制代码           

这个方法进行的操作在注释中已经说明得很清楚了,接下来我们对以上的部分操作做进一步分析。

2.1.1 mQClientAPIImpl.start():配置netty客户端

在看MQClientAPIImpl 的启动方法之前,我们需要看下它的构造:

public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
    final ClientRemotingProcessor clientRemotingProcessor,
    RPCHook rpcHook, final ClientConfig clientConfig) {
    // 客户端配置
    this.clientConfig = clientConfig;
    topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
    // todo netty client
    this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
    this.clientRemotingProcessor = clientRemotingProcessor;

    // 注册rpc hook
    this.remotingClient.registerRPCHook(rpcHook);
    // 注册 processor CHECK_TRANSACTION_STATE 检查事务状态
    this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);

    // 通知消费者id已更改
    this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);

    // 重置消费者客户端偏移量
    this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);

    // 从客户端拉取消费者状态
    this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);

    // 获取消费者运行状态
    this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);

    // 消费信息
    this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);

    this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}
复制代码           

clientConfig这个是关于client配置的一些信息,然后往后创建了一个NettyRemotingClient对象,这个就是netty client,封装了一些同步异步的发送,接着clientRemotingProcessor,这个就是netty收到消息时候处理类,然后下面这一堆注册processor这个就不看了,其实就是当收到这些code的消息时候让哪个processor来处理。

先来看下clientConfig 有哪些配置字段需要我们关注:

字段 默认值 解释 调优
clientWorkerThreads 4 这个其实就是处理netty 那堆自定义handler的线程
clientCallbackExecutorThreads cpu核心线程数 callback线程数,这个用来执行你注册的那些processor
clientOnewaySemaphoreValue 默认是65535 当发送单向消息的时候,信号量限流
clientAsyncSemaphoreValue 65535 当发送异步消息的时候,信号量限流
connectTimeoutMillis 3000 连接超时时间 没啥现实意义
clientSocketSndBufSize 65535 netty 客户端 发送buffer
clientSocketRcvBufSize 65535 netty 客户端 接收buffer
clientChannelMaxIdleTimeSeconds 120s 这个就是netty channel 最大空闲时间 也能调优

看下NettyRemotingClient 的构造方法:

public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
    final ChannelEventListener channelEventListener) {
    // 设置 异步与单向请求的信号量
    super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
    // netty配置文件
    this.nettyClientConfig = nettyClientConfig;
    // listener
    this.channelEventListener = channelEventListener;

    // 这个是netty 客户端回调线程数,默认是cpu核数
    int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
    // 如果小于等于0 默认为4
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }

    // 设置线程池
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    // selector 线程为1
    this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
        }
    });

    // 是否启用tls
    if (nettyClientConfig.isUseTLS()) {
        try {
            sslContext = TlsHelper.buildSslContext(true);
            log.info("SSL enabled for client");
        } catch (IOException e) {
            log.error("Failed to create SSLContext", e);
        } catch (CertificateException e) {
            log.error("Failed to create SSLContext", e);
            throw new RuntimeException("Failed to create SSLContext", e);
        }
    }
}
复制代码           

首先是调用父类的构造,创建2个信号量Semaphore,这两个分别用来对单向发送,异步发送进行限流,默认是65535。之后就是创建public线程池,这个线程池主要是用来处理你注册那堆processor,默认线程数是cpu核心数,再往后就是创建netty niogroup组就1个线程,这个主要是用来处理连接的,最后就是判断是否使用ssl,使用的话创建ssl context。

再来看NettyRemotingClient#start方法,代码如下:

@Override
public void start() {
    // 默认 4个线程
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyClientConfig.getClientWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
            }
        });

    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true) // 不使用tcp中的DELAY算法,就是有小包也要发送出去
        .option(ChannelOption.SO_KEEPALIVE, false) // keeplive false
            // 链接超时 默认3s
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            //发送buffer 默认是65535
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            // 接收buffer 默认是65535
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // 是否启用tls
                if (nettyClientConfig.isUseTLS()) {
                    if (null != sslContext) {
                        pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                        log.info("Prepend SSL handler");
                    } else {
                        log.warn("Connections are insecure as SSLContext is null!");
                    }
                }
                pipeline.addLast(
                        // 使用这个线程组处理下面这写processor 默认是4个线程
                    defaultEventExecutorGroup,
                    new NettyEncoder(), // 编码
                    new NettyDecoder(), // 解码
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    new NettyConnectManageHandler(), // 连接管理
                    new NettyClientHandler());  // todo netty client handler 处理接收到的消息
            }
        });

    // todo 扫描消息获取结果,每秒执行1次
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                // todo 扫描响应表
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
}
复制代码           

对于这个方法,说明有两个点:

  1. 方法里使用的是Bootstrap而非ServerBootstrap,表示这是netty客户端
  2. 整个方法中并没有创建连接

2.1.2 startScheduledTask():启动定时任务

启动定时任务的方法为MQClientInstance#startScheduledTask,代码如下:

private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        // todo 获取namesrv地址, 2分钟执行一次
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // todo 从namesrv上面更新topic的路由信息,默认30s
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // todo 清除下线broker
                MQClientInstance.this.cleanOfflineBroker();
                // todo 发送心跳到所有broker上面
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // todo 持久化consumer offset 可以放在本地文件,也可以推送到 broker
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // todo 调整线程池的线程数量,并没有用上
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}
复制代码           

这里共有5个定时任务:

  1. 定时获取 nameServer 的地址,MQClientInstance#start一开始会调用MQClientAPIImpl#fetchNameServerAddr获取nameServer,这里也调用了这个方法
  2. 定时更新topic的路由信息,这里会去nameServer获取路由信息,之后再分析
  3. 定时发送心跳信息到nameServer,在DefaultMQProducerImpl#start(boolean)中,我们也提到了向nameServer发送心跳信息,两处调用的是同一个方法
  4. 持久化消费者的消费偏移量,这个仅对消费者consumer有效,后面分析消费者时再作分析
  5. 调整线程池的线程数量,不过追踪到最后,发现这个并没有生效,就不多说了

定时更新topic的路由信息

这个就是它的定时任务,默认是30s执行一次的。我们看下updateTopicRouteInfoFromNameServer():

public void updateTopicRouteInfoFromNameServer() {
    // 从消费者端与生产者端 获取topic集合
    Set<String> topicList = new HashSet<String>();

    // Consumer
    {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                Set<SubscriptionData> subList = impl.subscriptions();
                if (subList != null) {
                    for (SubscriptionData subData : subList) {
                        topicList.add(subData.getTopic());
                    }
                }
            }
        }
    }

    // Producer
    {
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                Set<String> lst = impl.getPublishTopicList();
                topicList.addAll(lst);
            }
        }
    }

    // 遍历topic集合,进行更新路由信息
    for (String topic : topicList) {
        // todo
        this.updateTopicRouteInfoFromNameServer(topic);
    }
}
复制代码           

它是将consumer 与producer 所有topic 放到一个topic集合中,然后遍历这个集合,一个一个topic请求更新。

接下来我们来看下这个updateTopicRouteInfoFromNameServer 方法,这方法有点长,我们一部分一部分的介绍,大体上分为2个部分吧,第一个部分是获取对应topic的信息,然后第二部分就是更新本地的topic table 缓存。

获取部分代码:

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                // 默认 并且 defaultMQProducer不为空,当topic不存在的时候会进入这个if中
                if (isDefault && defaultMQProducer != null) {
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                        1000 * 3);
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    // todo 获取topicRoute信息
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                ...
}
复制代码           

if是true这段是你没有topic 然后需要创建topic 的时候干的事,它会向nameserv获取topic 为TBW102的值,然后获取它对应的那个topicRouteData。

fasle的时候进入else里面,这个就是拿着topic去nameserv那获取对应的topicRouteData。 这里需要解释下这个TopicRouteData,可以理解为里面存了两部分内容,一是broker 地址信息,二是messagequeue 对应哪个broker上面。稍微看下:

/**
 * 包含两部分内容,一是broker 地址信息,二是messagequeue 对应哪个broker上面
 */
public class TopicRouteData extends RemotingSerializable {
    // 顺序消息配置内容,来自kvConfig
    private String orderTopicConf;
    // topic队列元数据
    private List<QueueData> queueDatas;
    // topic分布的broker元数据
    private List<BrokerData> brokerDatas;
    // broker上过滤服务器的地址列表
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
复制代码           

接着我们追踪下从nameserv获取topic信息的代码:

/**
 * 从namesrv获取topic路由信息
 * @param topic topic名称
 * @param timeoutMillis 超时时间,默认3s
 * @param allowTopicNotExist 允许这个topic不存在
 */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    // 获取路由信息的请求头
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);

    // 创建RemotingCommand 创建请求实体
    // 发送请求的 code 为 GET_ROUTEINFO_BY_TOPIC
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);

    // todo 进行同步调用,获取结果
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        // topic不存在
        case ResponseCode.TOPIC_NOT_EXIST: {
            // 允许topic不存在
            if (allowTopicNotExist) {
                log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
            }

            break;
        }
        // 成功
        case ResponseCode.SUCCESS: {
            // 获取内容
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        default:
            break;
    }

    // 抛异常
    throw new MQClientException(response.getCode(), response.getRemark());
}
复制代码           

我们可以看到,先是封装了一个获取路由的请求头,然后将topic 设置到请求头里面,然后又将请求头封装成RemotingCommand,其实不管是requestHeader还是RemotingCommand,其实都是存储信息的一些实体,只是代表的含义不一样而已,你可以把RemotingCommand 看作是http协议的格式,想想http有请求行,请求头,请求体,然后他这个requestHeader算是个请求头,然后RemotingCommand里面还有body属性可以看作是请求体,需要注意的是,它设置了一个RequestCode是GET_ROUTEINTO_BY_TOPIC,其实nameserv就是根据这个请求code判断出来你要干什么,只有你把GET_ROUTEINTO_BY_TOPIC 告诉nameserv的时候,nameserv才能知道你要从我这里获取这个topic的路由信息。

这里发送向NameServer发送消息的code是GET_ROUTEINFO_BY_TOPIC,这点在前面分析nameServer的消息处理时也分析过了,并且还分析了当消息送达nameServer后,nameServer是如何返回topic数据的,遗忘的小伙伴可以看下之前分析nameServer的文章。RocketMQ源码3-NameServer 消息处理 第3节

接着就是调用remotingClient发送消息了,它这里用的是同步发送的方式(这里暂时先不说了,其实就是使用netty client 发送消息, 后面有提到),也就是阻塞等着响应过来,超时时间默认是3s,再往下看如果这个响应code是success的话,就把body弄出来然后反序列化话成 TopicRouteData对象。

其实这里有个问题,就是我们有多个nameserv ,比如说我们有3台nameserv ,那么生产者是找谁获取的呢?其实它是这个样子的,你上次用的那个nameserv要是还ok的话,也就是连接没断的话,它会继续使用你上次用的个,如果你是第一次发起这种请求,没有与nameserv创建过连接或者是上次创建的那个连接不好使了,这个时候就会有个计数器,轮询的使用 ,也就是计数器值+1%namserv地址个数的形式,如果不理解的同学可以找个轮询算法看下,

其实都是使用计数器+1 % 列表的个数,这样能够选出来一个列表的位置来,再根据这个位置去列表中获取一下具体的值,好了这个轮询算法我们先解释这么多 ,如果能够正常创建连接,直接使用这个连接了就,如果不能使用,也就是创建连接失败,访问不通等等,这时候继续循环使用这个轮询算法获取下一个地址,然后创建连接,如果能够创建成功,返回对应的channel就可以了,然后client可以往这个channel上发送请求了,这个channel的话可以看作两端的通道,管道都可以。如果不成功继续循环,他这里循环次数是你配置nameserv地址的个数。

@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    // 开始时间
    long beginStartTime = System.currentTimeMillis();
    // todo 轮询获取namesrv地址Channel
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            // 执行开始之前的rpchook
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            // 判断超时 之前有获取链接的操作,可能会出现超时的情况
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            // todo 进行同步执行,获取响应
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            // 执行之后的rpchook
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
            // 远程发送请求异常
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            // 关闭channel
            this.closeChannel(addr, channel);
            throw e;
            // 超时异常
        } catch (RemotingTimeoutException e) {
            // 如果超时 就关闭cahnnel话,就关闭channel 默认是不关闭的
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}
复制代码           

好了,这里我们就把获取路由信息的部分看完了,接着就是解析这个TopicRouteData 然后放到生产者本地缓存起来了。

更新本地路由缓存

这块内容也是比较长,我们一段一段看下:

if (topicRouteData != null) {
    // 之前的
    TopicRouteData old = this.topicRouteTable.get(topic);
    // 对比,看是否发生变化
    boolean changed = topicRouteDataIsChange(old, topicRouteData);
    // 没有发生变化
    if (!changed) {
        // 调用isNeedUpdateTopicRouteInfo 再次判断是否需要更新
        changed = this.isNeedUpdateTopicRouteInfo(topic);
    } else {
        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
    }
复制代码           

他这里先是从 topic table中获取对应topic 的老数据,然后 拿老的 与新请求的进行对比,判断一下有没有变动,如果没有变动的话,就调用isNeedUpdateTopicRouteInfo 方法再判断一下需要更新,这个方法其实就是遍历所有的producer 或者是consumer,然后看看他们的topic table里面是不是都有这个topic 没有的话就需要更新下。

if (changed) {
    TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

    // 更新broker地址
    for (BrokerData bd : topicRouteData.getBrokerDatas()) {
        this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
    }

    // Update Pub info
    // 更新推送消息/就是更新生产者topic信息
    {
        // todo 将topic 转成topic publish
        TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
        publishInfo.setHaveTopicRouterInfo(true);
        // 调用所有的producer 更新对应的topic info
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                // 更新 topic publishinfo
                impl.updateTopicPublishInfo(topic, publishInfo);
            }
        }
    }

    // Update sub info
    // 更新订阅信息 更新消费者topic信息
    {
        Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                impl.updateTopicSubscribeInfo(topic, subscribeInfo);
            }
        }
    }
    log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
    // 添加到route表中
    this.topicRouteTable.put(topic, cloneTopicRouteData);
    return true;
}
复制代码           

这里首先是更新了一下brokerAddrTable这个map ,这map里面然后就是缓存着broker name-> broker地址的集合

接着就是将 topicRouteData转成topicPublishInfo ,然后 haveTopicRouterInfo设置成true,就是说明它这个topicPublishInfo 里面存着对应的topicRouteData信息。

// 将topic 路由信息转成topic publish 信息 提供给消息发送者发送消息使用
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
    TopicPublishInfo info = new TopicPublishInfo();
    info.setTopicRouteData(route);
    if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
        String[] brokers = route.getOrderTopicConf().split(";");
        for (String broker : brokers) {
            String[] item = broker.split(":");
            int nums = Integer.parseInt(item[1]);
            for (int i = 0; i < nums; i++) {
                MessageQueue mq = new MessageQueue(topic, item[0], i);
                info.getMessageQueueList().add(mq);
            }
        }

        info.setOrderTopic(true);
    } else {
        List<QueueData> qds = route.getQueueDatas();
        Collections.sort(qds);
        for (QueueData qd : qds) {
            if (PermName.isWriteable(qd.getPerm())) {
                BrokerData brokerData = null;
                for (BrokerData bd : route.getBrokerDatas()) {
                    if (bd.getBrokerName().equals(qd.getBrokerName())) {
                        brokerData = bd;
                        break;
                    }
                }

                if (null == brokerData) {
                    continue;
                }

                // 判断有没有master
                if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                    continue;
                }
                // 创建对应的 messageQueue
                for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    info.getMessageQueueList().add(mq);
                }
            }
        }

        info.setOrderTopic(false);
    }

    return info;
}
复制代码           

这里就是将返回的topicRouteData 转成对应的topicPublishInfo,这个topicPublishInfo其实里面就是MessageQueue,比如说我topicRouteData 里面返回2个broker ,然后每个broker的writeQueueNums 个数是4个,这个时候它生成的MessageQueue就是8个,然后每个broker对应着4个MessageQueue。

接着就是遍历更新各个producer的topicPublishInfoTable 对应topic信息。 好了,到这我们的更新topic信息的解析就结束了。

2.2 创建topic

有这么一个场景:

我发送某个消息的时候指定的那个topic不存在(就是之前没有创建过)消息生产者是怎样处理的,默认的话如果topic不存在的话,消息生产者会先去nameserv拉下topic信息(从nameserv获取topic信息流程),要是还不存在的话,

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    /**
     * 第一次发送消息时,本地没有缓存topic的路由信息,查询
     * NameServer尝试获取路由信息,如果路由信息未找到,再次尝试用默
     * 认主题DefaultMQProducerImpl#createTopicKey去查询
     */
    /**
     * 生产环境,不建议开启自动创建主题
     * 原因如:https://mp.weixin.qq.com/s/GbSlS3hi8IE0kznTynV4ZQ
     */
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 首先,使用topic 从NameServer尝试获取路由信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // isDefault 为true 其次,使用默认的topic从NameServer尝试获取路由信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
复制代码           

这里就走else的代码了,其实还是调用的updateTopicRouteInfoFromNameServer重载方法,这里这个isDefault 是true了。这个时候就获取一下默认topic的路由信息,这个默认topic是TBW102,发送消息就选择TBW102这个topic的broker发送,broker收到消息后会自动创建这个topic,这里需要注意的是broker得支持自动创建,这里是有个参数配置的autoCreateTopicEnable 设置成true就可以了。

topic我们一般是不会在producer中自动创建的,一般使用RocketMQ的可视化控制台,然后创建topic,指定对应的queue num,指定broker等等,类似下面这个东西

RocketMQ源码——producer 启动流程(获取topic路由信息)
原文链接:https://juejin.cn/post/7209510210422980667

继续阅读