天天看點

Rabbitmq基本概念與使用

Rabbitmq入門

    • 一、RabbitMQ簡介
    • 二、RabbitMQ安裝運作
    • 三、RabbitMQ基本配置
      • 1、預設配置
      • 2、RabbitMQ端口
    • 四、RabbitMQ管理界面
      • 1、 開啟插件
      • 2、 添加使用者
      • 3、 為使用者配置設定操作權限
      • 4、 為使用者配置設定資源權限
    • 五、rabbitmq角色
      • 1、none
      • 2、management
      • 3、policymaker
      • 4、monitoring
      • 5、administrator
    • 六、一個簡單的Demo
      • 1、引入依賴
      • 2、代碼
    • 七、AMQP協定
      • 1、簡介
      • 2、AMQP結構
      • 3、AMQP生産者流轉過程
      • 4、AMQP消費者流轉過程
    • 八、rabbitmq核心概念
      • 1、Producer
        • (1)消息體(payload)
        • (2)附加資訊
      • 2、Broker
      • 3、Virtual Host
      • 4、Channel
      • 5、RoutingKey
      • 6、Exchange
        • (1)預設的exchange:
        • (2)Fanout:扇型交換機
        • (3)Direct:直連交換機
        • (4)Topic:主題交換機
        • (5)headers:頭交換機
      • 7、Queue
      • 8、Binding
      • 9、Consumer
    • 九、運轉流程
      • 1、整個消息的運轉流程
      • 2、RabbitMQ的運轉流程

一、RabbitMQ簡介

RabbitMQ是一個開源的AMQP實作,伺服器端用Erlang語言編寫,支援多種用戶端。用于在分布式系統中存儲轉發消息,在易用性、擴充性、高可用性等方面表現不俗。

二、RabbitMQ安裝運作

環境準備:CentOS7、Erlang

需要安裝erlang、socat、rabbitmq

三、RabbitMQ基本配置

1、預設配置

RabbitMQ有一套預設的配置,能夠滿足日常開發需求,如果需要修改,需要自己建立一個配置檔案

touch/etc/rabbitmq/rabbitmq.conf
           

配置檔案示例:

https://github.com/rabbitmq/rabbitmq-server/blob/master/docs/rabbitmq.conf.example
           

配置項說明:

https://www.rabbitmq.com/configure.html#config-items
           

2、RabbitMQ端口

rabbitmq會綁定一些端口,安裝完後,需要将這些端口添加至防火牆。

4369

是Erlang的端口/節點名稱映射程式,用來跟蹤節點名稱監聽位址,在叢集中起到一個類似DNS的作
           

5672,5671:

AMQP 0-9-1和1.0用戶端端口,沒有使用SSL和使用SSL的端口
           

25672(做管理用):

用于RabbitMQ節點間和CLI工具通信,配合4369使用。
           

15672:

HTTP_API端口,管理者使用者才能通路,用于管理rabbitmq,需要啟用management插件。
           

61613,61614:

當STOMP插件啟用的時候打開,作為STOMP用戶端端口(根據是否使用TLS選擇)。
           

1883,8883:

當MQTT插件啟用的時候打開,作為MQTT用戶端端口(根據是否使用TLS選擇)。
           

15674:

基于WebSocket的STOMP用戶端端口(當插件Web STOMP啟用的時候打開)。
           

15675:

基于Websocket的MQTT用戶端端口(當插件Web MQTT啟用的時候打開)。
           

四、RabbitMQ管理界面

1、 開啟插件

rabbitmq-plugins enable rabbitmq_management
           

說明:rabbitmq有一個預設的guest使用者,但隻能通過localhost通路,是以需要添加一個能夠遠端通路的使用者。

2、 添加使用者

rabbitmqctl add_user admin admin
           

3、 為使用者配置設定操作權限

rabbitmqctl set_user_tags admin administrator
           

4、 為使用者配置設定資源權限

rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
           

五、rabbitmq角色

1、none

不能通路management plugin
           

2、management

使用者可以通過AMQP做任何事,還可以做如下的事:

1、列出自己可以通過AMQP登入的virtual hosts
    2、檢視自己的virtual hosts中的queues,exchanges和bindings
    3、檢視和關閉自己的channels和connections
    4、檢視有關自己的virtual hosts的“全局”的統計資訊,包含其他使用者在這些virtual hosts中的活動。
           

3、policymaker

可以做management做的任何事,還可以做如下的事:

1.檢視、建立和删除自己的virtual hosts所屬的policies和parameters
           

4、monitoring

可以做management做的任何事,還可以做如下的事:

1、列出所有virtual hosts,包括他們不能登入的virtual hosts
    2、檢視其他使用者的channels和connections
    3、檢視節點級别的資料如clustering和memory使用情況
    4、檢視真正的關于所有virtual hosts的全局的統計資訊
           

5、administrator

可以做policymaker和monitoring做的任何事,還可以做如下的事:

1、建立和删除virtual hosts
  2、檢視、建立和删除users
  3、檢視建立和删除permissions
  4、關閉其他使用者的connections
           

六、一個簡單的Demo

1、引入依賴

<dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.1</version>
    </dependency>
           

2、代碼

建立生産者類 Producer.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 簡單隊列生産者
 * 使用RabbitMQ的預設交換器發送消息
 */
public class Producer {

    public static void main(String[] args) {
        // 1、建立連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 2、設定連接配接屬性
        //根據實際位址填寫
        factory.setHost("127.0.0.1");
        //可以不設定,預設也是5672
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 3、從連接配接工廠擷取連接配接
            connection = factory.newConnection("生産者");

            // 4、從連結中建立通道
            channel = connection.createChannel();

            /**
             * 5、聲明(建立)隊列
             * 如果隊列不存在,才會建立
             * RabbitMQ 不允許聲明兩個隊列名相同,屬性不同的隊列,否則會報錯
             *
             * queueDeclare參數說明:
             * @param queue 隊列名稱
             * @param durable 隊列是否持久化
             * @param exclusive 是否排他,即是否為私有的,如果為true,會對目前隊列加鎖,其它通道不能通路,并且在連接配接關閉時會自動删除,不受持久化和自動删除的屬性控制
             * @param autoDelete 是否自動删除,當最後一個消費者斷開連接配接之後是否自動删除
             * @param arguments 隊列參數,設定隊列的有效期、消息最大長度、隊列中所有消息的生命周期等等
             */
            channel.queueDeclare("queue1", false, false, false, null);

            // 消息内容
            String message = "Hello World!";
            // 6、發送消息
            channel.basicPublish("", "queue1", null, message.getBytes());
            System.out.println("消息已發送!");

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 7、關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 8、關閉連接配接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
           

建立消費者類 Consumer.java

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 簡單隊列消費者
 */
public class Consumer {

    public static void main(String[] args) {
        // 1、建立連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 2、設定連接配接屬性
        factory.setHost("127.0.0.1");
        //端口可以不寫,預設是5672
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 3、從連接配接工廠擷取連接配接
            connection = factory.newConnection("消費者");

            // 4、從連結中建立通道
            channel = connection.createChannel();

            /**
             * 5、聲明(建立)隊列
             * 如果隊列不存在,才會建立
             * RabbitMQ 不允許聲明兩個隊列名相同,屬性不同的隊列,否則會報錯
             *
             * queueDeclare參數說明:
             * @param queue 隊列名稱
             * @param durable 隊列是否持久化
             * @param exclusive 是否排他,即是否為私有的,如果為true,會對目前隊列加鎖,其它通道不能通路,
             *                  并且在連接配接關閉時會自動删除,不受持久化和自動删除的屬性控制。
             *                  一般在隊列和交換器綁定時使用
             * @param autoDelete 是否自動删除,當最後一個消費者斷開連接配接之後是否自動删除
             * @param arguments 隊列參數,設定隊列的有效期、消息最大長度、隊列中所有消息的生命周期等等
             */
            channel.queueDeclare("queue1", false, false, false, null);

            // 6、定義收到消息後的回調
            DeliverCallback callback = new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
                }
            };
            // 7、監聽隊列
            channel.basicConsume("queue1", true, callback, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                }
            });

            System.out.println("開始接收消息");
            System.in.read();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 8、關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 9、關閉連接配接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
           

此示例的生産者和消費者都隻是通過隊列來發送和接收消息,并未用到Exchange,

原理:手動通過channel.queueDeclare建立隊列時,背景會自動将這個隊列綁定到一個名稱為空的 Direct Exchange 上,綁定 RoutingKey 與隊列名稱相同。

是以在調用channel.basicPublish方法時,exchange傳的是空字元串,而routingKey傳的是隊列名。

有了這個預設的交換機和綁定,使我們隻關心隊列這一層即可,這個比較适合做一些簡單的應用。

在RabbitMQ的管理界面的Exchange中可以看到這個預設交換器:(AMQP default)

七、AMQP協定

1、簡介

AMQP(Advanced Message Queuing Protocol),是一個提供統一消息服務的标準進階消息隊列協定,是應用層協定的一個開放标準,為面向消息的中間件而設計。

2、AMQP結構

Rabbitmq基本概念與使用

3、AMQP生産者流轉過程

Rabbitmq基本概念與使用

4、AMQP消費者流轉過程

Rabbitmq基本概念與使用

八、rabbitmq核心概念

1、Producer

生産者,就是投遞消息的一方,生産者建立消息,然後釋出到rabbitmq中。

消息一般可以包含兩個部分:消息體和附件資訊。

(1)消息體(payload)

在實際應用中,消息體一般是一個帶有業務邏輯結構的資料

eg:一個JSON字元串,當然可以進一步對這個消息體進行序列化操作

(2)附加資訊

用來表述這條消息

eg:目标交換器的名稱、路由鍵、一些自定義屬性

2、Broker

消息中間件的服務節點

對于rabbitmq來說,一個rabbitmq Broker可以簡單地看作一個rabbitmq服務節點,或者rabbitmq服務執行個體。

也可以将一個rabbitmq Broker看作一台rabbitmq伺服器。

3、Virtual Host

虛拟主機,表示一批交換器、消息隊列和相關對象

虛拟主機是共享相同的身份認證和加密環境的獨立伺服器域

每個vhost本質上就是一個mini版的rabbitmq伺服器,擁有自己的隊列、交換器、綁定和權限機制

vhost是AMQP概念的基礎,必須在連接配接時指定,rabbitmq預設的vhost是/

4、Channel

頻道或信道,是建立在connection連接配接之上的一種輕量級的連接配接

大部分的操作是在Channel這個接口中完成的,包括定義隊列的聲明queueDeclare、交換機的聲明exchangeDeclare、隊列的綁定queueBind、釋出消息basicPublish、消費消息basicConsume等

如果把Connection比作一條光纖電纜的話,那麼Channel信道就比作成光纖電纜中的其中一束光纖

一個Connection上可以建立任意數量的Channel

5、RoutingKey

路由鍵:生産者将消息發給交換器的時候,一般會指定一個RoutingKey,用來指定這個消息的路由規則。

RoutingKey需要與交換器類型和綁定鍵(BindingKey)聯合使用

在交換器類型和綁定鍵(BindingKey)固定的情況下,生産者可以在發送消息給交換器時,通過指定RoutingKey來決定消息流向哪裡。

6、Exchange

交換器,生産者将消息發送到Exchange(交換器,通常也可以用大寫的“X”來表示),由交換器将消息路由到一個或者多個隊列中。如果路由不到,或傳回給生産者,或直接丢棄。

(1)預設的exchange:

根據消息指定的queue發送到指定的隊列

(2)Fanout:扇型交換機

它會把所有發送到該交換器的消息路由到所有與該交換器綁定的隊列中(廣播)

(3)Direct:直連交換機

它會把消息路由到那些BindingKey和RoutingKey完全比對的隊列中

(4)Topic:主題交換機

與direct類似,但它可以通過通配符進行模糊比對

【.】表示單詞的拆分方式 
【*】表示一個單詞的模糊比對【隻能是一個單詞,多個單詞就比對不上】
           如:路由鍵是*.apple.big則表示第一個單詞可以是任意的,隻要後邊單詞完全比對,就可以
【#】  表示完全模糊比對。
           如:路由鍵是#.little,那麼發送消息的路由鍵可以是green,apple,little,也就是說前面的單詞是任意的
           

(5)headers:頭交換機

不依賴于路由鍵的比對規則,在綁定queue與exchange時指定一組鍵值對,隻要消息頭中含有該“鍵值對”【鍵、值都要比對上】,就路由到對應的隊列

**`direct、topic相對來說,要靈活一些,headers性能很差,也不實用 `**
           

7、Queue

隊列,是rabbitmq的内部對象,用于存儲消息
           

8、Binding

綁定,rabbitmq中通過綁定将交換器與隊列關聯起來,在綁定的時候一般會指定一個綁定鍵(BindingKey),
這樣rabbitmq就知道如何正确地将消息路由到隊列了
           

9、Consumer

消費者,就是接收消息的一方

消費者連接配接到rabbitmq伺服器,并訂閱到隊列上

當消費者消費一條消息時,隻是消費消息的消息體。在消息路由的過程中,消息的标簽會丢棄,存入到隊列中的消息隻有消息體,
消費者也隻會消費到消息體,也就不知道消息的生産者是誰,當然消費者也不需要知道
           

九、運轉流程

1、整個消息的運轉流程

Rabbitmq基本概念與使用

2、RabbitMQ的運轉流程

Rabbitmq基本概念與使用

生産者發送消息的過程:

1.生産者連接配接到RabbitMQ Broker,履歷一個連接配接(Connection),開啟一個信道(Channel)
2.生産者聲明一個交換器,并設定相關屬性,比如交換機類型,是否持久化等
3.生産者聲明一個隊列并設定相關屬性,比如是否排他,是否持久化,是否自動删除等