天天看點

RocketMQ源碼分析之訂閱消費組管理元件SubscriptionGroupManager

#頭條創作挑戰賽#

一、前言

前置知識點:RocketMQ源碼分析之配置檔案管理抽象類ConfigManager

我們之前分析了生産者組和生産者資料管理:RocketMQ源碼分析之生産者組和生産者的關系,資料怎麼維護?

這一篇我們就來分析一下訂閱分組資料是存儲在那裡的,如何進行建立和删除。

二、源碼分析

  1. 訂閱分組資料結構分析;
  2. 構造方法,其中内部完成了系統topic訂閱組的資料初始化;
  3. 更新訂閱組配置;
  4. 禁用消費;
  5. 根據GroupName查找訂閱組資訊;
  6. 訂閱分組資料編碼和解碼;
  7. 根據組名删除配置資訊;
  8. 擷取訂閱消費組配置;

1、訂閱分組資料結構分析

SubscriptionGroupManager繼承至ConfigManager,我們可以猜想出子類肯定會實作configFilePath抽象方法,來告訴父類持久化位址路徑;

我們先看一下SubscriptionGroupConfig類的資料結構,再去看一下磁盤中是如何存儲的;

// 用來進行訂閱消費組的管理的
public class SubscriptionGroupManager extends ConfigManager {

    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

    // 消費組名稱->訂閱消費組配置
    private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
        new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
    // 資料版本号
    private final DataVersion dataVersion = new DataVersion();
    private transient BrokerController brokerController;

		@Override
    public String configFilePath() {
        return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig()
            .getStorePathRootDir());
}           
public class SubscriptionGroupConfig {

    // 消費組名稱
    private String groupName;
    // 是否啟用消費
    private boolean consumeEnable = true;
    // 是否啟用從最小偏移量開始消費
    private boolean consumeFromMinEnable = true;
    // 是否啟用消費廣播
    private boolean consumeBroadcastEnable = true;
    // 重試隊列數量
    private int retryQueueNums = 1;
    // 重試最大次數
    private int retryMaxTimes = 16;
    // masterid
    private long brokerId = MixAll.MASTER_ID;
    // 慢消費的時候選用哪個broker
    private long whichBrokerWhenConsumeSlowly = 1;
    // 是否啟用通知消費者ids變化
    private boolean notifyConsumerIdsChangedEnable = true;
}           

我們直接找到subscriptionGroup.json看一下裡面的資料結構是怎樣的

public static String getSubscriptionGroupPath(final String rootDir) {
    return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json";
}           

給大家稍微截一下,了解一下大緻結構

RocketMQ源碼分析之訂閱消費組管理元件SubscriptionGroupManager

2、構造方法

SubscriptionGroupManager的構造方法中直接調用init方法,初始化了一系列的訂閱分組資料初始化,并且放入subscriptionGroupTable中;

public SubscriptionGroupManager(BrokerController brokerController) {
    this.brokerController = brokerController;
    this.init();
}           
private void init() {
    // 這個裡面他會初始化一系類的針對系統topic的消費者
    // 是針對tools這個系統topic的消費組
    {
        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
        subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);
        this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);
    }
    // filterserver消費組
    {
        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
        subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);
        this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);
    }
    // selftest消費組
    {
        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
        subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);
        this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);
    }
    // onshttp_proxy消費組
    {
        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
        subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);
        subscriptionGroupConfig.setConsumeBroadcastEnable(true);
        this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);
    }
    // cid onsapi pull消費組
    {
        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
        subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);
        subscriptionGroupConfig.setConsumeBroadcastEnable(true);
        this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);
    }
    // dcid onsapi permission消費組
    {
        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
        subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);
        subscriptionGroupConfig.setConsumeBroadcastEnable(true);
        this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);
    }
    // cid onsapi owner消費組
    {
        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
        subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);
        subscriptionGroupConfig.setConsumeBroadcastEnable(true);
        this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);
    }
}           

3、更新訂閱組配置

// 更新訂閱組配置
public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
    // 直接put,沒有新增,存在則覆寫
    SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
    if (old != null) {
        log.info("update subscription group config, old: {} new: {}", old, config);
    } else {
        log.info("create new subscription group, {}", config);
    }

    // 版本号遞增
    this.dataVersion.nextVersion();

    // 持久化
    this.persist();
}           

4、禁用消費

// 禁用消費
public void disableConsume(final String groupName) {
    // 根據分組名稱擷取資料
    SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);
    if (old != null) {
        // 存在則修改是否啟用消費标志位
        old.setConsumeEnable(false);
        // 遞增版本号
        this.dataVersion.nextVersion();
    }
}           

5、根據GroupName查找訂閱組資訊

大緻步驟如下:

  1. 以GroupNmae從SubscriptionGroupManager.SubscriptionGroupTable變量中擷取訂閱資訊SubscriptionGroupConfig對象;若不為空則直接傳回該對象;
  2. 若為空,表示還沒有該訂閱資訊,再檢查Broker是否運作自動建立訂閱組(由配置參數autoCreateSubscriptionGroup設定,預設為true)。
  3. 若不允許則直接傳回null;
  4. 若允許則首先以GroupName為參數建立SubscriptionGroupConfig對象,并存入SubscriptionGroupTable變量中;然後更新SubscriptionGroupManager的dataversion值;
  5. 再将SubscriptionGroupManager的内容持久化到subscriptionGroup.json檔案中;
  6. 最後傳回該新建立的SubscriptionGroupConfig對象;
// 根據GroupName查找訂閱組資訊
public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
    // 擷取訂閱資訊SubscriptionGroupConfig對象;若不為空則直接傳回該對象;
    SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
    if (null == subscriptionGroupConfig) {
        // 檢查Broker是否運作自動建立訂閱組
        if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
            // 建構SubscriptionGroupConfig對象
            subscriptionGroupConfig = new SubscriptionGroupConfig();
            subscriptionGroupConfig.setGroupName(group);
            SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);
            if (null == preConfig) {
                log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());
            }
            // 遞增版本号
            this.dataVersion.nextVersion();
            // 序列化
            this.persist();
        }
    }

    return subscriptionGroupConfig;
}           

6、訂閱分組資料編碼和解碼

其實就是通過RemotingSerializable工具類進行json序列化和反序列化;

@Override
public void decode(String jsonString) {
    if (jsonString != null) {
        SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
        if (obj != null) {
            this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);
            this.dataVersion.assignNewOne(obj.dataVersion);
            this.printLoadDataWhenFirstBoot(obj);
        }
    }
}

public String encode(final boolean prettyFormat) {
    return RemotingSerializable.toJson(this, prettyFormat);
}

private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
    Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, SubscriptionGroupConfig> next = it.next();
        log.info("load exist subscription group, {}", next.getValue().toString());
    }
}           

7、根據組名删除配置資訊

// 根據組名删除配置資訊
public void deleteSubscriptionGroupConfig(final String groupName) {
    SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
    if (old != null) {
        log.info("delete subscription group OK, subscription group:{}", old);
        this.dataVersion.nextVersion();
        this.persist();
    } else {
        log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);
    }
}           

8、擷取訂閱消費組配置

public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
    return subscriptionGroupTable;
}           

三、總結

SubscriptionGroupConfig可以了解為消費分組和broker之間的契約,如果broker不存在消費分組的訂閱關系,那麼該消費分組就無法從該broker消費任何消息。