#頭條創作挑戰賽#
一、前言
前置知識點:RocketMQ源碼分析之配置檔案管理抽象類ConfigManager
我們之前分析了生産者組和生産者資料管理:RocketMQ源碼分析之生産者組和生産者的關系,資料怎麼維護?
這一篇我們就來分析一下訂閱分組資料是存儲在那裡的,如何進行建立和删除。
二、源碼分析
- 訂閱分組資料結構分析;
- 構造方法,其中内部完成了系統topic訂閱組的資料初始化;
- 更新訂閱組配置;
- 禁用消費;
- 根據GroupName查找訂閱組資訊;
- 訂閱分組資料編碼和解碼;
- 根據組名删除配置資訊;
- 擷取訂閱消費組配置;
1、訂閱分組資料結構分析
SubscriptionGroupManager繼承至ConfigManager,我們可以猜想出子類肯定會實作configFilePath抽象方法,來告訴父類持久化位址路徑;
我們先看一下SubscriptionGroupConfig類的資料結構,再去看一下磁盤中是如何存儲的;
// 用來進行訂閱消費組的管理的
public class SubscriptionGroupManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
// 消費組名稱->訂閱消費組配置
private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
// 資料版本号
private final DataVersion dataVersion = new DataVersion();
private transient BrokerController brokerController;
@Override
public String configFilePath() {
return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig()
.getStorePathRootDir());
}
public class SubscriptionGroupConfig {
// 消費組名稱
private String groupName;
// 是否啟用消費
private boolean consumeEnable = true;
// 是否啟用從最小偏移量開始消費
private boolean consumeFromMinEnable = true;
// 是否啟用消費廣播
private boolean consumeBroadcastEnable = true;
// 重試隊列數量
private int retryQueueNums = 1;
// 重試最大次數
private int retryMaxTimes = 16;
// masterid
private long brokerId = MixAll.MASTER_ID;
// 慢消費的時候選用哪個broker
private long whichBrokerWhenConsumeSlowly = 1;
// 是否啟用通知消費者ids變化
private boolean notifyConsumerIdsChangedEnable = true;
}
我們直接找到subscriptionGroup.json看一下裡面的資料結構是怎樣的
public static String getSubscriptionGroupPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json";
}
給大家稍微截一下,了解一下大緻結構
2、構造方法
SubscriptionGroupManager的構造方法中直接調用init方法,初始化了一系列的訂閱分組資料初始化,并且放入subscriptionGroupTable中;
public SubscriptionGroupManager(BrokerController brokerController) {
this.brokerController = brokerController;
this.init();
}
private void init() {
// 這個裡面他會初始化一系類的針對系統topic的消費者
// 是針對tools這個系統topic的消費組
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);
this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);
}
// filterserver消費組
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);
this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);
}
// selftest消費組
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);
this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);
}
// onshttp_proxy消費組
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);
}
// cid onsapi pull消費組
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);
}
// dcid onsapi permission消費組
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);
}
// cid onsapi owner消費組
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);
}
}
3、更新訂閱組配置
// 更新訂閱組配置
public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
// 直接put,沒有新增,存在則覆寫
SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
if (old != null) {
log.info("update subscription group config, old: {} new: {}", old, config);
} else {
log.info("create new subscription group, {}", config);
}
// 版本号遞增
this.dataVersion.nextVersion();
// 持久化
this.persist();
}
4、禁用消費
// 禁用消費
public void disableConsume(final String groupName) {
// 根據分組名稱擷取資料
SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);
if (old != null) {
// 存在則修改是否啟用消費标志位
old.setConsumeEnable(false);
// 遞增版本号
this.dataVersion.nextVersion();
}
}
5、根據GroupName查找訂閱組資訊
大緻步驟如下:
- 以GroupNmae從SubscriptionGroupManager.SubscriptionGroupTable變量中擷取訂閱資訊SubscriptionGroupConfig對象;若不為空則直接傳回該對象;
- 若為空,表示還沒有該訂閱資訊,再檢查Broker是否運作自動建立訂閱組(由配置參數autoCreateSubscriptionGroup設定,預設為true)。
- 若不允許則直接傳回null;
- 若允許則首先以GroupName為參數建立SubscriptionGroupConfig對象,并存入SubscriptionGroupTable變量中;然後更新SubscriptionGroupManager的dataversion值;
- 再将SubscriptionGroupManager的内容持久化到subscriptionGroup.json檔案中;
- 最後傳回該新建立的SubscriptionGroupConfig對象;
// 根據GroupName查找訂閱組資訊
public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
// 擷取訂閱資訊SubscriptionGroupConfig對象;若不為空則直接傳回該對象;
SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
if (null == subscriptionGroupConfig) {
// 檢查Broker是否運作自動建立訂閱組
if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
// 建構SubscriptionGroupConfig對象
subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(group);
SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);
if (null == preConfig) {
log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());
}
// 遞增版本号
this.dataVersion.nextVersion();
// 序列化
this.persist();
}
}
return subscriptionGroupConfig;
}
6、訂閱分組資料編碼和解碼
其實就是通過RemotingSerializable工具類進行json序列化和反序列化;
@Override
public void decode(String jsonString) {
if (jsonString != null) {
SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
if (obj != null) {
this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);
this.dataVersion.assignNewOne(obj.dataVersion);
this.printLoadDataWhenFirstBoot(obj);
}
}
}
public String encode(final boolean prettyFormat) {
return RemotingSerializable.toJson(this, prettyFormat);
}
private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();
while (it.hasNext()) {
Entry<String, SubscriptionGroupConfig> next = it.next();
log.info("load exist subscription group, {}", next.getValue().toString());
}
}
7、根據組名删除配置資訊
// 根據組名删除配置資訊
public void deleteSubscriptionGroupConfig(final String groupName) {
SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
if (old != null) {
log.info("delete subscription group OK, subscription group:{}", old);
this.dataVersion.nextVersion();
this.persist();
} else {
log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);
}
}
8、擷取訂閱消費組配置
public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
return subscriptionGroupTable;
}
三、總結
SubscriptionGroupConfig可以了解為消費分組和broker之間的契約,如果broker不存在消費分組的訂閱關系,那麼該消費分組就無法從該broker消費任何消息。