天天看點

RabbitMQ:從零開始

一、介紹

RabbitMQ是基于AMQP協定的消息中間件,伺服器端用Erlang語言編寫,支援多種用戶端,用于在分布式系統中存儲轉發消息,在易用性、擴充性、高可用性等方面表現不俗。

二、安裝

去官網下載下傳對應的版本,在安裝rabbitmq之前需要安裝對應Erlang環境,安裝完成後通過指令/sbin/service rabbitmq-server start啟動。

http://www.rabbitmq.com/download.html

三、基本配置

1、使用者權限配置:rabbitmq預設的guest/guest使用者隻能在本機通路,如果需要外網使用者通路,需要單獨建立使用者,通過指令建立使用者及賦予權限 ,通過指令:./rabbitmqctl add_user test test ,./rabbitmqctl set_user_tags test administrator添加test使用者和賦予admin權限。注:指令行建立的使用者需要在接下來的web監控頁面使用者管理裡設定權限(下圖的Set permisson),否則無法連接配接成功。

2、WEB監控配置:通過./rabbitmq-plugins enable rabbitmq_management指令開啟web監控頁面顯示,預設端口15672,可通過localhost:15672檢視web監控頁面,友善後期管理和檢視隊列消息。

四、Java Demo

消息提供者Provider:

package org.rabbitmq.RabbitMq;

import java.io.IOException;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

/

  • java通路mq基礎demo --provider
  • @author Mr.tanzc
  • */

    public class MySpreadSend {

    //發送消息

    public static void main(String[] args) throws IOException {

    /使用工廠類建立Connection和Channel,并且設定參數/

    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost("127.0.0.1");//MQ的IP

    factory.setPort(5672);//MQ端口

    factory.setUsername("test");//MQ使用者名

    factory.setPassword("test");//MQ密碼

    Connection connection = factory.newConnection();

    Channel channel = connection.createChannel();

    /*定義交換機*/
    channel.exchangeDeclare("COLOR_EXCHANGE", "direct");
    
    /*建立多個消息隊列*/
    channel.queueDeclare("BLACK_QUEUE", false, false, false, null);
    channel.queueDeclare("RED_QUEUE", false, false, false, null);
    
    /*綁定交換機和隊列*/
    channel.queueBind("BLACK_QUEUE", "COLOR_EXCHANGE", "black");
    channel.queueBind("RED_QUEUE", "COLOR_EXCHANGE", "red");
    
    /*通過交換機發送不同類别的消息到不同的隊列中,注意,消息是由一個交換機來根據标志發往不同的隊列中去*/
    String message = "black";
    channel.basicPublish("COLOR_EXCHANGE", "black", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    
    message="red";
    channel.basicPublish("COLOR_EXCHANGE", "red", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    
    /*關閉連接配接*/
    channel.close();
    connection.close();
      }           
}**

定義兩個消費者Comsumer:

第一個消費者MySpreadRecvRed

import com.rabbitmq.client.ConsumerCancelledException;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.ShutdownSignalException;

public class MySpreadRecvRed {

public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
    /*建立連接配接*/
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("127.0.0.1");//MQ的IP
    factory.setPort(5672);//MQ端口
    factory.setUsername("test");//MQ使用者名
    factory.setPassword("test");//MQ密碼
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    /*聲明要連接配接的隊列*/
    /*定義交換機*/
    channel.exchangeDeclare("COLOR_EXCHANGE", "direct");
    /*綁定交換機和隊列*/
    channel.queueBind("RED_QUEUE", "COLOR_EXCHANGE", "red");
    /*建立消費者對象,用于讀取消息*/
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume("RED_QUEUE", true, consumer);
    /* 讀取隊列,并且阻塞,即在讀到消息之前在這裡阻塞,直到等到消息,完成消息的閱讀後,繼續阻塞循環*/
    while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [x] Received '" + message + "'");
    }
    }           
}

第二個消費者MySpreadRecvBlack:

/**
  • 擷取指定隊列的消息
  • public class MySpreadRecvBlack {

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {

    /建立連接配接/

    factory.setHost("192.168.1.25");//MQ的IP

    factory.setUsername("twk");//MQ使用者名

    factory.setPassword("twk");//MQ密碼

    /*聲明要連接配接的隊列*/
    /*定義交換機*/
    channel.exchangeDeclare("COLOR_EXCHANGE", "direct");
    /*綁定交換機和隊列*/
    channel.queueBind("BLACK_QUEUE", "COLOR_EXCHANGE", "black");        
    /*建立消費者對象,用于讀取消息*/
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume("BLACK_QUEUE", true, consumer);
    /* 讀取隊列,并且阻塞,即在讀到消息之前在這裡阻塞,直到等到消息,完成消息的閱讀後,繼續阻塞循環*/
    while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [x] Received '" + message + "'");
    }
    }           

先運作Provider,檢視到mq監控台隊列裡面有消息,再運作Comsumer,可以看到列印出來的消費資訊,隊列裡面對應資料清空。

五、基礎API使用

主要對使用過的RabbitMQ一些概念和功能進行說明。

1.MQ使用流程

  • 用戶端連接配接到消息隊列伺服器,打開一個channel。
  • 用戶端聲明一個exchange,并設定相關屬性。
  • 用戶端聲明一個queue,并設定相關屬性。
  • 用戶端使用routing key,在exchange和queue之間建立好綁定關系。
  • 用戶端投遞消息到exchange。
  • exchange接收到消息後,就根據消息的key和已經設定的binding,進行消息路由,将消息投遞到一個或多個隊列裡。

2.ConnectionFactory

連接配接工廠類,我們通過這個類可以設定一些連接配接參數

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");//MQ的IP
factory.setPort(5672);//MQ端口
factory.setUsername("test");//MQ使用者名
factory.setPassword("test");//MQ密碼           

3.Connection

是通過工廠類New出來的rabbitMq連接配接,我們後續跟mq的互動都是基于這個連接配接。

4.Channel 通道

進行消息讀寫的通道,可以了解為指向隊列的路徑

5.交換機Exchange

定義了消息路由規則

6.隊列Queue

是存儲消息的基本單元

7.Bind

綁定了Queue和Exchange,意即為符合什麼樣路由規則的消息,将會放置入哪一個消息隊列

使用隊列之前都需要用channel聲明隊列,channel.queueDeclare方法會在隊列不存在的時候建立隊列,如果隊列存在,則不建立。

Channel channel = connection.createChannel();

/*定義交換機*/
channel.exchangeDeclare("COLOR_EXCHANGE", "direct");
/*建立多個消息隊列*/
channel.queueDeclare("BLACK_QUEUE", false, false, false, null);
channel.queueDeclare("RED_QUEUE", false, false, false, null);
/*綁定交換機和隊列*/
channel.queueBind("BLACK_QUEUE", "COLOR_EXCHANGE", "black");
channel.queueBind("RED_QUEUE", "COLOR_EXCHANGE", "red");           

8.publish

發送消息到隊列:通過channel.basicPublish方法來發送消息到指定隊列,第一個參數是交換機名稱,第二個參數是隊列名稱,第三個參數是用于優先級隊列(後面再談,若不适用為null),最後一個參數為消息内容的位元組

/通過交換機發送不同類别的消息到不同的隊列中,注意,消息是由一個交換機來根據标志發往不同的隊列中去/

String message = "black";

channel.basicPublish("COLOR_EXCHANGE", "black", null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");

message="red";
channel.basicPublish("COLOR_EXCHANGE", "red", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");           

9.consume

消費者消費消息也需要建立通道,聲明指定交換機和隊列,然後通過consumer對象的basicConsume方法來綁定隊列和消費者,第一個參數是隊列名稱,第二個參數為是否自動ack

nextDelivery(long timeout)方法可以擷取消息,參數為最多等待時間,如果在這個時間内擷取到消息,則傳回消息,若無,将會最多等待這個時間,仍沒有消息資料就會傳回null。

再通過consumer.nextDelivery().getBody()方法擷取消息内容

/建立消費者對象,用于讀取消息/

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicConsume("RED_QUEUE", false, consumer);

/ 讀取隊列,并且阻塞,即在讀到消息之前在這裡阻塞,直到等到消息,完成消息的閱讀後, 繼續阻塞循環/

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [x] Received '" + message + "'");

六、ACK機制

QUEUE裡面的消息存在一個ACK機制,即消息的确認機制。

channel.basicConsume(queueName, false, consumer)

第二個參數就是是否自動ack。

在實際項目應用中,消費者拿到消息進行處理需要一段時間,中間因為使用者中止操作或者因為網絡問題當機時,如果設定為自動ack,這個消費就會丢失。為防止這種情況的出現,一般設定為取消auto ack,需要消費者發送确認回執後才從隊列中删除,確定消息能夠被正确消費。

消費者處理完消息後可通過

channel.basicAck(delivery .getEnvelope().getDeliveryTag(), false);

來确認ack消息,這個方法的第一個參數是消息的辨別tag,第二個為是否重新放入隊列,設定為false消息會從隊列裡面删除,true的話會重新放入隊列,等待再次消費。

七、消息的持久化

MQ的消息是支援持久化操作的,會将資料儲存到硬碟裡面,防止因為伺服器當機或者RabbitMQ重新開機後的資料丢失。我們要分别設定隊列的持久化(重新開機後隊列名稱恢複)和消息的持久化(重新開機後資料保留)。

隊列的持久化:

channel.queueDeclare(queueName, true, false, false, null);

第一個參數為隊列名稱,第二個參數即是否持久化操作,設定為true即實作隊列資訊的持久化

消息的持久化:

channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

其中第三個參數MessageProperties.PERSISTENT_TEXT_PLAIN即設定消息的持久化

八、消息的公平分發

如果有多個消費者監聽同一隊列,MQ預設會把消息平攤給多個消費者,在不同消費者處理時間不同的情況下,就有可能造成某個消費者堆積了很多消息未處理而另外一個消費者無消息可處理的情況,為避免這種情況,我們可以通過

channel.basicQos(1,true);           

方法設定每個消費者的分發數量,1即代表這個消費者每次最多處理一個消息,如果要拿下一個消息,必須把未ack的消息ack掉以後才能拿到下一個消息,這樣就實作了消息的公平分發機制。

九、消息的優先級

隊列的消息預設是先進先出的,這也是RabbitMQ預設支援的隊列。

3.5.0版本之前的MQ預設是不支援優先級隊列的,隻能通過插件安裝的方式來實作。

3.5.0版本之後的MQ已經內建了這一功能,可以直接使用。

首先,我們需要聲明優先級隊列:

Map<String, Object> args = new HashMap<String, Object>();

args.put("x-max-priority", 100);

channel.queueDeclare(queueName, true, false, false, args);

首先定義一個BasicProperties的變量,傳遞一個優先級為1的變量,再将這個參數傳遞到

basicPublish方法的第三個參數(即之前持久化的參數,MessageProperties.PERSISTENT_BASIC.builder()方法也是持久化的)

十、消息的路由分發

RabbitMQ常用的Exchange Type有fanout、direct、topic、headers這四種(AMQP規範裡還提到兩種Exchange Type,分别為system與自定義,這裡不予以描述)。

預設的是direct直接發送,如上文中queueDeclare方法中不指定交換機,而直接指定隊列名稱,就是預設的交換機,将消息放到對應名稱的隊列中。這種形式在發送和消費消息的時候都需要指定對應隊列名稱。

fanout即廣播形式,通過交換機将消息發到綁定的所有隊列上面。

// 聲明一個名稱為"exchange_fanout"的exchange

channel.exchangeDeclare("exchange_fanout", "fanout");

// 将消息發送給exchange

channel.basicPublish("exchange_fanout", "", null, msg.getBytes());

topic即比對模式,我們可以為每一個消息類型設定一個主題topic,放入隊列的時候帶上topic,消費者擷取消息的時候可以通過指定topic來擷取消息。

消息提供者:

//指定topic類型的交換機
String exchange = "exchange03"; 
String msgType= "type1";  
channel.exchangeDeclare(exchange , "topic") ;  
String msg = "Hello World!";  
//發送指定主題的消息  
channel.basicPublish( exchange , msgType, null , msg.getBytes());           

消息消費者:

String exchange = "exchange03";  
channel.exchangeDeclare(exchange , "topic") ;    
String queueName = channel.queueDeclare().getQueue() ;  

//第三個參數就是type
channel.queueBind(queueName, exchangeName, "type1") ;           
QueueingConsumer consumer = new QueueingConsumer(channel) ;  
channel.basicConsume(queueName, true, consumer) ;  

//循環擷取消息  
while(true){  
    //擷取消息,如果沒有消息,這一步将會一直阻塞,可以設定逾時時間  
    Delivery delivery = consumer.nextDelivery() ;  
    String msg = new String(delivery.getBody()) ;    
    System.out.println("received message[" + msg + "] from " + exchangeName);  
}             

十一、Spring內建

RabbitMQ可以和Spring無縫內建,就無需跟client打交道,使用更為友善。

spring-rabbit.xml:

<beans xmlns="

http://www.springframework.org/schema/beans

"

xmlns:xsi="

http://www.w3.org/2001/XMLSchema-instance xmlns:rabbit=" http://www.springframework.org/schema/rabbit xmlns:context=" http://www.springframework.org/schema/context xsi:schemaLocation=" http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/context/spring-context.xsd"&gt ;
&lt;description&gt;rabbitmq 連接配接服務配置&lt;/description&gt;
&lt;context:component-scan base-package="com.mop.self.mq"/&gt;
&lt;context:property-placeholder location="classpath:conf/rabbitmq.properties"/&gt;

&lt;!-- 連接配接配置 --&gt;
&lt;rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"/&gt;
&lt;rabbit:admin connection-factory="connectionFactory"/&gt;

&lt;!-- spring template聲明--&gt;
&lt;rabbit:template exchange="test-mq-exchange" id="amqpTemplate"  connection-factory="connectionFactory"  message-converter="jsonMessageConverter" /&gt;

&lt;!-- 隊列說明 --&gt;
&lt;rabbit:queue id="test_queue" name="test_queue" durable="true" auto-delete="false" exclusive="false" /&gt;

&lt;!-- direct 交換器 --&gt;
&lt;rabbit:direct-exchange name="test-mq-exchange" durable="false" auto-delete="false" id="test-mq-exchange"&gt;
    &lt;rabbit:bindings&gt;
        &lt;rabbit:binding queue="test_queue" key="test_queue_key"/&gt;
    &lt;/rabbit:bindings&gt;
&lt;/rabbit:direct-exchange&gt;

&lt;rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" message-converter="jsonMessageConverter"&gt;
    &lt;rabbit:listener ref="mqListener" queues="test_queue" /&gt;
&lt;/rabbit:listener-container&gt;

&lt;!-- 消息對象json轉換類 --&gt;
&lt;bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /&gt;           
</beans>

rabbitmq.propreties:

mq.host=127.0.0.1

mq.username=tzc

mq.password=tzc

mq.port=5672

定義MQ消息傳輸類和生産消費者:

MqMessage:

package com.mop.self.mq;
  • Author: Mr.tan
  • Date: 2017/08/04

    > Description:MQ消息封裝

    public class MqMessage {

    private String id;

    private String name;

    public String getId() {

    return id;

    public void setId(String id) {

    this.id = id;

    public String getName() {

    return name;

    public void setName(String name) {

    this.name = name;

    @Override

    public String toString() {

    return "MqMessage{" +

    "id='" + id + '\'' +

    ", name='" + name + '\'' +

    '}';

MqProducer:

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

  • Description: MQ生産者

    br/>*/

    @Component

    public class MqProducer {

    private static final Logger logger = LoggerFactory.getLogger(MqProducer.class);

    @Autowired

    private AmqpTemplate amqpTemplate;

    public void sendMessage(Object message){

    logger.debug("produce message:"+message);

    amqpTemplate.convertAndSend("test_queue_key",message);

MqListener:

import com.alibaba.fastjson.JSON;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessageListener;

  • Description: mq監聽 public class MqListener implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(MqListener.class);

    public void onMessage(Message message) {

    MqMessage mqMessage = JSON.parseObject(new String(message.getBody()),MqMessage.class);

    logger.debug("get message:"+mqMessage);

測試代碼:

import org.springframework.context.ApplicationContext;

import org.springframework.context.support.ClassPathXmlApplicationContext;

  • Description:

    public class MqTest {

    public static void main(String[]args){

    ApplicationContext applicationContext=new ClassPathXmlApplicationContext("classpath:conf/spring-core.xml");

    MqProducer mqProducer = (MqProducer) applicationContext.getBean("mqProducer");

    MqMessage mqMessage = new MqMessage();

    mqMessage.setId("34");

    mqMessage.setName("測試");

    mqProducer.sendMessage(mqMessage);

執行方法,生産者發送消息,監聽擷取到消息并列印