大家好,又見面了,我是你們的朋友全棧君。
轉載了好幾篇關于mq的博文,但是總感覺對mq的了解使用都不到位。這裡打算從原理到使用都從頭來一遍。
1,原理
1.1通過類比了解mq
可以了解它是一個秘書,或是助手,你是老闆,你告訴秘書說你要開會,那麼秘書就會把開會的時間,地點,人員都安排好。你就省去了這些瑣事,這有點類似于sping的面向切面。
當添加一個商品時,商品服務隻需要告訴消息中間件MQ,MQ便去通知其它服務做各自該做的事情,比如通知搜尋服務去同步索引庫,通知redis服務去同步緩存,通知生成靜态頁面等等。
1.2常見的mq種類
mq的也被叫做中間件,種類有ActiveMQ,RabbitMQ,Kafka等,功能都差不多,這裡我們學習ActiveMQ.
1,3什麼是ActiveMQ?
它是Apache出品,最流行的,能力最強勁的開源消息總線。它完全支援JMS1.1和J2EE1.4規範的JMS Provider實作。
再者mq也可稱為分布式消息隊列,因為在mq的訂閱式中有多個消費者異步處理多個請求,這就已經達到了分布式處理的目的。
1.4特點
(1)多種語言和協定編寫用戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協定: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
(2) 完全支援JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)
(3) 對Spring的支援,ActiveMQ可以很容易内嵌到使用Spring的系統裡面去,而且也支援Spring2.0的特性
(4)通過了常見J2EE伺服器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何相容J2EE 1.4 商業伺服器上
(5) 支援多種傳送協定:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
(6) 支援通過JDBC和journal提供高速的消息持久化
(7) 從設計上保證了高性能的叢集,用戶端-伺服器,點對點
(8)支援Ajax
(9)支援與Axis的整合
(10) 可以很容易得調用内嵌JMS provider,進行測試
ActiveMQ的消息形式
對于消息的傳遞有兩種類型:
一種是點對點的,即一個生産者和一個消費者一一對應;
另一種是釋出/訂閱模式,即一個生産者産生消息并進行發送後,可以由多個消費者進行接收。
JMS定義了五種不同的消息正文格式,以及調用的消息類型,允許你發送并接收以一些不同形式的資料,提供現有消息格式的一些級别的相容性。我們用的最多的也就是TextMessage而已。
· StreamMessage — Java原始值的資料流
· MapMessage–一套名稱-值對
· TextMessage–一個字元串對象
· ObjectMessage–一個序列化的 Java對象
· BytesMessage–一個位元組的資料流
我們可以通過下面一張圖來加深了解,圖上半部分是”釋出/訂閱者”模式,兩個釋出者各自釋出了一條消息,每條消息都可以被多個Consumer接收到。圖下半部分是”面對面”模式,兩個釋出者各自釋出了一條消息,壓入隊列當中,隊列的特點是先進先出,一旦有某個消費者拿走了一條消息,隊列中就少了一條消息,剩下的消費者就不可能再消費那條消息了,是以也就做到了一對一。
二 安裝ActiveMQ
我這裡把mq安裝在虛拟機上,當然虛拟機要能上網,還有jdk啥的這裡就不說了。下載下傳就到apache官網下載下傳,位址:
http://activemq.apache.org/activemq-5112-release.html
找到如下圖位置,點選下載下傳:
下載下傳好後上傳,解壓,
完了後我們到active目錄下看到如下内容:
我們可以看到有一個名為activemq-all-5.12.0.jar的jar包,這個jar包,如果不與spring結合,隻是簡單用來當做activemq用戶端的話,可以使用。如果要将activemq與spring整合的話,不要使用這個jar包,因為這個jar包當中包含了spring的包結構,而且裡面的類與spring裡面的類名稱是一樣的,但是方法不全,當我們将spring和activemq結合的時候,如果系統使用的是activemq的jar包當中的spring的類的話就會報錯,啟動都啟動不了,而且錯誤還隐藏的特别深,難以捉摸其原因。是以整合的話,不要用這個jar包!!!activemq有一個版本5.11.2,裡面沒有spring的包結構,我們可以使用。
我們看下bin目錄下的檔案清單,如下圖所示,其中activemq檔案是用來啟動activemq的。
conf目錄存放的是一些配置檔案,我們不用動,data目錄存放的是服務端的緩存資料
webapps提供了管理的背景,如下所示。
3,不用做改動,直接啟動mq
xiaoye@ubuntu3:~$ ./activemq/bin/activemq start
INFO: Loading ‘/home/xiaoye/activemq/bin/env’
INFO: Using java ‘/usr/lib/jvm/java-1.8.0-openjdk-amd64/bin/java’
INFO: Starting – inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : ‘/home/xiaoye/activemq/data/activemq.pid’ (pid ‘1699’)
這樣就啟動成功了;
web通路一下,預設端口是8161
上面這個界面是沒有使用者登入的界面。下面我們用admin登入,預設的賬号和密碼都是admin
http://192.168.72.133:8161/admin/
打開這個連結會彈出輸入賬号密碼的框,填進去就行了。
點選Quenes如下,這個是點對點消息發送界面
再點選topic是釋出/訂閱模式界面
在Send中可以測試發送點對點或釋出/訂閱兩種消息,如下圖所示。
三,代碼測試ActiveMQ
下面我們要寫java代碼測試了。
建立一個maven工程。打開eclipse ,右鍵建立maven project –》
finish
修改pom.xml添加maven依賴,依賴我們到apeche,maven官網去找。可直接百度關鍵詞,active maven,圖下:
點開找到,我們下載下傳active版本
把這個maven依賴拷過來:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.2</version>
</dependency>
複制
這樣maven就能夠自動幫我們下載下傳active的jar包了。
下面建立一個junit測試類:
finish即可
在類中加入一下内容做簡單測試,報錯,我們
如下,滑鼠防止@Test上,給提示導入Junit包,導入後,就沒問題了。右鍵運作也是OK的。
下面我們就來寫測試類,先來測試queue點對點的消息發送方式:
package com.xiaoyexinxin.activeMQTest;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import junit.framework.TestCase;
/**
*
* @author liuxin
* @date 2018年4月10日
*/
public class TestActiveMq extends TestCase {
@Test
public void testQueueProducer() throws JMSException{
//建立一個連結工廠connectionFactory對象,需要指定mq服務ip和端口,注意brokerURL的開頭是
//tcp://而不是我們通常的http://,端口是61616而不是我們通路activemq背景管理頁面所使用的8161
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
//使用connectionFactory 連結一個connection對象
Connection connection=connectionFactory.createConnection();
//開啟連結,調用connection 對象的start方法
connection.start();
//使用connection 建立一個session對象
//第一個參數是是否開啟事務,一般不使用分布式事務,因為它特别消耗性能,而且顧客體驗特别差,現在網際網路的
//做法是保證資料的最終一緻(也就是允許暫時資料不一緻),比如顧客下單購買東西,一旦訂單生成完就立刻響應給使用者
//下單成功。至于下單後一系列的操作,比如通知會計記賬、通知物流發貨、商品數量同步等等都先不用管,隻需要
//發送一條消息到消息隊列,消息隊列來告知各子產品進行相應的操作,一次告知不行就兩次,直到完成所有相關操作為止,這
//也就做到了資料的最終一緻性。如果第一個參數為true,那麼第二個參數将會被忽略掉。如果第一個參數為false,那麼
//第二個參數為消息的應答模式,常見的有手動和自動兩種模式,我們一般使用自動模式。
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用session對象建立一個Destination(目标)對象,兩站形式,queue,topic兩種,這裡我們使用queue
//參數就是消息隊列的名稱
Queue queue=session.createQueue("test-queue");
//建立生成者,
MessageProducer producer=session.createProducer(queue);
//建立消息内容
//有兩種方式,第一種方式:
// TextMessage textMessage = new ActiveMQTextMessage();
// textMessage.setText("hello,activemq!!!");
//第二種方式:
TextMessage textMessage = session.createTextMessage("hello,activemq!!");
//發送消息
producer.send(textMessage);
//關閉資源,由内而外的關閉
producer.close();
session.close();
connection.close();
}
}
複制
右鍵運作程式,成功後,在ActiveMQ的背景管理系統,點選”Queues”,可以看到我們剛才發送的那條消息”test-queue”。我們點選”test-queue”
點選test-queue,如下:Persistence為永久儲存,priority優先級是4 ,Redelivered是否重複投遞消息,這裡是否,
接着點選,長串的id
儲存了,打開日志看看是啥錯;
xiaoye@ubuntu3:~/activemq cd data xiaoye@ubuntu3:~/activemq/data ls activemq.log activemq.pid audit.log kahadb
xiaoye@ubuntu3:~/activemq/data$ tail -200 activemq.log
看到日志有下圖的錯:
這個是activemq不支援jdk1.8造成的,這裡我把虛拟機的jdk換成1.7的試試
官網下載下傳後上傳,我這裡用的是谷歌浏覽器下載下傳,谷歌浏覽器下載下傳的jdk不知道為何,應該是tar.gz結尾的jdk,卻隻有gz.于是百度一圈,直接修改壓縮包字尾,改為.tar結尾解壓
xiaoye@ubuntu3:~/activemq/data cd ~ xiaoye@ubuntu3:~ cd Downloads/ xiaoye@ubuntu3:~/Downloads
xiaoye@ubuntu3:~/Downloads$ ls
apache-activemq-5.11.2-bin.tar.gz apache-activemq-5.15.3-bin.tar.gz hbase-1.0.0-cdh5.5.1.tar.gz jdk-7u80-linux-x64.tar.gz
apache-activemq-5.12.0-bin.tar.gz hadoop-2.5.0-cdh5.2.0.tar.gz hive-0.13.1-cdh5.2.0.tar.gz sqoop-1.4.6-cdh5.5.4.tar.gz
解壓到目前目錄。
xiaoye@ubuntu3:~ cd Downloads/ xiaoye@ubuntu3:~/Downloads ls apache-activemq-5.11.2-bin.tar.gz hive-0.13.1-cdh5.2.0.tar.gz apache-activemq-5.12.0-bin.tar.gz jdk1.7.0_79 apache-activemq-5.15.3-bin.tar.gz jdk-7u79-linux-x64.tar.gz hadoop-2.5.0-cdh5.2.0.tar.gz sqoop-1.4.6-cdh5.5.4.tar.gz
hbase-1.0.0-cdh5.5.1.tar.gz
下面設定環境變量。
切換到root使用者。
root@ubuntu3:~# vim /etc/profile
export JAVA_HOME=/home/xiaoye/Downloads/jdk1.7.0_79
修改儲存後,soruce /etc/profile
這個改完後,在修改目前使用者下的環境變量。
xiaoye@ubuntu3:~$ vim .bashrc
export JAVA_HOME=/home/xiaoye/Downloads/jdk1.7.0_79
export CLASSPATH=${JAVA_HOME}/lib
export PATH={JAVA_HOME}/bin:PATH
修改儲存後,source .bashrc
啟動activemq
xiaoye@ubuntu3:~$ ./activemq/bin/activemq start
INFO: Loading ‘/home/xiaoye/activemq/bin/env’
INFO: Using java ‘/home/xiaoye/Downloads/jdk1.7.0_79/bin/java’
INFO: Process with pid ‘1454’ is already running
顯示還在運作,那就kill -9 1454
再次重新開機,就好了。
這樣在點選ID的時候就不會報錯了。
四,消費者
下面我們寫消費者方法,寫在同生産者一個類裡面;
内容如下:
/*
* 消費者
*/
@Test
public void testQueueConsumer() throws Exception{
//跟建立生産者一樣,先連接配接mq
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
//連接配接一個connection 對象
Connection connection=connectionFactory.createConnection();
//開啟連結
connection.start();
//建立一個session會話對象
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用session對象建立一個Destination對象,兩種形式queue,topic ,這裡我們使用queue
//參數是消息隊列的名稱
Queue queue=session.createQueue("test-queue");
//使用session建立一個consumer對象
MessageConsumer consumer=session.createConsumer(queue);
//向Consumer對象中設定一個Messagelistener對象,用來接受消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
// TODO Auto-generated method stub
if(arg0 instanceof TextMessage){
TextMessage textMessage=(TextMessage) arg0;
String text;
try {
text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
//程式等待接收使用者結束操作
//程式自己并不知道什麼時候有消息,也不知道什麼時候不再發送消息了,這就需要手動幹預,
//當我們想停止接收消息時,可以在控制台輸入任意鍵,然後回車即可結束接收操作(也可以直接按回車)。
System.in.read();
//關閉資源
consumer.close();
session.close();
connection.close();
}
複制
右鍵方法運作消費者方法,會在控制台看到生産者發送的hello world消息,如下:
這裡執行消費者方法後并沒有停止運作,還在等待新新的消息進來,那麼我們右鍵生産者方法再次運作會發現有兩個hello輸出。
我們修改一下生産者内容,再次運作。
發現有一個語句輸出,說明沒有問題。
我們到activeMQ背景管理頁面看看
說一下這幾個标簽的含義,number of pending messages 待發送消息數
Number Of Consumers 消費者消息數
Messages Enqueued 壓入隊列的消息數量
Messages Dequeued 出隊列的消息數量,也就是被消費的消息數
五,topic 釋出/訂閱模式
生産者代碼:
/*
* 訂閱模式的生産者
*/
@Test
public void testTopicProducer() throws JMSException{
//建立工程連接配接mq
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
//使用工程建立一個連接配接對象
Connection connection=connectionFactory.createConnection();
//開啟連結
connection.start();
//使用連結建立一個會話
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用會話建立一個目标對象
Topic topic=session.createTopic("test-topic");
//建立一個生産者
MessageProducer producer=session.createProducer(topic);
//7.建立一個TextMessage對象 ,并寫入要傳輸的消息内容
//有兩種方式,第一種方式:
// TextMessage textMessage = new ActiveMQTextMessage();
// textMessage.setText("hello,activemq!!!");
//第二種方式:
TextMessage textMessage=session.createTextMessage("hello topic!");
//發送消息
producer.send(textMessage);
//關閉資源
producer.close();
session.close();
connection.close();
}
複制
運作上面的測試方法,運作成功後,我們通路activemq的管理背景頁面,點選”Topics”,可以看到有”test-topic”這一行,壓入消息隊列一條消息,但由于沒有消費者,是以沒有消費掉該消息。
點開test-topic發現:消息體裡并沒有我們發送的内容。
而queue就不同,queue有持久化一欄,發送的消息會被儲存下來。這樣的話,就會有個問題,那就是如果發送topic消息時沒有消費者,那麼這條消息便不存在了,不會再被消費了。是以我們要想消息不會被遺失掉,我們要先打開消費者,然後再發送topic消息。
我們來寫消費topic消息的方法,如下圖所示,該方法與我們上面學習的消費隊列消息的方法不同的是建立Destination的時候不一樣,同時為了模拟多個消費者,在該方法中添加一條輸出資訊,标明該方法是第幾個消費者。
消費者代碼:
/*
* 訂閱模式的消費者
*/
@Test
public void testTopicConsumer() throws JMSException{
//建立工廠,連結mq
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
//使用工程建立一個連結
Connection connection=connectionFactory.createConnection();
//打開連結
connection.start();
//使用連結建立一個會話
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立消息隊列
Topic topic=session.createTopic("test-topic");
//消費者
MessageConsumer consumer=session.createConsumer(topic);
//消費者設定監聽器,監聽傳來的消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
// TODO Auto-generated method stub
if(arg0 instanceof TextMessage){
TextMessage textMessage=(TextMessage) arg0;
try {
String text=textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
//程式等待接收使用者結束操作
//程式自己并不知道什麼時候有消息,也不知道什麼時候不再發送消息了,這就需要手動幹預,
//當我們想停止接收消息時,可以在控制台輸入任意鍵,然後回車即可結束接收操作(也可以直接按回車)。
System.out.println("topic消費者1111。。。。。");
try {
System.in.read();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//9.關閉資源
consumer.close();
session.close();
connection.close();
}
複制
右鍵運作消費者方法:
修改為2222.。。。。。
再次運作消費者方法。
修改為33333.。。。。
再次運作消費者方法
發現這裡消費者的數量是3 了
啟動了三個消費者後,我們再發送一次topic消息,發完之後,我們看各個控制台的資訊。如下圖所示。可以看到都列印出了我們發送的topic資訊。
三個程序控制台都有列印生産者消息。
六,topic消息持久化
topic消息沒有持久化,也就意味着,如果消息發送者發送消息的時候,如果消費者沒有運作的話,它将無法消費這個消息了(即使它啟動也無法再接收到那條topic消息了),這樣問題就來了,如果那條消息非常重要呢?我們不能容忍接收不到消息的情況。
生産者代碼:
/*
* 訂閱釋出式 可持久化生産者
*/
@Test
public void TestTopicPersistenceProducer() throws JMSException{
//建立工程連接配接mq
ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
//設定異步發送消息可顯著提高發送性能
connectionFactory.setUseAsyncSend(true);
//使用工程建立一個連接配接對象
Connection connection=connectionFactory.createConnection();
//對每個生産者來說其clientID值必須唯一
connection.setClientID("producerTopic2");
//開啟連結
connection.start();
//使用連結建立一個會話
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用會話建立一個目标對象
Topic topic=session.createTopic("test-topic");
//建立一個生産者
MessageProducer producer=session.createProducer(topic);
//DelieveryMode設定為PERSISTENCE(持久化)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//建立一個TextMessage對象 ,并寫入要傳輸的消息内容
//有兩種方式,第一種方式:
// TextMessage textMessage = new ActiveMQTextMessage();
// textMessage.setText("hello,activemq!!!");
//第二種方式:
TextMessage textMessage=session.createTextMessage("hello topic!persistence2");
//發送消息
producer.send(textMessage);
//關閉資源
producer.close();
session.close();
connection.close();
}
複制
消費者代碼:
/*
* 訂閱釋出式 可持久化消費者
*/
@Test
public void TestTopicPersistenceConsumer() throws JMSException{
//建立工廠,連結mq
ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
//設定異步接受消息,可提高接受性能
connectionFactory.setUseAsyncSend(true);
//使用工程建立一個連結
Connection connection=connectionFactory.createConnection();
//設定每個消費者id,每個都要不同
connection.setClientID("consumer1");
//打開連結
connection.start();
//使用連結建立一個會話
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立消息隊列
Topic topic=session.createTopic("test-topic");
//消費者
MessageConsumer consumer=session.createDurableSubscriber(topic, "consumer1");
//消費者設定監聽器,監聽傳來的消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
// TODO Auto-generated method stub
if(arg0 instanceof TextMessage){
TextMessage textMessage=(TextMessage) arg0;
try {
String text=textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
//程式等待接收使用者結束操作
//程式自己并不知道什麼時候有消息,也不知道什麼時候不再發送消息了,這就需要手動幹預,
//當我們想停止接收消息時,可以在控制台輸入任意鍵,然後回車即可結束接收操作(也可以直接按回車)。
System.out.println("topic消費者3333。。。。。");
try {
System.in.read();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//9.關閉資源
consumer.close();
session.close();
connection.close();
}
複制
我們還需要配置下activemq的activemq.xml檔案,隻需要添加一句配置,就是在<broker的末尾添加一句關于持久化的配置persistent=”true”即可。如下:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="true">
複制
然後重新啟動mq;
這樣設定持久化了就無所謂哪個先啟動了。
釋出者:全棧程式員棧長,轉載請注明出處:https://javaforall.cn/106159.html原文連結:https://javaforall.cn