
Manually creating a topic in Java code with RocketMQ


[Original work; please credit the source when reposting.]


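Note: this article applies only to RocketMQ 4.5.x. Other versions may differ, so treat it as a reference only.

It also covers only the single name server case. I am not sure whether it works with a name server cluster (the principle is the same).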

References and related classes/methods

org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute

org.apache.rocketmq.client.ClientConfig

Additional dependencies

The following extra Maven dependencies are required; pick the versions that suit you.

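<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-tools</artifactId>
    <version>4.5.1</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>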

Usage

On the command line, a topic is created with the updateTopic command, for example:

./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t tx-mq-TOPIC

The parameters are:

mqadmin updateTopic [-b <arg>] [-c <arg>] [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>] -t <arg> [-u <arg>] [-w <arg>]

 -b,--brokerAddr <arg>       create topic to which broker
 -c,--clusterName <arg>      create topic to which cluster
 -h,--help                   Print help
 -n,--namesrvAddr <arg>      Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
 -o,--order <arg>            set topic's order(true|false)
 -p,--perm <arg>             set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
 -r,--readQueueNums <arg>    set read queue nums
 -s,--hasUnitSub <arg>       has unit sub (true|false)
 -t,--topic <arg>            topic name
 -u,--unit <arg>             is unit topic (true|false)
 -w,--writeQueueNums <arg>   set write queue nums
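The -p values in the help text behave like a bit mask: 2 is the write bit, 4 is the read bit, and 6 sets both. The following is a minimal sketch of that decoding. The class and method names are hypothetical, and the bit values are restated from the help text rather than taken from RocketMQ's own classes.

```java
/** Hypothetical helper: turns the -p value of updateTopic into a readable string. */
final class TopicPerm {
    // Bit values as listed in the help text: 2 = write, 4 = read.
    static final int WRITE = 2;
    static final int READ = 4;

    /** Returns "R", "W", "RW", or "none" for the given permission value. */
    static String describe(int perm) {
        StringBuilder sb = new StringBuilder();
        if ((perm & READ) != 0) {
            sb.append('R');
        }
        if ((perm & WRITE) != 0) {
            sb.append('W');
        }
        return sb.length() == 0 ? "none" : sb.toString();
    }

    public static void main(String[] args) {
        for (int perm : new int[] {2, 4, 6}) {
            System.out.println(perm + " -> " + describe(perm));
        }
    }
}
```

Passing "-p", "6" would therefore request a topic that can be both read and written.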

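Caveats and known issues:

RocketMQ requires that, when a topic is created with updateTopic, one of -b or -c is specified (if both are given, -b is used and -c is ignored). The -t parameter is also required. If any of these required parameters is missing, topic creation fails.

When creating the topic from Java code, the -b option works as expected: RocketMQ uses the given broker address to locate the broker and creates the topic on it.

When creating the topic from Java code, using the -c option directly does not work. org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute needs a DefaultMQAdminExt object to connect to the name server and fetch all brokers of the given cluster. DefaultMQAdminExt obtains the name server address through a field it inherits from ClientConfig: private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses(), which is implemented as System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)). This is where the problem appears: when neither the system property nor the environment variable is set, this code cannot obtain the name server address, so the connection to the name server fails. The full error is: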


org.apache.rocketmq.tools.command.SubCommandException: UpdateTopicSubCommand command failed

at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:185)

at com.bayss.bws.common.utils.RocketMQUtil.createTopic(RocketMQUtil.java:54)

at com.bayss.bws.agent.internal.ConsumerManager.init(ConsumerManager.java:78)

at com.bayss.bws.agent.core.InitializeAgent.init(InitializeAgent.java:74)

at com.bayss.bws.agent.core.InitializeAgent.onApplicationEvent(InitializeAgent.java:184)

at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)

at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)

at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)

at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:402)

at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:359)

at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:896)

at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552)

at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)

at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)

at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)

at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)

at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)

at com.bayss.bws.agent.core.BwsAgentApplication.main(BwsAgentApplication.java:24)

Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to failed

at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:392)

at org.apache.rocketmq.client.impl.MQClientAPIImpl.getBrokerClusterInfo(MQClientAPIImpl.java:1193)

at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.examineBrokerClusterInfo(DefaultMQAdminExtImpl.java:275)

at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.examineBrokerClusterInfo(DefaultMQAdminExt.java:222)

at org.apache.rocketmq.tools.command.CommandUtil.fetchMasterAddrByClusterName(CommandUtil.java:83)

at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:158)

... 17 more

I therefore decided to set the property manually (once the property is set, the topic can be created without the -n parameter). The code that sets it is:

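import org.apache.rocketmq.common.MixAll;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RocketMQConfig implements InitializingBean {

    // This must resolve to the correct namesrv address, otherwise the connection fails again.
    @Value("${rocketmq.name-server}")
    private String rocketMQNamesrv;

    @Override
    public void afterPropertiesSet() throws Exception {
        // Publish the address where DefaultMQAdminExt looks for it.
        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, rocketMQNamesrv);
    }
}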

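The command-line help for updateTopic lists a -n option, but I found no handling logic for it in the source of UpdateTopicSubCommand, and passing only -n (namesrv) makes the program fail because the -b or -c parameter required by the first point above is missing. (The mqadmin launcher, org.apache.rocketmq.tools.command.MQAdminStartup, appears to copy -n into the same system property before it runs the sub-command, which would explain why calling the sub-command directly skips that step.)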
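The lookup order described in the caveats can be reproduced without RocketMQ on the classpath. The sketch below is not RocketMQ's own class: the class name is hypothetical, and the two string literals are what I assume MixAll.NAMESRV_ADDR_PROPERTY and MixAll.NAMESRV_ADDR_ENV resolve to in 4.5.x.

```java
/** Sketch of the name server lookup order; a stand-in, not RocketMQ code. */
final class NamesrvLookup {
    // Assumed values of MixAll.NAMESRV_ADDR_PROPERTY and MixAll.NAMESRV_ADDR_ENV.
    static final String PROPERTY = "rocketmq.namesrv.addr";
    static final String ENV = "NAMESRV_ADDR";

    /** System property first, environment variable as fallback, null if neither is set. */
    static String resolve() {
        return System.getProperty(PROPERTY, System.getenv(ENV));
    }

    public static void main(String[] args) {
        System.out.println("before setProperty: " + resolve());
        System.setProperty(PROPERTY, "localhost:9876");
        System.out.println("after setProperty:  " + resolve());
    }
}
```

Because the namesrvAddr field is assigned in a field initializer, the property has to be set before the DefaultMQAdminExt object is constructed; setting it afterwards does not change an existing instance.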

Usage example:

// Each option and its value are passed as separate array elements, as a shell would pass them.
String[] subargs = new String[] {
    "-b", "10.1.4.231:10911",
    "-t", "unit-test-from-java-111",
    //"-r", "8",
    //"-w", "8",
    //"-p", "6",
    //"-o", "false",
    //"-u", "false",
    //"-s", "false"
};
boolean isTopicCreated = RocketMQUtil.createTopic(subargs);
//boolean isTopicCreated = RocketMQUtil.createTopic("10.1.4.231:10911", "", "testttttt");
if (isTopicCreated) {
    System.err.println("topic create success");
}
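The argument rules from the caveats (-t is mandatory, one of -b or -c is mandatory, -b wins when both are present) can be checked before the command is invoked at all. A minimal sketch follows; the class and method names are hypothetical, and it only builds the argument array without calling RocketMQ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that applies the -b / -c / -t rules and builds the argument array. */
final class UpdateTopicArgs {

    static String[] build(String brokerAddr, String clusterName, String topic) {
        if (isBlank(topic)) {
            throw new IllegalArgumentException("-t (topic) is required");
        }
        List<String> args = new ArrayList<>();
        if (!isBlank(brokerAddr)) {
            // -b takes precedence when both a broker and a cluster are given.
            args.add("-b");
            args.add(brokerAddr.trim());
        } else if (!isBlank(clusterName)) {
            args.add("-c");
            args.add(clusterName.trim());
        } else {
            throw new IllegalArgumentException("one of -b (broker) or -c (cluster) is required");
        }
        args.add("-t");
        args.add(topic.trim());
        return args.toArray(new String[0]);
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    public static void main(String[] args) {
        String[] built = build("10.1.4.231:10911", "", "unit-test-from-java-111");
        System.out.println(String.join(" ", built));
    }
}
```

Failing fast with an IllegalArgumentException keeps a missing parameter from surfacing later as a less obvious SubCommandException.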

Java utility class

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;

import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RocketMQUtil {

    /** Runs the updateTopic sub-command with mqadmin-style arguments. */
    public static boolean createTopic(String[] subargs) throws SubCommandException {
        UpdateTopicSubCommand cmd = new UpdateTopicSubCommand();
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        final Options updateTopicOptions = cmd.buildCommandlineOptions(options);
        final CommandLine commandLine = ServerUtil
            .parseCmdLine("mqadmin " + cmd.commandName(),
                subargs, updateTopicOptions, new PosixParser());
        if (commandLine == null) {
            // Defensive check in case parsing yields no command line.
            return false;
        }
        cmd.execute(commandLine, updateTopicOptions, null);
        return true;
    }

    /** Creates the topic on a broker (-b) if brokerAddr is given, otherwise on a cluster (-c). */
    public static boolean createTopic(String brokerAddr, String clusterName, String topic) throws Exception {
        if (StringUtils.isBlank(topic)) {
            return false;
        }
        List<String> argList = new LinkedList<>();
        argList.add("-t");
        argList.add(topic.trim());
        if (StringUtils.isNotBlank(brokerAddr)) {
            argList.add("-b");
            argList.add(brokerAddr.trim());
        } else if (StringUtils.isNotBlank(clusterName)) {
            argList.add("-c");
            argList.add(clusterName.trim());
        } else {
            // Neither -b nor -c is available, so the command cannot succeed.
            return false;
        }
        return createTopic(argList.toArray(new String[0]));
    }

    /**
     * Creates the topic on every cluster known to the name server.
     * This goes through -c, so the namesrv system property must already be set.
     */
    public static boolean createTopic(String namesrvAddr, String topic) {
        try {
            Set<String> clusterNames = RocketMQUtil.getClusterNames(namesrvAddr);
            for (String clusterName : clusterNames) {
                RocketMQUtil.createTopic(null, clusterName, topic);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    public static ClusterInfo getClusterInfo(String namesrvAddr) throws MQClientException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        if (StringUtils.isBlank(namesrvAddr)) {
            return new ClusterInfo();
        }
        DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(5000L);
        mqAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
        mqAdminExt.setNamesrvAddr(namesrvAddr);
        mqAdminExt.start();
        try {
            return mqAdminExt.examineBrokerClusterInfo();
        } finally {
            // Release the client even if the query fails.
            mqAdminExt.shutdown();
        }
    }

    public static Set<String> getClusterNames(String namesrvAddr) throws MQClientException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        Map<String, Set<String>> table = getClusterInfo(namesrvAddr).getClusterAddrTable();
        return table == null ? Collections.<String>emptySet() : table.keySet();
    }

    public static Map<String, BrokerData> getAllBrokerInfo(String namesrvAddr) throws MQClientException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        Map<String, BrokerData> table = getClusterInfo(namesrvAddr).getBrokerAddrTable();
        return table == null ? Collections.<String, BrokerData>emptyMap() : table;
    }

    /** Collects the addresses of all brokers (masters and slaves). */
    public static Set<String> getBrokerAddrs(String namesrvAddr) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
        Map<String, BrokerData> allBrokerInfo = getAllBrokerInfo(namesrvAddr);
        Set<String> brokerAddrs = new HashSet<>();
        for (BrokerData brokerData : allBrokerInfo.values()) {
            brokerAddrs.addAll(brokerData.getBrokerAddrs().values());
        }
        return brokerAddrs;
    }
}
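getBrokerAddrs returns every broker address, masters and slaves alike, whereas the -c path in the stack trace above goes through CommandUtil.fetchMasterAddrByClusterName, which, as its name suggests, resolves master addresses only. If the broker list is to be fed to -b, it may be worth filtering it the same way. The sketch below does this over plain maps standing in for ClusterInfo#getBrokerAddrTable and BrokerData#getBrokerAddrs. The class name and the slave address are made up, and broker id 0 as the master id is my understanding of MixAll.MASTER_ID.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Sketch: keep only master addresses from a broker table (plain-map stand-in). */
final class MasterAddrs {
    // Broker id 0 is assumed to denote the master (MixAll.MASTER_ID).
    static final long MASTER_ID = 0L;

    /** @param brokerTable broker name -> (broker id -> address) */
    static Set<String> masters(Map<String, Map<Long, String>> brokerTable) {
        Set<String> result = new LinkedHashSet<>();
        for (Map<Long, String> addrsById : brokerTable.values()) {
            String master = addrsById.get(MASTER_ID);
            if (master != null) {
                result.add(master);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, String> brokerA = new LinkedHashMap<>();
        brokerA.put(0L, "10.1.4.231:10911"); // master
        brokerA.put(1L, "10.1.4.232:10911"); // slave, made-up address
        Map<String, Map<Long, String>> table = new LinkedHashMap<>();
        table.put("broker-a", brokerA);
        System.out.println(masters(table));
    }
}
```

Each address in the result could then be passed to createTopic(brokerAddr, "", topic), which avoids the -c path and its dependency on the system property.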