天天看點

詳解RocketMQ不同類型的消費者

根據使用者對讀取操作的控制情況,分為兩種類型。一個是DefaultMQPushConsumer,由系統控制讀取操作,收到消息後自動調用傳入的處理方法來處理;另一個是DefaultMQPullConsumer,讀取操作中的大部分功能由使用者自主要制。

1.DefaultMQPushConsumer的使用

使用DefaultMQPushConsumer主要是設定好各種參數和傳入處理消息的函數。系統收到消息後自動調用處理函數來處理消息,自動儲存Offset,而且加入新的DefaultMQPushConsumer後會自動做負載均衡。下面結合org.apache.rocketmq.example.quickstart包中的源碼來介紹。

代碼清單1-1 DefaultMQPushConsumer示例

public class QuickStart {

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer Consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");           

Consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");

Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

Consumer.setMessageModel(MessageModel.BROADCASTING);

    Consumer.subscribe("TopicTest", "*");
    Consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,   ConsumeConcurrentlyContext context) {
            System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    Consumer.start();
}           

}

DefaultMQPushConsumer需要設定三個參數:一是這個Consumer的GroupName,二是NameServer的位址和端口号,三是Topic的名稱,下面詳細介紹。

Consumer的GroupName用于把多個Consumer組織到一起,提高并發處理能力,GroupName需要和消息模式(MessageModel)配合使用。

RocketMQ支援兩種消息模式:Clustering 和 Broadcasting。

 在Clustering 模式下,同一個ConsumerGroup(GroupName相同)裡的每個Consumer隻消費所訂閱消息的一部分内容,同一個ConsumerGroup裡所有的Consumer消費的内容合起來才是所訂閱Topic内容的整體,進而達到負載均衡的目的。

 在Broadcasting模式下,同一個ConsumerGroup裡的每個Consumer都能消費到所訂閱Topic的全部消息,也就是一個消息會被多次分發,被多個Consumer消費。

NameServer的位址和端口号,可以填寫多個,用分号隔開,達到消除單點故障的目的,比如 “ip1:port;ip2:port;ip3:port”。

Topic名稱用來辨別消息類型,需要提前建立。如果不需要消費某個Topic下的所有消息,可以通過指定消息的Tag進行消息過濾,比如:Consumer.subscribe("TopicTest", "tag1 || tag2 || tag3"),表示這個Consumer要消費“TopicTest”下帶有tag1或tag2或tag3的消息(Tag是在發送消息時設定的标簽)。在填寫Tag參數的位置,用null或者“*”表示要消費這個Topic的所有消息。

2.DefaultMQPushConsumer的處理流程

本節通過分析源碼來說明DefaultMQPushConsumer的處理流程。

DefaultMQPushConsumer主要功能實作在DefaultMQPushConsumerImpl類中,消息的處理邏輯是在pullMessage這個函數裡的PullCallBack中。在PullCallBack函數裡有個switch語句,根據從Broker傳回的消息類型做相應的處理,具體處理邏輯可以檢視源碼。

代碼清單1-2 DefaultMQPushConsuer的處理邏輯

switch (pullResult.getPullStatus()) {

case FOUND:
    …..
    break;
case NO_NEW_MSG:
    ……
    break;
case OFFSET_ILLEGAL:
    ……
    break;
default:
    break;           

DefaultMQPushConsuer的源碼中有很多PullRequest語句,比如DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest),為什麼“PushConsumer”中使用“PullRequest”呢?這是通過“長輪詢”方式達到Push效果的方法,長輪詢方式既有Pull的優點,又兼具Push方式的實時性。

Push方式是Server端接收到消息後,主動把消息推送給Client端,實時性高。對于一個提供隊列服務的Server來說,用Push方式主動推送有很多弊端;首先是加大Server端的工作量,進而影響Server的性能,其次Client的處理能力各不相同,Client的狀态不受Server控制,如果Client不能及時處理Server推送過來的消息,會造成各種潛在問題。

Pull方式是Client端循環地從Server端拉取消息,主動權在Client手裡,自己拉取到一定量消息後,處理妥當了再接着取。Pull方式的問題是循環拉取消息的間隔不好設定,間隔太短就處在一個“忙等”的狀态,浪費資源;每個Pull的時間間隔太長,Server端有消息到來有可能沒有被及時處理。

“長輪詢”方式是通過Client端和Server端的配合,既擁有Pull的優點,又能達到保證明時性的目的。我們結合源碼來分析:

代碼清單1-3 發送Pull消息代碼片段

PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();

requestHeader.setConsumerGroup(this.ConsumerGroup);

requestHeader.setTopic(mq.getTopic());

requestHeader.setQueueId(mq.getQueueId());

requestHeader.setQueueOffset(Offset);

requestHeader.setMaxMsgNums(maxNums);

requestHeader.setSysFlag(sysFlagInner);

requestHeader.setCommitOffset(commitOffset);

requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);

requestHeader.setSubscription(subExpression);

requestHeader.setSubVersion(subVersion);

requestHeader.setExpressionType(expressionType);

PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(

brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);
           

源碼中有這一行設定語句requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis),設定Broker最長阻塞時間,預設設定是15秒,注意是Broker在沒有新消息的時候才阻塞,有消息會立刻傳回。

從Broker的源碼中可以看出,服務端接到新消息請求後,如果隊列裡沒有新消息,并不急于傳回,通過一個循環不斷檢視狀态,每次 waitForRunning一段時候(預設是5秒),然後後再Check。預設情況下當Broker一直沒有新消息,第三次Check的時候,等待時間超過Request裡面的 BrokerSuspendMaxTimeMillis,就傳回空結果。在等待的過程中,Broker收到了新的消息後會直接調用notifyMessageArriving函數傳回請求結果。“長輪詢”的核心是,Broker端HOLD住用戶端過來的請求一小段時間,在這個時間内有新消息到達,就利用現有的連接配接立刻傳回消息給Consumer。“長輪詢”的主動權還是掌握在Consumer手中,Broker即使有大量消息積壓,也不會主動推送給Consumer。

長輪詢方式的局限性,是在HOLD住Consumer請求的時候需要占用資源,它适合用在消息隊列這種用戶端連接配接數可控的場景中。

3.DefaultMQPullConsumer

使用DefaultMQPullConsumer像使用DefaultMQPushConsumer一樣需要設定各種參數,寫處理消息的函數,同時還需要做額外的事情。接下來結合org.apache.rocketmq.example.simple包中的例子源碼來介紹。

示例代碼的處理邏輯是逐個讀取某Topic下所有Message Queue的内容,讀完一遍後退出,主要處理額外的三件事情:

(1) 擷取Message Queue并周遊

一個Topic包括多個Message Queue,如果這個Consumer需要擷取Topic下所有的消息,就要周遊多有的Message Queue。如果有特殊情況,也可以選擇某些特定的Message Queue來讀取消息。

(2) 維護Offsetstore

從一個Message Queue裡拉取消息的時候,要傳入Offset參數(long類型的值),随着不斷讀取消息,Offset會不斷增長。這個時候由使用者負責把Offset存儲下來,根據具體情況可以存到記憶體裡、寫到磁盤或者資料庫裡等。

(3) 根據不同的消息狀态做不同的處理

拉取消息的請求發出後,會傳回:FOUND,NO_MATCHED_MSG,NO_NEW_MSG,OFFSET_ILLEGAL四種狀态,要根據每個狀态做不同的處理。比較重要的兩個狀态是FOUNT和NO_NEW_MSG,分别表示擷取到消息和沒有新的消息

實際情況中可以把while(true)放到外層,達到無限循環的目的。因為PullConsumer需要使用者自己處理周遊Message Queue、儲存Offset,是以PullConsumer有更多的自主性和靈活性。

推薦閱讀:

詳解RocketMQ不同類型的消費者

RocketMQ實戰與原了解析

作者:楊開元

定價:59.00元

•RocketMQ由阿裡開源,Apache開源項目,經受多年流量峰值考驗,在多個性能名額上遠超同類産品

•作者是阿裡資深資料專家,有多年RocketMQ使用經驗,深入研究RocketMQ源代碼,寫作前與RocketMQ官方團隊有深入溝通

•雲栖社群官方出品,得到RocketMQ官方研發團隊以及業界的多位專家的肯定和推薦

閱讀原文:

http://product.dangdang.com/25290633.html