摘要 一次将消息發送到多個消費者
RabbitMQ RabbitMQ入門
目錄[-]
- 釋出和訂閱
- (使用java 用戶端)
- 交換
- 臨時隊列
- 綁定
- 把所有放在一起
在先前的指南中,我們建立了一個工作隊列。這工作隊列後面的假想是每一個任務都被準确的傳遞給工作者。在這部分我們将會做一些完全不同的事情–我們将一個消息傳遞給多個消費者。這部分被認知為“釋出和訂閱”。
為了說明這個部分,我們會建立一個簡單德日志系統,它是由兩個程式組成–第一個發出日志消息,第二個接收和列印它們。
在我們的日志系統中,每一個運作的接收者拷貝程式将會獲得資訊。通過這個方式我們可以運作一個接收者,直接的把日志記錄到硬碟中;在同一時間我們可以運作另一個接收者,在螢幕上看這些日志。
本質上,釋出日志消息等同于廣播到所有接收者。
在先前指南部分,我們将消息發送到隊列裡,并從隊列中接收消息。現在是時候介紹RabbitMQ中全消息模型。
讓我們快速溫習下在先前指南中我們掌握的:
一個發送消息的生産者是一個使用者程式。
一個存儲消息的隊列是一個緩沖。
一個接收消息的消費者是一個使用者程式。
在RabbitMQ消息模型中核心的思想是生産者從不直接将消息發送給隊列。實際上,生産者常常甚至不知道是否一個消息會被傳遞到隊列中。
相反,生産者僅能将消息發送到一個交換機。一個交換機是一個非常簡單的事物。在它的一遍,它從生産者那裡接收消息,另一邊将消息推送到隊列中。這個交換所必須清楚的知道它所接收到的消息要如何處理。是否将它附加到一個特别的隊列中?是否将它附加到多個隊列中?或者是否它應該被丢棄。規則的定義是由交換類型決定的。
有幾個交換類型:
direct
,
topic
deaders
fanout
。我們來關注最後一個–
fanout
。讓我們建立一個這種類型的交換機并且稱呼它為
logs
:
channel.exchangeDeclare("logs", "fanout");
這
fanout
交換機是非常簡單的。通過這個名字你可能已經猜出它的用處了,它會将接收的所有消息都廣播到所有它所知道的所有隊列。這個真正是我們的記錄器所需要的。
交換機清單
為了列出伺服器中所有交換機,你可以運作着有用的
:
rabbitmqctl
在這個清單裡有一些以$ sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done.
amq.
打頭的交換機和預設(未命名)的交換機。這些是預設建立的,但是不太可能你會在某個時刻使用它們。
匿名交換機
在先前的指南中我們對交換機毫無了解,但是我們依舊能将消息發送到隊列中。那是可能實作的,因為我們使用的是預設交換機,通過我們使用空字元串(““)辨別它。
回想一下我們以前是如何發送消息的:
這第一個參數是交換機的名字。空字元串說明它是預設的或者匿名的交換機:路由關鍵字存在的話,消息通過路由關鍵字的名字路由到特定的隊列上。channel.basicPublish("", "hello", null, message.getBytes());
現在,我們可以釋出我們自己命名的交換機:
channel.basicPublish( "logs", "", null, message.getBytes());
你可能會想起先前我們使用的隊列是有特定的名字的(是否記得
hello
和
task_queue
)。命名一個隊列對我們來說是至關重要的–我們需要指定工作者到這相同的隊列上。當你想把隊列分享給生産者和消費者,給隊列名是重要的。
但是那不是我們記錄器的執行個體。我們想監聽所有日志消息,不僅僅是它們中的子集。我們同樣是對目前的消息流感興趣,而不是舊的。為了解決這個我們需要兩件事。
首先,無論我們什麼時候連接配接RabbitMQ,我們需要一個新的,空的隊列。為了做到這些,我們可以建立一個随機名字的隊列或者更勝一籌-讓伺服器為我們選擇一個随機的名字。
第二部,一旦我們将消費者的連接配接斷開,隊列應該自動删除。
在Java用戶端裡,當我們使用無參數調用
queueDeclare()
方法,我們建立一個自動産生的名字,不持久化,獨占的,自動删除的隊列。
String queueName = channel.queueDeclare().getQueue();
在這點,隊列名中包含一個随機隊列名。例如名字像
amq.gen-JzTY20BRgKO-HjmUJj0wLg
。
我們已經建立了一個
fanout
交換機和隊列。現在我們需要告訴交換機發送消息給我們的隊列上。這交換機和隊列之間的關系稱之為一個綁定。
channel.queueBind(queueName, "logs", "");
從現在開始,日志交換所将要附加消息到我們的隊列中。
綁定清單
你可以列出存在的綁定使用,使用
rabbitmqctl list_bindings
這發送日志消息的生産者程式,跟以前指南中的程式沒有多少不同。這最重要的改變是我們将匿名的交換機替換為我們想要消息釋出到的日志交換機。當發送是我們需要申請一個路由關鍵字,但是在廣播消息是它的值會被忽略。這是
EmitLog.java
程式的代碼:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
//...
}
(EmitLog.java source)
如你所知,建立連接配接後我們聲明一個交換機。這個步驟是必須的,因為釋出到一個不存在的交換機是禁止的。
如果隊列還沒有綁定到交換機上,消息将會丢失,但是這個對我們來說是ok的;如果沒有消費者正在監聽,我們可以安全的丢棄消息。
ReceiveLogs.java
代碼:
java.lang.InterruptedException {
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
(ReceiveLogs.java source)
如以前那樣編譯,我們已經做了。
$ javac -cp rabbitmq-client.jar EmitLog.java ReceiveLogs.java
如果你想把日志儲存到檔案中,僅僅打開一個控制平台,鍵入:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log
如果你想在你的螢幕上看這些日志, 建立一個終端并且運作:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs
當然,為了發出日志鍵入:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar EmitLog
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.