本文我们来分析rocketMq producer 发送消息的流程.
producer发送消息的示例在org.apache.rocketmq.example.simple.Producer类中,代码如下:
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 1. 创建 DefaultMQProducer 对象
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
/*
* Launch the instance.
*/
// todo 2. 启动 producer
producer.start();
for (int i = 0; i < 1000; i++) {
try {
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 3. 发送消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
...
}
producer.shutdown();
}
}
复制代码
以上代码分三步走:
- 创建 DefaultMQProducer 对象
- 启动 producer
- 发送消息
接下来我们的分析也按这三步进行。
1. DefaultMQProducer构造方法
我们看下DefaultMQProducer 这个类有哪些功能:
- 功能一:给消息生产者配置参数,调整参数就是调用这个类的api,比如上面我们设置nameserv地址producer.setNamesrvAddr("127.0.0.1:9876");,可以把它看作一个配置类,
- 功能二:发送消息的功能,这里它发送消息都是调用defaultMQProducerImpl 这个类,
- 功能三:它实现MQAdmin 接口里面关于topic与MessageQueue的操作。
我们来看下它里面有哪些重要的参数:
字段 | 默认值 | 解释 |
producerGroup | null | 组信息,需要你自己指定 |
defaultTopicQueueNums | 4 | 就是新建一个topic 默认设置4个MessageQueue |
sendMsgTimeout | 3000 | 发送消息的超时时间 单位ms |
compressMsgBodyOverHowmuch | 1024 * 4 | 这个就是当发送的消息内容大于这个数的时候 进行压缩,压缩阈值,默认是4k |
maxMessageSize | 1024 * 1024 * 4 | 允许发送消息最大大小 默认是4m |
retryTimesWhenSendFailed | 2 | 发送失败的时候重试次数 默认是2次 |
retryTimesWhenSendAsyncFailed | 2 | 异步发送失败的时候重试次数 默认是2次 |
retryAnotherBrokerWhenNotStoreOK | false | 指示是否在内部发送失败时重试另一个broker |
DefaultMQProducer这个类还继承了一个ClientConfig ,这ClientConfig 不用看就知道是个客户端配置类,我们看看它里面重要字段的解释:
字段 | 默认值 | 解释 |
namesrvAddr | 默认去jvm启动参数 rocketmq.namesrv.addr ,系统环境变量 NAMESRV_ADDR 中找 | nameserv的地址,这个最好是自己设置进去,你在DefaultMQProducer 类set 的就是赋值给了它 |
clientIP | 自己去找 | 本客户端地址,他自己就会去找的 |
instanceName | 默认是去jvm启动参数rocketmq.client.name 中找,没有设置DEFAULT,这个它会自己重新设置的 | 实例名称 |
clientCallbackExecutorThreads | cpu核心数 | 执行callback 线程池的线程核心数 |
pollNameServerInterval | 1000 * 30 | 多久去nameserv 获取topic 信息,默认是30s |
heartbeatBrokerInterval | 1000 * 30 | 与broker心跳间隔时间,默认是30s,就是每隔30向broker发送心跳 |
DefaultMQProducer还实现一个MQProducer 接口,不用看,这个MQProducer 接口就是一堆send方法与start,shutdown方法,然后让DefaultMQProducer去实现。DefaultMQProducer构造方法代码如下:
public DefaultMQProducer(final String producerGroup) {
// 继续调用
this(null, producerGroup, null);
}
/**
* 最终调用的构造方法
*/
public DefaultMQProducer(final String namespace,
final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
复制代码
这个方法就是简单地赋了值,然后创建了DefaultMQProducerImpl实例,我们继续看DefaultMQProducerImpl的构造方法:
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
// 5w大小的链表队列,异步发送线程池队列
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
// 默认的异步发送线程池
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),// 核心线程池数,cpu核数
Runtime.getRuntime().availableProcessors(), // 最大线程池数,cpu核数
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
}
复制代码
这个构造方法依然还是处理赋值操作,并没做什么实质性内容,就不继续深究了。
2. DefaultMQProducer#start:启动producer
接着我们来看看producer的启动流程,进入DefaultMQProducer#start方法:
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
// 调用 defaultMQProducerImpl 的 start() 方法
this.defaultMQProducerImpl.start();
// 消息轨迹相关,我们不关注
if (null != traceDispatcher) {
...
}
}
复制代码
这个方法先是调用了defaultMQProducerImpl#start方法,然后处理消息轨迹相关操作,关于rocketMq消息轨迹相关内容,本文就不过多探讨了,我们将目光聚集于DefaultMQProducerImpl#start(boolean)方法:
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
// 刚创建还没有启动
case CREATE_JUST:
// 设置为启动失败状态
this.serviceState = ServiceState.START_FAILED;
// 检查group配置
this.checkConfig();
// 只要group组 不是CLIENT_INNER_PRODUCER, 就重新设置下实例名称
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
// 改变生产者的instanceName 为进程id
this.defaultMQProducer.changeInstanceNameToPID();
}
// todo 创建MQClientInstance 实例(封装了网络处理API,消息生产者、消费者和Namesrv、broker打交道的网络通道)
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// todo 进行注册
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
// 没有注册成功,设置状态为创建没有启动,然后抛出之前已经注册的异常
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// topic --> topic info
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// 是否启动 client实例,默认是true
if (startFactory) {
// todo 核心
mQClientFactory.start();
}
// 启动生产者成功
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// 设置状态running
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// todo 发送心跳到所有broker
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 定时扫描异步请求的返回结果
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000);
}
复制代码
这个方法并不复杂相关内容都已经作了注释,这里重点提出3个方法:
- mQClientFactory.start():执行方法为MQClientInstance#start,这个方法里会启动一些组件,我们稍后会分析。
- mQClientFactory.sendHeartbeatToAllBrokerWithLock():发送心跳到所有的broker,最终执行的方法为MQClientAPIImpl#sendHearbeat:
- public int sendHearbeat( final String addr, final HeartbeatData heartbeatData, final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { // request 的 code 为 HEART_BEAT RemotingCommand request = RemotingCommand .createRequestCommand(RequestCode.HEART_BEAT, null); request.setLanguage(clientConfig.getLanguage()); request.setBody(heartbeatData.encode()); // 异步调用 RemotingCommand response = this.remotingClient .invokeSync(addr, request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { return response.getVersion(); } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark(), addr); } 复制代码
- 这里是与broker通信,request 的 code 为 HEART_BEAT,后面的分析中我们会看到,producer也会同nameServer通信。
- 定时扫描异步请求的返回结果:最终执行的方法为RequestFutureTable.scanExpiredRequest(),关于该方法的内容,我们在分析producer发送异步消息时再分析。
2.1 MQClientInstance#start:启动MQClientInstance
接下来我们来看看MQClientInstance的启动,方法为MQClientInstance#start,代码如下:
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
// 先设置 失败
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
// 判断namesrv 是否为null
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// todo Start request-response channel
// 启动远程服务,这个方法只是装配了netty客户端相关配置
// todo 注意:1. 这里是netty客户端,2. 这里并没有创建连接
this.mQClientAPIImpl.start();
// Start various schedule tasks
// todo 开启任务调度
this.startScheduledTask();
// Start pull service
// todo 开启 拉取服务
this.pullMessageService.start();
// Start rebalance service
// todo 开启平衡服务
this.rebalanceService.start();
// Start push service
// todo 启用内部的 producer
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
// 设置状态为running
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
复制代码
这个方法进行的操作在注释中已经说明得很清楚了,接下来我们对以上的部分操作做进一步分析。
2.1.1 mQClientAPIImpl.start():配置netty客户端
在看MQClientAPIImpl 的启动方法之前,我们需要看下它的构造:
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig) {
// 客户端配置
this.clientConfig = clientConfig;
topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
// todo netty client
this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
this.clientRemotingProcessor = clientRemotingProcessor;
// 注册rpc hook
this.remotingClient.registerRPCHook(rpcHook);
// 注册 processor CHECK_TRANSACTION_STATE 检查事务状态
this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
// 通知消费者id已更改
this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);
// 重置消费者客户端偏移量
this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);
// 从客户端拉取消费者状态
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);
// 获取消费者运行状态
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
// 消费信息
this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}
复制代码
clientConfig这个是关于client配置的一些信息,然后往后创建了一个NettyRemotingClient对象,这个就是netty client,封装了一些同步异步的发送,接着clientRemotingProcessor,这个就是netty收到消息时候处理类,然后下面这一堆注册processor这个就不看了,其实就是当收到这些code的消息时候让哪个processor来处理。
先来看下clientConfig 有哪些配置字段需要我们关注:
字段 | 默认值 | 解释 | 调优 |
clientWorkerThreads | 4 | 这个其实就是处理netty 那堆自定义handler的线程 | 是 |
clientCallbackExecutorThreads | cpu核心线程数 | callback线程数,这个用来执行你注册的那些processor | 是 |
clientOnewaySemaphoreValue | 默认是65535 | 当发送单向消息的时候,信号量限流 | 是 |
clientAsyncSemaphoreValue | 65535 | 当发送异步消息的时候,信号量限流 | 是 |
connectTimeoutMillis | 3000 | 连接超时时间 | 没啥现实意义 |
clientSocketSndBufSize | 65535 | netty 客户端 发送buffer | 是 |
clientSocketRcvBufSize | 65535 | netty 客户端 接收buffer | 是 |
clientChannelMaxIdleTimeSeconds | 120s | 这个就是netty channel 最大空闲时间 | 也能调优 |
看下NettyRemotingClient 的构造方法:
public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
final ChannelEventListener channelEventListener) {
// 设置 异步与单向请求的信号量
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
// netty配置文件
this.nettyClientConfig = nettyClientConfig;
// listener
this.channelEventListener = channelEventListener;
// 这个是netty 客户端回调线程数,默认是cpu核数
int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
// 如果小于等于0 默认为4
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// 设置线程池
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// selector 线程为1
this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
}
});
// 是否启用tls
if (nettyClientConfig.isUseTLS()) {
try {
sslContext = TlsHelper.buildSslContext(true);
log.info("SSL enabled for client");
} catch (IOException e) {
log.error("Failed to create SSLContext", e);
} catch (CertificateException e) {
log.error("Failed to create SSLContext", e);
throw new RuntimeException("Failed to create SSLContext", e);
}
}
}
复制代码
首先是调用父类的构造,创建2个信号量Semaphore,这两个分别用来对单向发送,异步发送进行限流,默认是65535。之后就是创建public线程池,这个线程池主要是用来处理你注册那堆processor,默认线程数是cpu核心数,再往后就是创建netty niogroup组就1个线程,这个主要是用来处理连接的,最后就是判断是否使用ssl,使用的话创建ssl context。
再来看NettyRemotingClient#start方法,代码如下:
@Override
public void start() {
// 默认 4个线程
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true) // 不使用tcp中的DELAY算法,就是有小包也要发送出去
.option(ChannelOption.SO_KEEPALIVE, false) // keeplive false
// 链接超时 默认3s
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
//发送buffer 默认是65535
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
// 接收buffer 默认是65535
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 是否启用tls
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
// 使用这个线程组处理下面这写processor 默认是4个线程
defaultEventExecutorGroup,
new NettyEncoder(), // 编码
new NettyDecoder(), // 解码
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(), // 连接管理
new NettyClientHandler()); // todo netty client handler 处理接收到的消息
}
});
// todo 扫描消息获取结果,每秒执行1次
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// todo 扫描响应表
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
复制代码
对于这个方法,说明有两个点:
- 方法里使用的是Bootstrap而非ServerBootstrap,表示这是netty客户端
- 整个方法中并没有创建连接
2.1.2 startScheduledTask():启动定时任务
启动定时任务的方法为MQClientInstance#startScheduledTask,代码如下:
private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) {
// todo 获取namesrv地址, 2分钟执行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// todo 从namesrv上面更新topic的路由信息,默认30s
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// todo 清除下线broker
MQClientInstance.this.cleanOfflineBroker();
// todo 发送心跳到所有broker上面
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// todo 持久化consumer offset 可以放在本地文件,也可以推送到 broker
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// todo 调整线程池的线程数量,并没有用上
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
复制代码
这里共有5个定时任务:
- 定时获取 nameServer 的地址,MQClientInstance#start一开始会调用MQClientAPIImpl#fetchNameServerAddr获取nameServer,这里也调用了这个方法
- 定时更新topic的路由信息,这里会去nameServer获取路由信息,之后再分析
- 定时发送心跳信息到nameServer,在DefaultMQProducerImpl#start(boolean)中,我们也提到了向nameServer发送心跳信息,两处调用的是同一个方法
- 持久化消费者的消费偏移量,这个仅对消费者consumer有效,后面分析消费者时再作分析
- 调整线程池的线程数量,不过追踪到最后,发现这个并没有生效,就不多说了
定时更新topic的路由信息
这个就是它的定时任务,默认是30s执行一次的。我们看下updateTopicRouteInfoFromNameServer():
public void updateTopicRouteInfoFromNameServer() {
// 从消费者端与生产者端 获取topic集合
Set<String> topicList = new HashSet<String>();
// Consumer
{
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
Set<SubscriptionData> subList = impl.subscriptions();
if (subList != null) {
for (SubscriptionData subData : subList) {
topicList.add(subData.getTopic());
}
}
}
}
}
// Producer
{
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
Set<String> lst = impl.getPublishTopicList();
topicList.addAll(lst);
}
}
}
// 遍历topic集合,进行更新路由信息
for (String topic : topicList) {
// todo
this.updateTopicRouteInfoFromNameServer(topic);
}
}
复制代码
它是将consumer 与producer 所有topic 放到一个topic集合中,然后遍历这个集合,一个一个topic请求更新。
接下来我们来看下这个updateTopicRouteInfoFromNameServer 方法,这方法有点长,我们一部分一部分的介绍,大体上分为2个部分吧,第一个部分是获取对应topic的信息,然后第二部分就是更新本地的topic table 缓存。
获取部分代码:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
// 默认 并且 defaultMQProducer不为空,当topic不存在的时候会进入这个if中
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
// todo 获取topicRoute信息
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
...
}
复制代码
if是true这段是你没有topic 然后需要创建topic 的时候干的事,它会向nameserv获取topic 为TBW102的值,然后获取它对应的那个topicRouteData。
fasle的时候进入else里面,这个就是拿着topic去nameserv那获取对应的topicRouteData。 这里需要解释下这个TopicRouteData,可以理解为里面存了两部分内容,一是broker 地址信息,二是messagequeue 对应哪个broker上面。稍微看下:
/**
* 包含两部分内容,一是broker 地址信息,二是messagequeue 对应哪个broker上面
*/
public class TopicRouteData extends RemotingSerializable {
// 顺序消息配置内容,来自kvConfig
private String orderTopicConf;
// topic队列元数据
private List<QueueData> queueDatas;
// topic分布的broker元数据
private List<BrokerData> brokerDatas;
// broker上过滤服务器的地址列表
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
复制代码
接着我们追踪下从nameserv获取topic信息的代码:
/**
* 从namesrv获取topic路由信息
* @param topic topic名称
* @param timeoutMillis 超时时间,默认3s
* @param allowTopicNotExist 允许这个topic不存在
*/
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
// 获取路由信息的请求头
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
// 创建RemotingCommand 创建请求实体
// 发送请求的 code 为 GET_ROUTEINFO_BY_TOPIC
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
// todo 进行同步调用,获取结果
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
// topic不存在
case ResponseCode.TOPIC_NOT_EXIST: {
// 允许topic不存在
if (allowTopicNotExist) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
}
break;
}
// 成功
case ResponseCode.SUCCESS: {
// 获取内容
byte[] body = response.getBody();
if (body != null) {
return TopicRouteData.decode(body, TopicRouteData.class);
}
}
default:
break;
}
// 抛异常
throw new MQClientException(response.getCode(), response.getRemark());
}
复制代码
我们可以看到,先是封装了一个获取路由的请求头,然后将topic 设置到请求头里面,然后又将请求头封装成RemotingCommand,其实不管是requestHeader还是RemotingCommand,其实都是存储信息的一些实体,只是代表的含义不一样而已,你可以把RemotingCommand 看作是http协议的格式,想想http有请求行,请求头,请求体,然后他这个requestHeader算是个请求头,然后RemotingCommand里面还有body属性可以看作是请求体,需要注意的是,它设置了一个RequestCode是GET_ROUTEINTO_BY_TOPIC,其实nameserv就是根据这个请求code判断出来你要干什么,只有你把GET_ROUTEINTO_BY_TOPIC 告诉nameserv的时候,nameserv才能知道你要从我这里获取这个topic的路由信息。
这里发送向NameServer发送消息的code是GET_ROUTEINFO_BY_TOPIC,这点在前面分析nameServer的消息处理时也分析过了,并且还分析了当消息送达nameServer后,nameServer是如何返回topic数据的,遗忘的小伙伴可以看下之前分析nameServer的文章。RocketMQ源码3-NameServer 消息处理 第3节
接着就是调用remotingClient发送消息了,它这里用的是同步发送的方式(这里暂时先不说了,其实就是使用netty client 发送消息, 后面有提到),也就是阻塞等着响应过来,超时时间默认是3s,再往下看如果这个响应code是success的话,就把body弄出来然后反序列化话成 TopicRouteData对象。
其实这里有个问题,就是我们有多个nameserv ,比如说我们有3台nameserv ,那么生产者是找谁获取的呢?其实它是这个样子的,你上次用的那个nameserv要是还ok的话,也就是连接没断的话,它会继续使用你上次用的个,如果你是第一次发起这种请求,没有与nameserv创建过连接或者是上次创建的那个连接不好使了,这个时候就会有个计数器,轮询的使用 ,也就是计数器值+1%namserv地址个数的形式,如果不理解的同学可以找个轮询算法看下,
其实都是使用计数器+1 % 列表的个数,这样能够选出来一个列表的位置来,再根据这个位置去列表中获取一下具体的值,好了这个轮询算法我们先解释这么多 ,如果能够正常创建连接,直接使用这个连接了就,如果不能使用,也就是创建连接失败,访问不通等等,这时候继续循环使用这个轮询算法获取下一个地址,然后创建连接,如果能够创建成功,返回对应的channel就可以了,然后client可以往这个channel上发送请求了,这个channel的话可以看作两端的通道,管道都可以。如果不成功继续循环,他这里循环次数是你配置nameserv地址的个数。
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
// 开始时间
long beginStartTime = System.currentTimeMillis();
// todo 轮询获取namesrv地址Channel
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
// 执行开始之前的rpchook
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
// 判断超时 之前有获取链接的操作,可能会出现超时的情况
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
// todo 进行同步执行,获取响应
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
// 执行之后的rpchook
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
// 远程发送请求异常
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
// 关闭channel
this.closeChannel(addr, channel);
throw e;
// 超时异常
} catch (RemotingTimeoutException e) {
// 如果超时 就关闭cahnnel话,就关闭channel 默认是不关闭的
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
复制代码
好了,这里我们就把获取路由信息的部分看完了,接着就是解析这个TopicRouteData 然后放到生产者本地缓存起来了。
更新本地路由缓存
这块内容也是比较长,我们一段一段看下:
if (topicRouteData != null) {
// 之前的
TopicRouteData old = this.topicRouteTable.get(topic);
// 对比,看是否发生变化
boolean changed = topicRouteDataIsChange(old, topicRouteData);
// 没有发生变化
if (!changed) {
// 调用isNeedUpdateTopicRouteInfo 再次判断是否需要更新
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
复制代码
他这里先是从 topic table中获取对应topic 的老数据,然后 拿老的 与新请求的进行对比,判断一下有没有变动,如果没有变动的话,就调用isNeedUpdateTopicRouteInfo 方法再判断一下需要更新,这个方法其实就是遍历所有的producer 或者是consumer,然后看看他们的topic table里面是不是都有这个topic 没有的话就需要更新下。
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
// 更新broker地址
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
// 更新推送消息/就是更新生产者topic信息
{
// todo 将topic 转成topic publish
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
// 调用所有的producer 更新对应的topic info
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
// 更新 topic publishinfo
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// Update sub info
// 更新订阅信息 更新消费者topic信息
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
// 添加到route表中
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
复制代码
这里首先是更新了一下brokerAddrTable这个map ,这map里面然后就是缓存着broker name-> broker地址的集合
接着就是将 topicRouteData转成topicPublishInfo ,然后 haveTopicRouterInfo设置成true,就是说明它这个topicPublishInfo 里面存着对应的topicRouteData信息。
// 将topic 路由信息转成topic publish 信息 提供给消息发送者发送消息使用
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
info.setTopicRouteData(route);
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
String[] brokers = route.getOrderTopicConf().split(";");
for (String broker : brokers) {
String[] item = broker.split(":");
int nums = Integer.parseInt(item[1]);
for (int i = 0; i < nums; i++) {
MessageQueue mq = new MessageQueue(topic, item[0], i);
info.getMessageQueueList().add(mq);
}
}
info.setOrderTopic(true);
} else {
List<QueueData> qds = route.getQueueDatas();
Collections.sort(qds);
for (QueueData qd : qds) {
if (PermName.isWriteable(qd.getPerm())) {
BrokerData brokerData = null;
for (BrokerData bd : route.getBrokerDatas()) {
if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;
break;
}
}
if (null == brokerData) {
continue;
}
// 判断有没有master
if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
continue;
}
// 创建对应的 messageQueue
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);
}
}
}
info.setOrderTopic(false);
}
return info;
}
复制代码
这里就是将返回的topicRouteData 转成对应的topicPublishInfo,这个topicPublishInfo其实里面就是MessageQueue,比如说我topicRouteData 里面返回2个broker ,然后每个broker的writeQueueNums 个数是4个,这个时候它生成的MessageQueue就是8个,然后每个broker对应着4个MessageQueue。
接着就是遍历更新各个producer的topicPublishInfoTable 对应topic信息。 好了,到这我们的更新topic信息的解析就结束了。
2.2 创建topic
有这么一个场景:
我发送某个消息的时候指定的那个topic不存在(就是之前没有创建过)消息生产者是怎样处理的,默认的话如果topic不存在的话,消息生产者会先去nameserv拉下topic信息(从nameserv获取topic信息流程),要是还不存在的话,
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
/**
* 第一次发送消息时,本地没有缓存topic的路由信息,查询
* NameServer尝试获取路由信息,如果路由信息未找到,再次尝试用默
* 认主题DefaultMQProducerImpl#createTopicKey去查询
*/
/**
* 生产环境,不建议开启自动创建主题
* 原因如:https://mp.weixin.qq.com/s/GbSlS3hi8IE0kznTynV4ZQ
*/
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 首先,使用topic 从NameServer尝试获取路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// isDefault 为true 其次,使用默认的topic从NameServer尝试获取路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
复制代码
这里就走else的代码了,其实还是调用的updateTopicRouteInfoFromNameServer重载方法,这里这个isDefault 是true了。这个时候就获取一下默认topic的路由信息,这个默认topic是TBW102,发送消息就选择TBW102这个topic的broker发送,broker收到消息后会自动创建这个topic,这里需要注意的是broker得支持自动创建,这里是有个参数配置的autoCreateTopicEnable 设置成true就可以了。
topic我们一般是不会在producer中自动创建的,一般使用RocketMQ的可视化控制台,然后创建topic,指定对应的queue num,指定broker等等,类似下面这个东西
原文链接:https://juejin.cn/post/7209510210422980667