天天看點

RocketMQ分析:PushConsumer流量控制

    本文分析PushConsumer的流量控制方法。PushConsumer使用Pull方式擷取消息,好處是用戶端能夠根據自身的處理速度調整擷取消息的操作速度。PushConsumer的流量控制采用多線程處理方式。

    RocketMQ的版本為:4.2.0 release。

一.PushConsumer使用線程池,每個線程同時執行對應的消息處理邏輯

     線程池是在 PushConsumer 啟動、初始化 consumeMessageService 的時候,于其構造方法裡面建立的。

    DefaultMQPushConsumer#start

/**
 * Starts this push consumer by delegating to the wrapped {@code defaultMQPushConsumerImpl}.
 *
 * @throws MQClientException declared by the signature; presumably raised by the delegate's
 *         {@code start()} — confirm against its implementation
 */
public void start() throws MQClientException {
    this.defaultMQPushConsumerImpl.start();
}

    DefaultMQPushConsumerImpl#start

// Choose the consume service implementation from the type of the registered listener.
// A listener that is neither orderly nor concurrent leaves both fields untouched here.
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    // Orderly listener: flag the consumer as orderly and use the orderly service.
    this.consumeOrderly = true;
    this.consumeMessageService =
    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    // Concurrent listener: flag the consumer as non-orderly and use the concurrent service.
    this.consumeOrderly = false;
    this.consumeMessageService =
    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

    ConsumeMessageOrderlyService#ConsumeMessageOrderlyService    構造方法 :

// Thread pool on which the message-consuming tasks are executed.
this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(),// core pool size
    this.defaultMQPushConsumer.getConsumeThreadMax(),// maximum pool size
    1000 * 60,
    TimeUnit.MILLISECONDS,// keep-alive time for idle threads: 60 seconds
    this.consumeRequestQueue,// work queue holding tasks that wait for a thread
    new ThreadFactoryImpl("ConsumeMessageThread_")// thread factory; the string is presumably the thread-name prefix — confirm
);

二.使用ProcessQueue保持Message Queue消息處理狀态的快照

    在pullMessage開始的時候,從pullRequest中擷取ProcessQueue。

DefaultMQPushConsumerImpl#pullMessage
final ProcessQueue processQueue = pullRequest.getProcessQueue();// obtain the ProcessQueue carried by the pullRequest

    拿到ProcessQueue對象之後,用戶端在每次Pull請求之前會做下面三個判斷來控制流量:緩存的消息個數、消息總大小以及Offset的跨度(跨度僅在非順序消費、即并發消費時檢查)。任何一個值超過設定的大小,就隔一段時間(預設50毫秒)再拉取消息,由此來達到流量控制的目的。

// Flow control, evaluated before every pull: when the local ProcessQueue already holds too much
// unconsumed data, the pull request is re-scheduled after a delay instead of being sent now.
long cachedMessageCount = processQueue.getMsgCount().get();// number of cached messages
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);// total cached size in MiB (integer division)

// Check 1: cached message count (default limit: 1000 messages).
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);// retry after 50 ms
    // Logs only when the counter (before increment) is a multiple of 1000.
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}

// Check 2: cached message size (default limit: 100 MiB).
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);// retry after 50 ms
    // Shares the same counter as check 1; logs when it is a multiple of 1000 before increment.
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}

if (!this.consumeOrderly) {
    // Check 3 (non-orderly consumption only): offset span between the first and last cached message.
    if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);// retry after 50 ms
        if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                pullRequest, queueMaxSpanFlowControlTimes);
        }
        return;
    }
} else {
    // Orderly consumption: pull only while the process queue is marked as locked.
    if (processQueue.isLocked()) {
        if (!pullRequest.isLockedFirst()) {
            // First pull after the lock was obtained: reset the pull offset to the value
            // computed by rebalanceImpl.computePullFromWhere.
            final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
            // True when the request's next offset is ahead of the computed offset.
            boolean brokerBusy = offset < pullRequest.getNextOffset();
            log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                pullRequest, offset, brokerBusy);
            if (brokerBusy) {
                log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                    pullRequest, offset);
            }
            pullRequest.setLockedFirst(true);
            pullRequest.setNextOffset(offset);
        }
    } else {
        // Not locked: retry later, using the exception delay rather than the flow-control delay.
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        log.info("pull message later because not locked in broker, {}", pullRequest);
        return;
    }
}

    DefaultMQPushConsumerImpl#executePullRequestLater 延遲執行

// DefaultMQPushConsumerImpl#executePullRequestLater: forward the request and delay to the pull message service.
this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
// PullMessageService#executePullRequestLater
if (!isStopped()) {
    // Schedule a one-shot task that re-submits the pull request once the delay has elapsed.
    this.scheduledExecutorService.schedule(new Runnable() {
        @Override
        public void run() {
            PullMessageService.this.executePullRequestImmediately(pullRequest);
        }
    }, timeDelay, TimeUnit.MILLISECONDS);// delay is the caller-supplied timeDelay in milliseconds (50 ms only when the caller passes the flow-control delay)
} else {
    // Service already stopped: the request is not re-scheduled, only a warning is logged.
    log.warn("PullMessageServiceScheduledThread has shutdown");
}

三.ProcessQueue的結構

    ProcessQueue中主要是一個TreeMap和一個讀寫鎖。TreeMap裡以MessageQueue的Offset作為Key,以消息内容的引用為Value,儲存所有從MessageQueue擷取到、但是還未被處理的消息;讀寫鎖的作用是控制多線程下對TreeMap對象的并發訪問。

private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();// read-write lock guarding msgTreeMap
// Sorted map of cached messages; per the surrounding article the key is the message's queue offset.
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

    使用讀寫鎖作并發控制:

#ProcessQueue#putMessage 寫鎖
// Excerpt of ProcessQueue#putMessage; the "......" lines are elisions made by the article.
boolean dispatchToConsume = false;
try {
    this.lockTreeMap.writeLock().lockInterruptibly();// acquire the write lock (interruptible)
    try {
         ......
        msgCount.addAndGet(validMsgCnt);
        ......
    } finally {
        this.lockTreeMap.writeLock().unlock();// always release the write lock
    }
} catch (InterruptedException e) {
    // NOTE(review): the interruption is only logged; the thread's interrupt flag is not restored.
    log.error("putMessage exception", e);
}
return dispatchToConsume;
#ProcessQueue#getMaxSpan 讀鎖
// Excerpt of ProcessQueue#getMaxSpan: difference between the largest and smallest key in msgTreeMap.
try {
    this.lockTreeMap.readLock().lockInterruptibly();// acquire the read lock (interruptible)
    try {
        if (!this.msgTreeMap.isEmpty()) {
            return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
        }
    } finally {
        this.lockTreeMap.readLock().unlock();// always release the read lock
    }
} catch (InterruptedException e) {
    // NOTE(review): the interruption is only logged; the thread's interrupt flag is not restored.
    log.error("getMaxSpan exception", e);
}
// Empty map, or interrupted while waiting for the lock.
return 0;