RabbitMQ 簡述
RabbitMQ是一個消息代理:它接受并轉發消息。 您可以将其視為郵局:當您将要把寄發的郵件投遞到郵箱中時,您可以确信Postman 先生最終會将郵件發送給收件人。 在這個比喻中,RabbitMQ是一個郵箱,郵局和郵差,用來接受,存儲和轉發二進制資料塊的消息。
隊列就像是在RabbitMQ中扮演郵箱的角色。 雖然消息經過RabbitMQ和應用程式,但它們隻能存儲在隊列中。 隊列隻受主機的記憶體和磁盤限制的限制,它本質上是一個大的消息緩沖區。 許多生産者可以發送到一個隊列的消息,許多消費者可以嘗試從一個隊列接收資料。
producer即為生産者,用來産生消息發送給隊列。consumer是消費者,需要去讀隊列内的消息。producer,consumer和broker(rabbitMQ server)不必駐留在同一個主機上;确實在大多數應用程式中它們是這樣分布的。
簡單隊列
簡單隊列是最簡單的一種模式,由生産者、隊列、消費者組成。生産者将消息發送給隊列,消費者從隊列中讀取消息完成消費。
在下圖中,“P”是我們的生産者,“C”是我們的消費者。 中間的框是隊列 - RabbitMQ代表消費者的消息緩沖區。
java 方式
生産者
package com.anqi.mq.nat;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class MyProducer {
private static final String QUEUE_NAME = "ITEM_QUEUE";
public static void main(String[] args) throws Exception {
//1. 建立一個 ConnectionFactory 并進行設定
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2. 通過連接配接工廠來建立連接配接
Connection connection = factory.newConnection();
//3. 通過 Connection 來建立 Channel
Channel channel = connection.createChannel();
//實際場景中,消息多為json格式的對象
String msg = "hello";
//4. 發送三條資料
for (int i = 1; i <= 3 ; i++) {
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("Send message" + i +" : " + msg);
}
//5. 關閉連接配接
channel.close();
connection.close();
}
}
/**
* Declare a queue
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
/**
* Publish a message
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param props other properties for the message - routing headers etc
* @param body the message body
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
/**
* Start a non-nolocal, non-exclusive consumer, with
* a server-generated consumerTag.
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param callback an interface to the consumer object
* @return the consumerTag generated by the server
* @throws java.io.IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
消費者
package com.anqi.mq.nat;
import com.rabbitmq.client.*;
import java.io.IOException;
public class MyConsumer {
private static final String QUEUE_NAME = "ITEM_QUEUE";
public static void main(String[] args) throws Exception {
//1. 建立一個 ConnectionFactory 并進行設定
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2. 通過連接配接工廠來建立連接配接
Connection connection = factory.newConnection();
//3. 通過 Connection 來建立 Channel
Channel channel = connection.createChannel();
//4. 聲明一個隊列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
/*
true:表示自動确認,隻要消息從隊列中擷取,無論消費者擷取到消息後是否成功消費,都會認為消息已經成功消費
false:表示手動确認,消費者擷取消息後,伺服器會将該消息标記為不可用狀态,等待消費者的回報,如果消費者一
直沒有回報,那麼該消息将一直處于不可用狀态,并且伺服器會認為該消費者已經挂掉,不會再給其發送消息,
直到該消費者回報。
*/
//5. 建立消費者并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//6. 設定 Channel 消費者綁定隊列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
Send message1 : hello
Send message2 : hello
Send message3 : hello
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'hello'
[x] Received 'hello'
[x] Received 'hello'
總結
隊列聲明queueDeclare的參數:第一個參數表示隊列名稱、第二個參數為是否持久化(true表示是,隊列将在伺服器重新開機時生存)、第三個參數為是否是獨占隊列(建立者可以使用的私有隊列,斷開後自動删除)、第四個參數為當所有消費者用戶端連接配接斷開時是否自動删除隊列、第五個參數為隊列的其他參數。
basicConsume的第二個參數autoAck: 應答模式,true:自動應答,即消費者擷取到消息,該消息就會從隊列中删除掉,false:手動應答,當從隊列中取出消息後,需要程式員手動調用方法應答,如果沒有應答,該消息還會再放進隊列中,就會出現該消息一直沒有被消費掉的現象。
這種簡單隊列的模式,系統會為每個隊列隐式地綁定一個預設交換機,交換機名稱為" (AMQP default)",類型為直連 direct,當你手動建立一個隊列時,系統會自動将這個隊列綁定到一個名稱為空的 Direct 類型的交換機上,綁定的路由鍵 routing key 與隊列名稱相同,相當于channel.queueBind(queue:"QUEUE_NAME", exchange:"(AMQP default)“, routingKey:"QUEUE_NAME");雖然執行個體沒有顯式聲明交換機,但是當路由鍵和隊列名稱一樣時,就會将消息發送到這個預設的交換機中。這種方式比較簡單,但是無法滿足複雜的業務需求,是以通常在生産環境中很少使用這種方式。
The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.預設交換機隐式綁定到每個隊列,其中路由鍵等于隊列名稱。不可能顯式綁定到,或從預設交換中解除綁定。它也不能被删除。
——引自 RabbitMQ 官方文檔
spring-amqp方式
引入 Maven 依賴
使用測試