天天看點

java連接配接rabbitmq_沒用過消息隊列?一文帶你體驗RabbitMQ收發消息楔子1. 消息隊列?2. RabbitMQ一覽3. RabbitMQ環境4. ✍Hello World4.1 生産者4.2 消費者5. 消息接收确認(ACK)後記

楔子

先給大家說聲抱歉,最近一周都沒有發文,有一些比較要緊重要的事需要處理。

今天正好得空,本來說準備寫SpringIOC相關的東西,但是發現想要梳理一遍,還是需要很多時間,是以我打算慢慢寫,先把MQ給寫了,再慢慢寫其他相關的,畢竟偏理論的東西一遍要比較難寫,像MQ這種偏實戰的大家可以clone代碼去玩一玩,還是比較友善的。

同時MQ也是Java進階不必可少的技術棧之一,是以Java開發從業者對它是必須要了解的。

現在市面上有三種消息隊列比較火分别是:RabbitMQ,RocketMQ和Kafka。

今天要講的消息隊列中我會以RabbitMQ作為案例來入門,因為SpringBoot的amqp中預設隻內建了RabbitMQ,用它來講會友善許多,且RabbitMQ的性能和穩定性都很不錯,是一款經過時間考驗的開源元件。

祝有好收獲。

本文代碼: 碼雲位址 GitHub位址

1. 消息隊列?

消息隊列(MQ)全稱為Message Queue,是一種應用程式對應用程式的通信方法。

翻譯一下就是:在應用之間放一個消息元件,然後應用雙方通過這個消息元件進行通信。

好端端的為啥要在中間放個元件呢?

小系統其實是用不到消息隊列的,一般分布式系統才會引入消息隊列,因為分布式系統需要抗住高并發,需要多系統解耦,更需要對使用者比較友好的響應速度,而消息隊列的特性可以天然解耦,友善異步更能起到一個頂住高并發的削峰作用,完美解決上面的三個問題。

然萬物抱陽負陰,系統之間突然加了個中間件,提高系統複雜度的同時也增加了很多問題:

  • 消息丢失怎麼辦?
  • 消息重複消費怎麼辦?
  • 某些任務需要消息的順序消息,順序消費怎麼保證?
  • 消息隊列元件的可用性如何保證?

這些都是使用消息隊列過程中需要思考需要考慮的地方,消息隊列能給你帶來很大的便利,也能給你帶來一些對應的麻煩。

上面說了消息隊列帶來的好處以及問題,而這些不在我們今天這篇的讨論範圍之内,我打算之後再寫這些,我們今天要做的是搭建出一個消息隊列環境,讓大家感受一下基礎的發消息與消費消息,更進階的問題會放在以後讨論。

2. RabbitMQ一覽

RabbitMQ是一個消息元件,是一個erlang開發的AMQP(Advanced Message Queue)的開源實作。

AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層标準進階消息隊列協定,是應用層協定的一個開放标準,為面向消息的中間件設計。

RabbitMQ采用了AMQP協定,至于這協定怎麼怎麼樣,我們關心的是RabbitMQ結構如何且怎麼用。

還是那句話,學東西需要先觀其大貌,我們要用RabbitMQ首先要知道它整體是怎麼樣,這樣才有利于我們接下來的學習。

我們先來看看我剛畫的架構圖,因為RabbitMQ實作了AMQP協定,是以這些概念也是AMQP中共有的。

java連接配接rabbitmq_沒用過消息隊列?一文帶你體驗RabbitMQ收發消息楔子1. 消息隊列?2. RabbitMQ一覽3. RabbitMQ環境4. ✍Hello World4.1 生産者4.2 消費者5. 消息接收确認(ACK)後記
  • Broker: 中間件本身。接收和分發消息的應用,這裡指的就是RabbitMQ Server。
  • Virtual host: 虛拟主機。出于多租戶和安全因素設計的,把AMQP的基本元件劃分到一個虛拟的分組中,類似于網絡中的namespace概念。當多個不同的使用者使用同一個RabbitMQ server提供的服務時,可以劃分出多個vhost,每個使用者在自己的vhost建立exchange/queue等。
  • Connection: 連接配接。publisher/consumer和broker之間的TCP連接配接。斷開連接配接的操作隻會在client端進行,Broker不會斷開連接配接,除非出現網絡故障或broker服務出現問題。
  • Channel: 管道。如果每一次通路RabbitMQ都建立一個Connection,在消息量大的時候建立TCP Connection的開銷會比較大且效率也較低。Channel是在connection内部建立的邏輯連接配接,如果應用程式支援多線程,通常每個thread建立單獨的channel進行通訊,AMQP method包含了channel id幫助用戶端和message broker識别channel,是以channel之間是完全隔離的。Channel作為輕量級的Connection極大減少了作業系統建立TCP connection的開銷。
  • Exchange: 路由。根據分發規則,比對查詢表中的routing key,分發消息到queue中去。
  • Queue: 消息的隊列。消息最終被送到這裡等待消費,一個message可以被同時拷貝到多個queue中。
  • Binding: 綁定。exchange和queue之間的虛拟連接配接,binding中可以包含routing key。Binding資訊被儲存到exchange中的查詢表中,用于message的分發依據。

看完了這些概念,我再給大家梳理一遍其流程:

當我們的生産者端往Broker(RabbitMQ)中發送了一條消息,Broker會根據其消息的辨別送往不同的Virtual host,然後Exchange會根據消息的路由key和交換器類型将消息分發到自己所屬的Queue中去。

然後消費者端會通過Connection中的Channel擷取剛剛推送的消息,拉取消息進行消費。

Tip:某個Exchange有哪些屬于自己的Queue,是由Binding綁定關系決定的。

3. RabbitMQ環境

上面講了RabbitMQ大概的結構圖和一個消息的運作流程,講完了理論,這裡我們就準備實操一下吧,先進行RabbitMQ安裝。

官網下載下傳位址:www.rabbitmq.com/download.ht…

由于我還沒有屬于自己MAC電腦,是以這裡的示範就按照Windows的來了,不過大家都是程式員,安裝個東西總歸是難不倒大家的吧

Windows下載下傳位址:www.rabbitmq.com/install-win…

進去之後可以直接找到Direct Downloads,下載下傳相關EXE程式進行安裝就可以了。

由于RabbitMQ是由erlang語言編寫的,是以安裝之前我們還需要安裝erlang環境,你下載下傳RabbitMQ之後直接點選安裝,如果沒有相關環境,安裝程式會提示你,然後會讓你的浏覽器打開erlang的下載下傳頁面,在這個頁面上根據自己的系統類型點選下載下傳安裝即可,安裝完畢後再去安裝RabbitMQ。

這兩者的安裝都隻需要一直NEXT下一步就可以了。

安裝完成之後可以按一下Windows鍵看到效果如下:

java連接配接rabbitmq_沒用過消息隊列?一文帶你體驗RabbitMQ收發消息楔子1. 消息隊列?2. RabbitMQ一覽3. RabbitMQ環境4. ✍Hello World4.1 生産者4.2 消費者5. 消息接收确認(ACK)後記

Tip:其中Rabbit-Command後面會用到,是RabbitMQ的指令行操作台。

安裝完RabbitMQ我們需要對我們的開發環境也導入RabbitMQ相關的JAR包。

為了友善起見,我們可以直接使用Spring-boot-start的方式導入,這裡面也會包含所有我們需要用到的RabbitMQ相關的JAR包。

org.springframework.boot            spring-boot-starter-amqp        
           

直接引入spring-boot-starter-amqp即可。

4. ✍Hello World

搭建好環境之後,我們就可以上手了。

考慮到這是一個入門文章,讀者很多可能沒有接觸過RabbitMQ,直接使用自動配置的方式可能會令大家很迷惑,因為自動配置會屏蔽很多細節,導緻大家隻看到了被封裝後的樣子,不利于大家了解。

是以在本節Hello World這裡,我會直接使用最原始的連接配接方式進行示範,讓大家看到最原始的連接配接的樣子。

Tip:這種方式示範的代碼我都在放在prototype包下面。

4.1 生産者

先來看看生産者代碼,也就是我們push消息的代碼:

public static final String QUEUE_NAME = "erduo";    // 建立連接配接工廠    ConnectionFactory connectionFactory = new ConnectionFactory();    // 連接配接到本地server    connectionFactory.setHost("127.0.0.1");    // 通過連接配接工廠建立連接配接    Connection connection = connectionFactory.newConnection();    // 通過連接配接建立通道    Channel channel = connection.createChannel();    // 建立一個名為耳朵的隊列,該隊列非持久(RabbitMQ重新開機後會消失)、非獨占(非僅用于此連結)、非自動删除(伺服器将不再使用的隊列删除)    channel.queueDeclare(QUEUE_NAME, false, false, false, null);    String msg = "hello, 我是耳朵。" + LocalDateTime.now().toString();    // 釋出消息    // 四個參數為:指定路由器,指定key,指定參數,和二進制資料内容    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));    System.out.println("生産者發送消息結束,發送内容為:" + msg);    channel.close();    connection.close();
           

代碼我都給了注釋,但是我還是要給大家講解一遍,梳理一下。

先通過RabbitMQ中的ConnectionFactory配置一下将要連接配接的server-host,然後建立一個新連接配接,再通過此連接配接建立通道(Channel),通過這個通道建立隊列和發送消息。

這裡看上去還是很好了解的,我需要把建立隊列和發送消息這裡再拎出來說一下。

建立隊列

AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments) throws IOException;
           

建立隊列的方法裡面有五個參數,第一個是參數是隊列的名稱,往後的三個參數代表不同的配置,最後一個參數是額外參數。

  • durable:代表是否将此隊列持久化。
  • exclusive:代表是否獨占,如果設定為獨占隊列則此隊列僅對首次聲明它的連接配接可見,并在連接配接斷開時自動删除。
  • autoDelete:代表斷開連接配接後是否自動删除此隊列。
  • arguments:代表其他額外參數。

這些參數中durable經常會用到,它代表了我們可以對隊列做持久化,以保證RabbitMQ當機恢複後此隊列也可以自行恢複。

發送消息

void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;
           

發送消息的方法裡是四個參數,第一個是必須的指定exchange,上面的示例代碼中我們傳入了一個空字元串,這代表我們交由預設的匿名exchange去幫我們路由消息。

第二個參數是路由key,exchange會根據此key對消息進行路由轉發,第三個參數是額外參數,講消息持久化時會用到一下,最後一個參數就是我們要發送的資料了,需要将我們的資料轉成位元組數組的方式傳入。

測試

講完了這些API之後,我們可以測試一下我們的代碼了,run一下之後,會在控制台打出如下:

java連接配接rabbitmq_沒用過消息隊列?一文帶你體驗RabbitMQ收發消息楔子1. 消息隊列?2. RabbitMQ一覽3. RabbitMQ環境4. ✍Hello World4.1 生産者4.2 消費者5. 消息接收确認(ACK)後記

這樣之後我們就把消息發送到了RabbitMQ中去,此時可以打開RabbitMQ控制台(前文安裝時提到過)去使用指令rabbitmqctl.bat list_queues去檢視消息隊列現在的情況:

java連接配接rabbitmq_沒用過消息隊列?一文帶你體驗RabbitMQ收發消息楔子1. 消息隊列?2. RabbitMQ一覽3. RabbitMQ環境4. ✍Hello World4.1 生産者4.2 消費者5. 消息接收确認(ACK)後記

可以看到有一條message在裡面,這就代表我們的消息已經發送成功了,接下來我們可以編寫一個消費者對裡面的message進行消費了。

4.2 消費者

消費者代碼和生産者的差不多,都需要建立連接配接建立通道:

// 建立連接配接工廠    ConnectionFactory connectionFactory = new ConnectionFactory();    // 連接配接到本地server    connectionFactory.setHost("127.0.0.1");    // 通過連接配接工廠建立連接配接    Connection connection = connectionFactory.newConnection();    // 通過連接配接建立通道    Channel channel = connection.createChannel();    // 建立消費者,阻塞接收消息    com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {        @Override        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {            System.out.println("-------------------------------------------");            System.out.println("consumerTag : " + consumerTag);            System.out.println("exchangeName : " + envelope.getExchange());            System.out.println("routingKey : " + envelope.getRoutingKey());            String msg = new String(body, StandardCharsets.UTF_8);            System.out.println("消息内容 : " + msg);        }    };    // 啟動消費者消費指定隊列    channel.basicConsume(Producer.QUEUE_NAME, consumer);//        channel.close();//        connection.close();
           

建立完通道之後,我們需要建立一個消費者對象,然後用這個消費者對象去消費指定隊列中的消息。

這個示例中我們就是建立了一個consumer,然後用它去消費隊列-erduo中的消息。

最後兩句代碼我給注釋掉了,因為一旦把連接配接也關閉了,那我們的消費者就不能保持消費狀态了,是以要開着連接配接,監聽此隊列。

ok,運作這段程式,然後我們的消費者會去隊列-erduo拿到裡面的消息,效果如下:

java連接配接rabbitmq_沒用過消息隊列?一文帶你體驗RabbitMQ收發消息楔子1. 消息隊列?2. RabbitMQ一覽3. RabbitMQ環境4. ✍Hello World4.1 生産者4.2 消費者5. 消息接收确認(ACK)後記
  • consumerTag:是這個消息的辨別。
  • exchangeName:是這個消息所發送exchange的名字,我們先前傳入的是空字元串,是以這裡也是空字元串。
  • exchangeName:是這個消息所發送路由key。

這樣我們的程式就處在一個監聽的狀态下,你再次調用生産者發送消息消費者就會實時的在控制上列印消息内容。

5. 消息接收确認(ACK)

上面我們示範了生産者和消費者,我們生産者發送一條消息,消費者消費一條資訊,這個時候我們的RabbitMQ應該有多少消息?

理論上來說發送一條,消費一條,現在裡面應該是0才對,但是現在的情況并不是:

java連接配接rabbitmq_沒用過消息隊列?一文帶你體驗RabbitMQ收發消息楔子1. 消息隊列?2. RabbitMQ一覽3. RabbitMQ環境4. ✍Hello World4.1 生産者4.2 消費者5. 消息接收确認(ACK)後記

消息隊列裡面還是有1條資訊,我們重新開機一下消費者,又列印了一遍我們消費過的那條消息,通過消息上面的時間我們可以看出來還是當時我們發送的那條資訊,也就是說我們消費者消費過了之後這條資訊并沒有被删除。

java連接配接rabbitmq_沒用過消息隊列?一文帶你體驗RabbitMQ收發消息楔子1. 消息隊列?2. RabbitMQ一覽3. RabbitMQ環境4. ✍Hello World4.1 生産者4.2 消費者5. 消息接收确認(ACK)後記

這種狀況出現的原因是因為RabbitMQ消息接收确認機制,也就是說一條資訊被消費者接收到了之後,需要進行一次确認操作,這條消息才會被删除。

RabbitMQ中預設消費确認是手動的,也可以将其設定為自動删除,自動删除模式消費者接收到消息之後就會自動删除這條消息,如果消息處理過程中發生了異常,這條消息就等于沒被處理完但是也被删除掉了,是以這裡我們會一直使用手動确認模式。

消息接受确認(ACK)的代碼很簡單,隻要在原來消費者的代碼裡加上一句就可以了:

com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {        @Override        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {            System.out.println("-------------------------------------------");            System.out.println("consumerTag : " + consumerTag);            System.out.println("exchangeName : " + envelope.getExchange());            System.out.println("routingKey : " + envelope.getRoutingKey());            String msg = new String(body, StandardCharsets.UTF_8);            System.out.println("消息内容 : " + msg);            // 消息确認            channel.basicAck(envelope.getDeliveryTag(), false);            System.out.println("消息已确認");        }    };
           

我們将代碼改成如此之後,可以再run一次消費者,可以看看效果:

java連接配接rabbitmq_沒用過消息隊列?一文帶你體驗RabbitMQ收發消息楔子1. 消息隊列?2. RabbitMQ一覽3. RabbitMQ環境4. ✍Hello World4.1 生産者4.2 消費者5. 消息接收确認(ACK)後記

再來看看RabbitMQ中的隊列情況:

java連接配接rabbitmq_沒用過消息隊列?一文帶你體驗RabbitMQ收發消息楔子1. 消息隊列?2. RabbitMQ一覽3. RabbitMQ環境4. ✍Hello World4.1 生産者4.2 消費者5. 消息接收确認(ACK)後記

從圖中我們可以看出消息消費後已經成功被删除了,其實大膽猜一猜,自動删除應該是在我們的代碼還沒執行之前就幫我們傳回了确認,是以這就導緻了消息丢失的可能性。

我們采用手動确認的方式之後,可以先将邏輯處理完畢之後(可能出現異常的地方可以try-catch起來),把手動确認的代碼放到最後一行,這樣如果出現異常情況導緻這條消息沒有被确認,那麼這條消息會在之後被重新消費一遍。

後記

今天的内容就到這裡,下一篇将會我們将會撇棄傳統的手動建立連接配接的方式進行發消息收消息,而轉用Spring幫我們定義好的注解和Spring提供的RabbitTemplate,更友善的收發消息。

消息隊列呢,其實用法都是一樣的,隻是各個開源消息隊列的側重點稍有不同,我們應該根據我們自己的項目需求來決定我們應該選取什麼樣的消息隊列來為我們的項目服務,這個項目選型的工作一般都是開發組長幫你們做了,一般是輪不到我們來做的,但是面試的時候可能會考察相關知識,是以這幾種消息隊列我們都應該有所涉獵。

好了,以上就是本期的全部内容,感謝你能看到這裡,歡迎對本文點贊收藏與評論,你們的每個點贊都是我創作的最大動力。

作者:和耳朵

連結:https://juejin.im/post/6856571028496351239

來源:掘金