整體閱讀時間,在 40 分鐘左右。
常見的消息隊列很多,主要包括 RabbitMQ、Kafka、RocketMQ 和 ActiveMQ,這篇文章隻講 RabbitMQ,先講原理,後搞實戰。
文章很長,如果你能一次性看完,“大哥,請收下我的膝蓋”,建議大家先收藏,啥時需要面試,或者工作中遇到了,可以再慢慢看。
不 BB,直接上思維導圖:
1. 消息隊列
1.1 消息隊列模式
消息隊列目前主要 2 種模式,分别為“點對點模式”和“釋出/訂閱模式”。
1.1.1 點對點模式
一個具體的消息隻能由一個消費者消費,多個生産者可以向同一個消息隊列發送消息,但是一個消息在被一個消息者處理的時候,這個消息在隊列上會被鎖住或者被移除并且其他消費者無法處理該消息。
需要額外注意的是,如果消費者處理一個消息失敗了,消息系統一般會把這個消息放回隊列,這樣其他消費者可以繼續處理。
1.1.2 釋出/訂閱模式
單個消息可以被多個訂閱者并發的擷取和處理。一般來說,訂閱有兩種類型:
- 臨時(ephemeral)訂閱:這種訂閱隻有在消費者啟動并且運作的時候才存在。一旦消費者退出,相應的訂閱以及尚未處理的消息就會丢失。
- 持久(durable)訂閱:這種訂閱會一直存在,除非主動去删除。消費者退出後,消息系統會繼續維護該訂閱,并且後續消息可以被繼續處理。
1.2 衡量标準
對消息隊列進行技術選型時,需要通過以下名額衡量你所選擇的消息隊列,是否可以滿足你的需求:
- 消息順序:發送到隊列的消息,消費時是否可以保證消費的順序,比如A先下單,B後下單,應該是A先去扣庫存,B再去扣,順序不能反。
- 消息路由:根據路由規則,隻訂閱比對路由規則的消息,比如有A/B兩者規則的消息,消費者可以隻訂閱A消息,B消息不會消費。
- 消息可靠性:是否會存在丢消息的情況,比如有A/B兩個消息,最後隻有B消息能消費,A消息丢失。
- 消息時序:主要包括“消息存活時間”和“延遲/預定的消息”,“消息存活時間”表示生産者可以對消息設定TTL,如果超過該TTL,消息會自動消失;“延遲/預定的消息”指的是可以延遲或者預訂消費消息,比如延時5分鐘,那麼消息會5分鐘後才能讓消費者消費,時間未到的話,是不能消費的。
- 消息留存:消息消費成功後,是否還會繼續保留在消息隊列。
- 容錯性:當一條消息消費失敗後,是否有一些機制,保證這條消息是一種能成功,比如異步第三方退款消息,需要保證這條消息消費掉,才能确定給使用者退款成功,是以必須保證這條消息消費成功的準确性。
- 伸縮:當消息隊列性能有問題,比如消費太慢,是否可以快速支援庫容;當消費隊列過多,浪費系統資源,是否可以支援縮容。
- 吞吐量:支援的最高并發數。
2. RabbitMQ 原理初探
RabbitMQ 2007 年釋出,是使用 Erlang 語言開發的開源消息隊列系統,基于 AMQP 協定來實作。
2.1 基本概念
提到RabbitMQ,就不得不提AMQP協定。AMQP協定是具有現代特征的二進制協定。是一個提供統一消息服務的應用層标準進階消息隊列協定,是應用層協定的一個開放标準,為面向消息的中間件設計。
先了解一下AMQP協定中間的幾個重要概念:
- Server:接收用戶端的連接配接,實作AMQP實體服務。
- Connection:連接配接,應用程式與Server的網絡連接配接,TCP連接配接。
- Channel:信道,消息讀寫等操作在信道中進行。用戶端可以建立多個信道,每個信道代表一個會話任務。
- Message:消息,應用程式和伺服器之間傳送的資料,消息可以非常簡單,也可以很複雜。由Properties和Body組成。Properties為外包裝,可以對消息進行修飾,比如消息的優先級、延遲等進階特性;Body就是消息體内容。
- Virtual Host:虛拟主機,用于邏輯隔離。一個虛拟主機裡面可以有若幹個Exchange和Queue,同一個虛拟主機裡面不能有相同名稱的Exchange或Queue。
- Exchange:交換器,接收消息,按照路由規則将消息路由到一個或者多個隊列。如果路由不到,或者傳回給生産者,或者直接丢棄。RabbitMQ常用的交換器常用類型有direct、topic、fanout、headers四種,後面詳細介紹。
- Binding:綁定,交換器和消息隊列之間的虛拟連接配接,綁定中可以包含一個或者多個RoutingKey。
- RoutingKey:路由鍵,生産者将消息發送給交換器的時候,會發送一個RoutingKey,用來指定路由規則,這樣交換器就知道把消息發送到哪個隊列。路由鍵通常為一個“.”分割的字元串,例如“com.rabbitmq”。
- Queue:消息隊列,用來儲存消息,供消費者消費。
2.2 工作原理
AMQP 協定模型由三部分組成:生産者、消費者和服務端,執行流程如下:
- 生産者是連接配接到 Server,建立一個連接配接,開啟一個信道。
- 生産者聲明交換器和隊列,設定相關屬性,并通過路由鍵将交換器和隊列進行綁定。
- 消費者也需要進行建立連接配接,開啟信道等操作,便于接收消息。
- 生産者發送消息,發送到服務端中的虛拟主機。
- 虛拟主機中的交換器根據路由鍵選擇路由規則,發送到不同的消息隊列中。
- 訂閱了消息隊列的消費者就可以擷取到消息,進行消費。
2.3 常用交換器
RabbitMQ常用的交換器類型有direct、topic、fanout、headers四種:
- Direct Exchange:見文知意,直連交換機意思是此交換機需要綁定一個隊列,要求該消息與一個特定的路由鍵完全比對。簡單點說就是一對一的,點對點的發送。
- Fanout Exchange:這種類型的交換機需要将隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每台子網内的主機都獲得了一份複制的消息。簡單點說就是釋出訂閱。
- Topic Exchange:直接翻譯的話叫做主題交換機,如果從用法上面翻譯可能叫通配符交換機會更加貼切。這種交換機是使用通配符去比對,路由到對應的隊列。通配符有兩種:"*" 、 "#"。需要注意的是通配符前面必須要加上"."符号。
- *符号:有且隻比對一個詞。比如 a.*可以比對到"a.b"、"a.c",但是比對不了"a.b.c"。
- #符号:比對一個或多個詞。比如"rabbit.#"既可以比對到"rabbit.a.b"、"rabbit.a",也可以比對到"rabbit.a.b.c"。
- Headers Exchange:這種交換機用的相對沒這麼多。它跟上面三種有點差別,它的路由不是用routingKey進行路由比對,而是在比對請求頭中所帶的鍵值進行路由。建立隊列需要設定綁定的頭部資訊,有兩種模式:全部比對和部分比對。如上圖所示,交換機會根據生産者發送過來的頭部資訊攜帶的鍵值去比對隊列綁定的鍵值,路由到對應的隊列。
2.4 消費原理
我們先看幾個基本概念:
- broker:每個節點運作的服務程式,功能為維護該節點的隊列的增删以及轉發隊列操作請求。
- master queue:每個隊列都分為一個主隊列和若幹個鏡像隊列。
- mirror queue:鏡像隊列,作為master queue的備份。在master queue所在節點挂掉之後,系統把mirror queue提升為master queue,負責處理用戶端隊列操作請求。注意,mirror queue隻做鏡像,設計目的不是為了承擔用戶端讀寫壓力。
叢集中有兩個節點,每個節點上有一個broker,每個broker負責本機上隊列的維護,并且borker之間可以互相通信。叢集中有兩個隊列A和B,每個隊列都分為master queue和mirror queue(備份)。那麼隊列上的生産消費怎麼實作的呢?
對于消費隊列,如下圖有兩個consumer消費隊列A,這兩個consumer連在了叢集的不同機器上。RabbitMQ叢集中的任何一個節點都擁有叢集上所有隊列的元資訊,是以連接配接到叢集中的任何一個節點都可以,主要差別在于有的consumer連在master queue所在節點,有的連在非master queue節點上。
因為mirror queue要和master queue保持一緻,故需要同步機制,正因為一緻性的限制,導緻所有的讀寫操作都必須都操作在master queue上(想想,為啥讀也要從master queue中讀?和資料庫讀寫分離是不一樣的),然後由master節點同步操作到mirror queue所在的節點。即使consumer連接配接到了非master queue節點,該consumer的操作也會被路由到master queue所在的節點上,這樣才能進行消費。
對于生成隊列,原理和消費一樣,如果連接配接到非 master queue 節點,則路由過去。
是以,到這裡小夥伴們就可以看到 RabbitMQ的不足:由于master queue單節點,導緻性能瓶頸,吞吐量受限。雖然為了提高性能,内部使用了Erlang這個語言實作,但是終究擺脫不了架構設計上的緻命缺陷。
2.5 進階特性
2.5.1 過期時間
Time To Live,也就是生存時間,是一條消息在隊列中的最大存活時間,機關是毫秒,下面看看RabbitMQ過期時間特性:
- RabbitMQ可以對消息和隊列設定TTL。
- RabbitMQ支援設定消息的過期時間,在消息發送的時候可以進行指定,每條消息的過期時間可以不同。
- RabbitMQ支援設定隊列的過期時間,從消息入隊列開始計算,直到超過了隊列的逾時時間配置,那麼消息會變成死信,自動清除。
- 如果兩種方式一起使用,則過期時間以兩者中較小的那個數值為準。
- 當然也可以不設定TTL,不設定表示消息不會過期;如果設定為0,則表示除非此時可以直接将消息投遞到消費者,否則該消息将被立即丢棄。
2.5.2 消息确認
為了保證消息從隊列可靠地到達消費者,RabbitMQ提供了消息确認機制。
消費者訂閱隊列的時候,可以指定autoAck參數,當autoAck為true的時候,RabbitMQ采用自動确認模式,RabbitMQ自動把發送出去的消息設定為确認,然後從記憶體或者硬碟中删除,而不管消費者是否真正消費到了這些消息。
當autoAck為false的時候,RabbitMQ會等待消費者回複的确認信号,收到确認信号之後才從記憶體或者磁盤中删除消息。
消息确認機制是RabbitMQ消息可靠性投遞的基礎,隻要設定autoAck參數為false,消費者就有足夠的時間處理消息,不用擔心處理消息的過程中消費者程序挂掉後消息丢失的問題。
2.5.3 持久化
消息的可靠性是RabbitMQ的一大特色,那麼RabbitMQ是如何保證消息可靠性的呢?答案就是消息持久化。持久化可以防止在異常情況下丢失資料。RabbitMQ的持久化分為三個部分:交換器持久化、隊列持久化和消息的持久化。
交換器持久化可以通過在聲明隊列時将durable參數設定為true。如果交換器不設定持久化,那麼在RabbitMQ服務重新開機之後,相關的交換器中繼資料會丢失,不過消息不會丢失,隻是不能将消息發送到這個交換器了。
隊列的持久化能保證其本身的中繼資料不會因異常情況而丢失,但是不能保證内部所存儲的消息不會丢失。要確定消息不會丢失,需要将其設定為持久化。隊列的持久化可以通過在聲明隊列時将durable參數設定為true。
設定了隊列和消息的持久化,當RabbitMQ服務重新開機之後,消息依然存在。如果隻設定隊列持久化或者消息持久化,重新開機之後消息都會消失。
當然,也可以将所有的消息都設定為持久化,但是這樣做會影響RabbitMQ的性能,因為磁盤的寫入速度比記憶體的寫入要慢得多。
對于可靠性不是那麼高的消息可以不采用持久化處理以提高整體的吞吐量。魚和熊掌不可兼得,關鍵在于選擇和取舍。在實際中,需要根據實際情況在可靠性和吞吐量之間做一個權衡。
2.5.4 死信隊列
當消息在一個隊列中變成死信之後,他能被重新發送到另一個交換器中,這個交換器成為死信交換器,與該交換器綁定的隊列稱為死信隊列。
消息變成死信有下面幾種情況:
- 消息被拒絕。
- 消息過期
- 隊列達到最大長度
DLX也是一個正常的交換器,和一般的交換器沒有差別,他能在任何的隊列上面被指定,實際上就是設定某個隊列的屬性。當這個隊列中有死信的時候,RabbitMQ會自動将這個消息重新發送到設定的交換器上,進而被路由到另一個隊列,我們可以監聽這個隊列中消息做相應的處理。
死信隊列有什麼用?當發生異常的時候,消息不能夠被消費者正常消費,被加入到了死信隊列中。後續的程式可以根據死信隊列中的内容分析當時發生的異常,進而改善和優化系統。
2.5.5 延遲隊列
一般的隊列,消息一旦進入隊列就會被消費者立即消費。延遲隊列就是進入該隊列的消息會被消費者延遲消費,延遲隊列中存儲的對象是的延遲消息,“延遲消息”是指當消息被發送以後,等待特定的時間後,消費者才能拿到這個消息進行消費。
延遲隊列用于需要延遲工作的場景。最常見的使用場景:淘寶或者天貓我們都使用過,使用者在下單之後通常有30分鐘的時間進行支付,如果這30分鐘之内沒有支付成功,那麼訂單就會自動取消。
除了延遲消費,延遲隊列的典型應用場景還有延遲重試。比如消費者從隊列裡面消費消息失敗了,可以延遲一段時間以後進行重試。
2.6 特性分析
這裡才是内容的重點,不僅需要知道Rabbit的特性,還需要知道支援這些特性的原因:
- 消息路由(支援):RabbitMQ可以通過不同的交換器支援不同種類的消息路由;
- 消息有序(不支援):當消費消息時,如果消費失敗,消息會被放回隊列,然後重新消費,這樣會導緻消息無序;
- 消息時序(非常好):通過延時隊列,可以指定消息的延時時間,過期時間TTL等;
- 容錯處理(非常好):通過傳遞重試和死信交換器(DLX)來處理消息處理故障;
- 伸縮(一般):伸縮其實沒有非常智能,因為即使伸縮了,master queue還是隻有一個,負載還是隻有這一個master queue去抗,是以我了解RabbitMQ的伸縮很弱(個人了解)。
- 持久化(不太好):沒有消費的消息,可以支援持久化,這個是為了保證機器當機時消息可以恢複,但是消費過的消息,就會被馬上删除,因為RabbitMQ設計時,就不是為了去存儲曆史資料的。
- 消息回溯(支援):因為消息不支援永久儲存,是以自然就不支援回溯。
- 高吞吐(中等):因為所有的請求的執行,最後都是在master queue,它的這個設計,導緻單機性能達不到十萬級的标準。
3. RabbitMQ環境搭建
因為我用的是Mac,是以直接可以參考官網:
https://www.rabbitmq.com/install-homebrew.html
需要注意的是,一定需要先執行:
brew update
然後再執行:
brew install rabbitmq
之前沒有執行brew update,直接執行brew install rabbitmq時,會報各種各樣奇怪的錯誤,其中“403 Forbidde”居多。
但是在執行“brew install rabbitmq”,會自動安裝其它的程式,如果你使用源碼安裝Rabbitmq,因為啟動該服務依賴erlang環境,是以你還需手動安裝erlang,但是目前官方已經一鍵給你搞定,會自動安裝Rabbitmq依賴的所有程式,是不是很棒!
最後執行成功的輸出如下:
啟動服務:
# 啟動方式1:背景啟動
brew services start rabbitmq
# 啟動方式2:目前視窗啟動
cd /usr/local/Cellar/rabbitmq/3.8.19
rabbitmq-server
在浏覽器輸入:
http://localhost:15672/
會出現RabbitMQ背景管理界面(使用者名和密碼都為guest):
通過brew安裝,一行指令搞定,真香!
4. RabbitMQ測試
4.1 添加賬号
首先得啟動mq
## 添加賬号
./rabbitmqctl add_user admin admin
## 添加通路權限
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
## 設定超級權限
./rabbitmqctl set_user_tags admin administrator
4.2 編碼實測
因為代碼中引入了java 8的特性,pom引入依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.1</version>
</dependency>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
開始寫代碼:
public class RabbitMqTest {
//消息隊列名稱
private final static String QUEUE_NAME = "hello";
@Test
public void send() throws java.io.IOException, TimeoutException {
//建立連接配接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
//建立連接配接
Connection connection = factory.newConnection();
//建立消息通道
Channel channel = connection.createChannel();
//生成一個消息隊列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 0; i < 10; i++) {
String message = "Hello World RabbitMQ count: " + i;
//釋出消息,第一個參數表示路由(Exchange名稱),為""則表示使用預設消息路由
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
//關閉消息通道和連接配接
channel.close();
connection.close();
}
@Test
public void consumer() throws java.io.IOException, TimeoutException {
//建立連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
//建立連接配接
Connection connection = factory.newConnection();
//建立消息信道
final Channel channel = connection.createChannel();
//消息隊列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("[*] Waiting for message. To exist press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
執行send()後控制台輸出:
[x] Sent 'Hello World RabbitMQ count: 0'
[x] Sent 'Hello World RabbitMQ count: 1'
[x] Sent 'Hello World RabbitMQ count: 2'
[x] Sent 'Hello World RabbitMQ count: 3'
[x] Sent 'Hello World RabbitMQ count: 4'
[x] Sent 'Hello World RabbitMQ count: 5'
[x] Sent 'Hello World RabbitMQ count: 6'
[x] Sent 'Hello World RabbitMQ count: 7'
[x] Sent 'Hello World RabbitMQ count: 8'
[x] Sent 'Hello World RabbitMQ count: 9'
執行consumer()後:
示例中的代碼講解,可以直接參考官網:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
5. 基本使用姿勢
5.1 公共代碼封裝
封裝工廠類:
public class RabbitUtil {
public static ConnectionFactory getConnectionFactory() {
//建立連接配接工程,下面給出的是預設的case
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
return factory;
}
}
封裝生成者:
public class MsgProducer {
public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//建立連接配接
Connection connection = factory.newConnection();
//建立消息通道
Channel channel = connection.createChannel();
// 聲明exchange中的消息為可持久化,不自動删除
channel.exchangeDeclare(exchange, exchangeType, true, false, null);
// 釋出消息
channel.basicPublish(exchange, toutingKey, null, message.getBytes());
System.out.println("Sent '" + message + "'");
channel.close();
connection.close();
}
}
封裝消費者:
public class MsgConsumer {
public static void consumerMsg(String exchange, String queue, String routingKey)
throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//建立連接配接
Connection connection = factory.newConnection();
//建立消息信道
final Channel channel = connection.createChannel();
//消息隊列
channel.queueDeclare(queue, true, false, false, null);
//綁定隊列到交換機
channel.queueBind(queue, exchange, routingKey);
System.out.println("[*] Waiting for message. To exist press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
try {
System.out.println(" [x] Received '" + message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 取消自動ack
channel.basicConsume(queue, false, consumer);
}
}
5.2 Direct方式
5.2.1 Direct示例
生産者:
public class DirectProducer {
private static final String EXCHANGE_NAME = "direct.exchange";
public void publishMsg(String routingKey, String msg) {
try {
MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
DirectProducer directProducer = new DirectProducer();
String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
String msg = "hello >>> ";
for (int i = 0; i < 10; i++) {
directProducer.publishMsg(routingKey[i % 3], msg + i);
}
System.out.println("----over-------");
Thread.sleep(1000 * 60 * 100);
}
}
執行生産者,往消息隊列中放入10條消息,其中key分别為“aaa”、“bbb”和“ccc”,分别放入qa、qb、qc三個隊列:
下面是qa隊列的資訊:
消費者:
public class DirectConsumer {
private static final String exchangeName = "direct.exchange";
public void msgConsumer(String queueName, String routingKey) {
try {
MsgConsumer.consumerMsg(exchangeName, queueName, routingKey);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
DirectConsumer consumer = new DirectConsumer();
String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
String[] queueNames = new String[]{"qa", "qb", "qc"};
for (int i = 0; i < 3; i++) {
consumer.msgConsumer(queueNames[i], routingKey[i]);
}
Thread.sleep(1000 * 60 * 100);
}
}
執行後的輸出:
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 0
[x] Done
[x] Received 'hello >>> 3
[x] Done
[x] Received 'hello >>> 6
[x] Done
[x] Received 'hello >>> 9
[x] Done
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 1
[x] Done
[x] Received 'hello >>> 4
[x] Done
[x] Received 'hello >>> 7
[x] Done
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 2
[x] Done
[x] Received 'hello >>> 5
[x] Done
[x] Received 'hello >>> 8
[x] Done
可以看到,分别從qa、qb、qc中将不同的key的資料消費掉。
5.2.2 問題探讨
有個疑問:這個隊列的名稱qa、qb和qc是RabbitMQ自動生成的麼,我們可以指定隊列名稱麼?
我做了個簡單的實驗,我把消費者代碼修改了一下:
public static void main(String[] args) throws InterruptedException {
DirectConsumer consumer = new DirectConsumer();
String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
String[] queueNames = new String[]{"qa", "qb", "qc1"}; // 将qc修改為qc1
for (int i = 0; i < 3; i++) {
consumer.msgConsumer(queueNames[i], routingKey[i]);
}
Thread.sleep(1000 * 60 * 100);
}
執行後如下圖所示:
我們可以發現,多了一個qc1,是以可以判斷這個界面中的queues,是消費者執行時,會将消費者指定的隊列名稱和direct.exchange綁定,綁定的依據就是key。
當我們把隊列中的資料全部消費掉,然後重新執行生成者後,會發現qc和qc1中都有3條待消費的資料,因為綁定的key都是“ccc”,是以兩者的資料是一樣的:
綁定關系如下:
注意:當沒有Queue綁定到Exchange時,往Exchange中寫入的消息也不會重新分發到之後綁定的queue上。
思考:不執行消費者,看不到這個Queres中資訊,我其實可以把這個界面了解為消費者資訊界面。不過感覺還是怪怪的,這個queues如果是消費者資訊,就不應該叫queues,我了解queues應該是RabbitMQ中實際存放資料的queues,難道是我了解錯了?
5.3 Fanout方式(指定隊列)
生産者封裝:
public class FanoutProducer {
private static final String EXCHANGE_NAME = "fanout.exchange";
public void publishMsg(String routingKey, String msg) {
try {
MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutProducer directProducer = new FanoutProducer();
String msg = "hello >>> ";
for (int i = 0; i < 10; i++) {
directProducer.publishMsg("", msg + i);
}
}
}
消費者:
public class FanoutConsumer {
private static final String EXCHANGE_NAME = "fanout.exchange";
public void msgConsumer(String queueName, String routingKey) {
try {
MsgConsumer.consumerMsg(EXCHANGE_NAME, queueName, routingKey);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutConsumer consumer = new FanoutConsumer();
String[] queueNames = new String[]{"qa-2", "qb-2", "qc-2"};
for (int i = 0; i < 3; i++) {
consumer.msgConsumer(queueNames[i], "");
}
}
}
執行生成者,結果如下:
我們發現,生産者生産的10條資料,在每個消費者中都可以消費,這個是和Direct不同的地方,但是使用Fanout方式時,有幾個點需要注意一下:
- 生産者的routkey可以為空,因為生産者的所有資料,會下放到每一個隊列,是以不會通過routkey去路由;
- 消費者需要指定queues,因為消費者需要綁定到指定的queues才能消費。
這幅圖就畫出了Fanout的精髓之處,exchange會和所有的queue進行綁定,不區分路由,消費者需要綁定指定的queue才能發起消費。
注意:往隊列塞資料時,可能通過界面看不到消息個數的增加,可能是你之前已經開啟了消費程序,導緻增加的消息馬上被消費了。
5.4 Fanout方式(随機擷取隊列)
上面我們是指定了隊列,這個方式其實很不友好,比如對于Fanout,我其實根本無需關心隊列的名字,如果還指定對應隊列進行消費,感覺這個很備援,是以我們這裡就采用随機擷取隊列名字的方式,下面代碼直接Copy官網。
生成者封裝:
public static void publishMsgV2(String exchange, BuiltinExchangeType exchangeType, String message) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
//建立連接配接
Connection connection = factory.newConnection();
//建立消息通道
Channel channel = connection.createChannel();
// 聲明exchange中的消息
channel.exchangeDeclare(exchange, exchangeType);
// 釋出消息
channel.basicPublish(exchange, "", null, message.getBytes("UTF-8"));
System.out.println("Sent '" + message + "'");
channel.close();
connection.close();
}
消費者封裝:
public static void consumerMsgV2(String exchange) throws IOException, TimeoutException {
ConnectionFactory factory = RabbitUtil.getConnectionFactory();
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(exchange, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchange, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
生産者:
public class FanoutProducer {
private static final String EXCHANGE_NAME = "fanout.exchange.v2";
public void publishMsg(String msg) {
try {
MsgProducer.publishMsgV2(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutProducer directProducer = new FanoutProducer();
String msg = "hello >>> ";
for (int i = 0; i < 10000; i++) {
directProducer.publishMsg(msg + i);
}
}
}
消費者:
public class FanoutConsumer {
private static final String EXCHANGE_NAME = "fanout.exchange.v2";
public void msgConsumer() {
try {
MsgConsumer.consumerMsgV2(EXCHANGE_NAME);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
FanoutConsumer consumer = new FanoutConsumer();
for (int i = 0; i < 3; i++) {
consumer.msgConsumer();
}
}
}
執行後,管理界面如下:
5.5 Topic方式
代碼詳見官網:https://www.rabbitmq.com/tutorials/tutorial-five-java.html
更多方式,請直接檢視官網:https://www.rabbitmq.com/getstarted.html
6. RabbitMQ 進階
6.1 durable 和 autoDeleted
在定義Queue時,可以指定這兩個參數:
/**
* Declare an exchange.
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange
* @param type the exchange type
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
* @param autoDelete true if the server should delete the exchange when it is no longer in use
* @param arguments other properties (construction arguments) for the exchange
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
6.1.1 durable
持久化,保證RabbitMQ在退出或者crash等異常情況下資料沒有丢失,需要将queue,exchange和Message都持久化。
若是将queue的持久化辨別durable設定為true,則代表是一個持久的隊列,那麼在服務重新開機之後,會重新讀取之前被持久化的queue。
雖然隊列可以被持久化,但是裡面的消息是否為持久化,還要看消息的持久化設定。即重新開機queue,但是queue裡面還沒有發出去的消息,那隊列裡面還存在該消息麼?這個取決于該消息的設定。
6.1.2 autoDeleted
自動删除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動删除。這種隊列适用于臨時隊列。
當一個Queue被設定為自動删除時,當消費者斷掉之後,queue會被删除,這個主要針對的是一些不是特别重要的資料,不希望出現消息積累的情況。
6.1.3 小節
- 當一個Queue已經聲明好了之後,不能更新durable或者autoDelted值;當需要修改時,需要先删除再重新聲明
- 消費的Queue聲明應該和投遞的Queue聲明的 durable,autoDelted屬性一緻,否則會報錯
- 對于重要的資料,一般設定 durable=true, autoDeleted=false
- 對于設定 autoDeleted=true 的隊列,當沒有消費者之後,隊列會自動被删除
6.4 ACK
執行一個任務可能需要花費幾秒鐘,你可能會擔心如果一個消費者在執行任務過程中挂掉了。一旦RabbitMQ将消息分發給了消費者,就會從記憶體中删除。在這種情況下,如果正在執行任務的消費者當機,會丢失正在處理的消息和分發給這個消費者但尚未處理的消息。
但是,我們不想丢失任何任務,如果有一個消費者挂掉了,那麼我們應該将分發給它的任務傳遞給另一個消費者去處理。
為了確定消息不會丢失,RabbitMQ支援消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收并且處理完畢了。RabbitMQ就可以删除它了。
是以手動ACK的常見手段:
// 接收消息之後,主動ack/nak
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
try {
System.out.println(" [ " + queue + " ] Received '" + message);
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
};
// 取消自動ack
channel.basicConsume(queue, false, consumer);
技術交流群
最近有很多人問,有沒有讀者交流群,想知道怎麼加入。
最近我建立了一些群,大家可以加入。交流群都是免費的,隻需要大家加入之後不要随便發廣告,多多交流技術就好了。
目前建立了多個交流群,全國交流群、北上廣杭深等各地區交流群、面試交流群、資源共享群等。