天天看點

RocketMQ實戰(四)前言RocketMQ 3.2.6的事務機制Pull Or PushRocketMQ Filter元件介紹

這将是RocketMQ實戰系列的最後一篇文章,該系列的文章清單如下:

<a href="http://zhangfengzhe.blog.51cto.com/8855103/1914354" target="_blank">《RocketMQ實戰(一)》</a>

<a href="http://zhangfengzhe.blog.51cto.com/8855103/1916811" target="_blank">《RocketMQ實戰(二)》</a>

<a href="http://zhangfengzhe.blog.51cto.com/8855103/1919885" target="_blank">《RocketMQ實戰(三):分布式事務》</a>

在上一篇部落格中,已經知道RocketMQ 3.0.8是支援事務回查機制,但是在RocketMQ 3.2.6中取消了這個功能,下面我們繼續以轉賬功能分析我們自己如何解決這個問題。

<a href="https://s2.51cto.com/wyfs02/M01/93/4B/wKiom1kJ1cPRn6RcAABWBYYjcKA754.jpg" target="_blank"></a>

在正常情況下,當然沒有問題,如果第五步(向MQ發送确認消息)出現失敗,加上RocketMQ 3.2.6版本沒有事務回查機制,就會導緻這條轉賬消息,在A銀行完成了操作,但是遲遲對B銀行系統不可見!

<a href="https://s5.51cto.com/wyfs02/M00/93/4B/wKiom1kJ1gSAXzelAAB6Q70jL3M558.png" target="_blank"></a>

使用者U1從A銀行系統轉賬給B銀行系統的使用者U2的處理過程如下:

第一步:A銀行系統生成一條轉賬消息,以事務消息的方式寫入RocketMQ,此時B銀行系統不可見這條消息

第二步:寫入MQ成功後,回調A銀行系統,對T1,T2表進行操作(很顯然需要是一個事務)

我們重點關注下T2表,這個表是用來幹嘛的呢?每條轉賬消息都會在T2表中,該表有2個特殊的字段:status,updatetime。(用途會在後文詳述)

第三步:完成第二步,接下來發送确認消息給MQ,如果這個确認消息發送成功,那麼這條轉賬消息,将對B銀行系統可見。然後B銀行系統,會在一個事務中完成對t3,t5的操作。

如果發送确認消息給MQ失敗的處理思路:

首先,B銀行系統,有一個定時任務(比如說每隔1MIN執行一次),掃描表t5,取得一段時間内的資料,發送給A銀行系統。要知道t5中的資料,必然是A銀行系統成功處理并發送确認消息成功的轉賬資料。為什麼要發送給A銀行系統呢,其實就是為了找到那些發送确認消息失敗的轉賬資料。那麼怎麼發給A銀行系統呢,這個方式比較多,可以考慮在來一個Topic,也可以考慮Netty等。發送給A銀行系統,其實就是為了更新t2表的status,updatetime。

這裡有一個關鍵,如何“掃描表t5,取得一段時間内的資料”?這就是t4的作用,在t4中記錄一個time字段,每次定時任務啟動,先更新time(比如設定為目前系統時間,設定前的的時間為old),然後掃描出t5中大于這個old時間的轉賬資料,如此循環往複。

其次,A銀行系統,也有一個定時任務(可以根據業務消費能力定,可以大一些),掃描t2表(指定status及updatetime條件),将那些确認消息發送失敗的轉賬消息找出來,更新updatetime并發送給MQ。

這樣,我們并沒有改動RocketMQ 3.2.6的源碼,而是在外圍解決了事務回查!

其實到這裡,你可以發現RocketMQ的一個特點,就是将生産者和MQ綁定,而不需要特别處理消費者,這是為什麼呢?因為消息隻要發往RocketMQ成功,那麼就意味着成功,為什麼這麼說?

前面,我們說過,消費者端消費消息隻會産生2種錯誤,第一:timeout,第二:exception。要知道RocketMQ對于逾時,會不斷重試;對于消費異常,會根據消費端的傳回碼,會有重試機制保證。也就是,RocketMQ一定會讓消息得到消費,如果消費有問題,隻能是消費者的問題,而不會是RocketMQ的問題!

在前面的部落格已經提到,在RocketMQ中Consumer分為2類:Push Consumer、Pull Consumer。以前的例子都是Push Consumer,接下來,為大家介紹下Pull Consumer。

<a href="https://s4.51cto.com/wyfs02/M02/93/4A/wKioL1kJ1l-Q8Be4AAAhA9Onb10883.png" target="_blank"></a>

<a href="https://s1.51cto.com/wyfs02/M00/93/4B/wKiom1kJ1nuz0T0dAADIcid4cW4589.png" target="_blank"></a>

從表面意思上來看,好像Push是MQ推送給消費者,而Pull是消費者從MQ中拉取;其實本質上都是拉取模式PULL,即消費者從MQ中輪詢取得消息。

在Push模式下,Consumer把輪詢過程封裝了,并注冊了MessageListener監聽器,取到消息後,喚醒MessageListener監聽器中的consumeMessage()進行消費,是以給我們造成了感覺上好像是“推消息”。

在Pull模式下,需要特别注意的是,本質上是從一個Topic下的所有Queue進行拉取,而且每個Queue都必須記錄拉取位置,否則會導緻重複消費。還有拉取的時間間隔,拉取的大小等等。不過所有的這一切,MQPullConsumerScheduleService都替我們考慮清楚了,提供updateConsumeOffset去更新消費的隊列的位置(預設5S同步一次),提供setPullNextDelayTimeMillis設定下次拉取的時間間隔(應該設定的大一些,至少大于5S)。

仔細回想下,對于Push方式的回調   和  Pull方式的回調,還有什麼關鍵差別麼?

對于Push而言,不論是基于MessageListenerConcurrently的,還是基于MessageListenerOrderly的,都有傳回值的;而Pull的doPullTask的傳回值卻是void?

這意味,我們需要在pull方式中,注意自己處理每條消息消費的異常情況!

<a href="https://s5.51cto.com/wyfs02/M00/93/4A/wKioL1kJ1qjCh3XgAABfya_2KQ0041.png" target="_blank"></a>

通過運作結果,可以印證上面的觀點:為什麼每次消費都是4條開始,4條結束呢?因為一個Topic下有4個Queue,而且上面的代碼實際上會針對每個Queue開啟一個線程去消費!

對于ActiveMQ而言,我們可以通過JMS Selectors機制(就是類似于SQL的文法)來實作過濾,很easy。那麼和RocketMQ Filter元件有什麼差別呢?

雖然,2者都能實作過濾,但是RocketMQ Filter的性能要更高效些,因為RocketMQ是在broker上将過濾後的資料發往filter,然後消費者直接從filter上取得資料;而ActiveMQ是消費者直接在broker上進行過濾消費!(當然,對于RocketMQ而言,Tag機制已經足夠應付日常絕大數的過濾功能,除非你的業務對性能有特别高的要求)

<a href="https://s1.51cto.com/wyfs02/M00/93/4A/wKioL1kJ1uaBn0l2AABatuiY9e8836.png" target="_blank"></a>

具體怎麼做呢?這裡我就不示範了,網上有很多例子,這裡隻說下大緻的過程:

第一:broker-xxx.properties中指定filter個數 

第二:上傳一段JAVA代碼,其實就是一個類

本文轉自zfz_linux_boy 51CTO部落格,原文連結:http://blog.51cto.com/zhangfengzhe/1921757,如需轉載請自行聯系原作者