前言
在看完rocketmq通信層的源碼之後,再來看namesrv的代碼相對容易些,rocketmq的服務注冊和發現并沒有采用zookeeper這樣的開源分布式協作架構,而是自研了一套服務注冊和發現的服務,相對來說比較簡單,多個namesrv之間不互相通信,也不是主從關系。
namesrv主要有兩個作用,一個是路由資訊注冊和通知,另外一個是K-V存儲。
namesrv啟動
if [ -z "$ROCKETMQ_HOME" ] ; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
saveddir=`pwd`
ROCKETMQ_HOME=`dirname "$PRG"`/..
# make it fully qualified
ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd`
cd "$saveddir"
fi
export ROCKETMQ_HOME
sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@
執行ruunserver.sh 擷取java環境相關資訊和JVM配置 最後啟動 org.apache.rocketmq.namesrv.NamesrvStartup 的main方法
error_exit ()
{
echo "ERROR: $1 !!"
exit 1
}
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
#===========================================================================================
# JVM Configuration
#===========================================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/rmq_srv_gc.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
$JAVA ${JAVA_OPT} $@
NamesrvStartup 使用apache的CommandLine來擷取指令行中的參數初始化namesrv的配置。
其中 -c 表示namesrvConfig和nettyServerConfig的存儲讀寫路徑 -p表示是否列印配置 ,并初始化日志,調用controller的init和start
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 設定JVM關閉鈎子
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
controller.start();
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf(tip + "%n");
namesrv初始化
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.routeInfoManager = new RouteInfoManager();
// 網絡事件監聽 當有broker斷開,會自動更新本地路由表
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
// 存放namesrvConfig
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
public boolean initialize() {
// 加載Kvconfig.jso到記憶體
this.kvConfigManager.load();
// 初始化remotingServer
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 處理業務線程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注冊處理器
this.registerProcessor();
// 檢測無效的broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 列印 k-v配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
return true;
}
然後調用strart方法啟動
this.remotingServer.start();
namesrv實作
主要功能
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG: // 存KV
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG: //取KV
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG: // 删除KV
return this.deleteKVConfig(ctx, request);
case RequestCode.REGISTER_BROKER: // 注冊broker
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER: // 登出broker
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC: // 根據topic擷取路由表
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO: //擷取叢集中broker資訊
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER: // 删除broker的寫權限
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: // 擷取所有的topic資訊
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV: // 删除某個topic資訊
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE: // 擷取namespace下的所有KV
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER: // 擷取叢集中的topic
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: // 擷取系統topic
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST: // 擷取 unit topic
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: // 擷取 unit SUB topic ,比如 %RETRY% TOPIC
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST: //
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG: // 更新namesrv的配置
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG: // 擷取namesrv的配置
return this.getConfig(ctx, request);
default:
break;
}
return null;
路由表資料結構
// 一個topic 與多個broker的對應關系
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// 一個brokerName 與 brokername資訊的對應關系 包含主從broker位址資訊 一對主從節點使用同一個brokername
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// 定義了一個叢集包含哪些brokerName
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// 一個broker的主機位址 存儲了網絡channel 版本 以及HA的服務位址
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// 定義了一個broker主機對應的過濾伺服器組
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
接下來看一下幾個關鍵操作
注冊broker
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
// 定義響應對象
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
if (request.getBody() != null) {
registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);
} else {
// 沒有body的時候 給registerBrokerBody的DataVersion和Timestamp 初始化
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
}
// 主要是注冊記憶體中
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
registerBrokerBody.getTopicConfigSerializeWrapper(),
registerBrokerBody.getFilterServerList(),
ctx.channel());
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
// 傳回KV配置
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
當broker挂了 在事件監聽器中可以監聽到網絡的變化,并更新記憶體
public class BrokerHousekeepingService implements ChannelEventListener {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;
public BrokerHousekeepingService(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
// 如果為了實作實時通知到product或者sonsumer 可以在這邊緩存channel
}
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
}
擷取路由資訊
public TopicRouteData pickupTopicRouteData(final String topic) {
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set<String> brokerNameSet = new HashSet<String>();
List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
topicRouteData.setBrokerDatas(brokerDataList);
HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
topicRouteData.setFilterServerTable(filterServerMap);
try {
try {
this.lock.readLock().lockInterruptibly();
// 根據topic擷取一組brokername
List<QueueData> queueDataList = this.topicQueueTable.get(topic);
if (queueDataList != null) {
topicRouteData.setQueueDatas(queueDataList);
foundQueueData = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
brokerNameSet.add(qd.getBrokerName());
}
for (String brokerName : brokerNameSet) {
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
.getBrokerAddrs().clone());
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
// 每一個brokerName都有對個broker主機 包含主從節點,每一個broker節點,都對應一組過濾伺服器節點
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
List<String> filterServerList = this.filterServerTable.get(brokerAddr);
filterServerMap.put(brokerAddr, filterServerList);
}
}
}
}
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("pickupTopicRouteData Exception", e);
}
if (log.isDebugEnabled()) {
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
}
if (foundBrokerData && foundQueueData) {
return topicRouteData;
}
return null;