天天看點

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

大家好,又見面了,我是你們的朋友全棧君。

轉載了好幾篇關于mq的博文,但是總感覺對mq的了解使用都不到位。這裡打算從原理到使用都從頭來一遍。

1,原理

1.1通過類比了解mq

可以了解它是一個秘書,或是助手,你是老闆,你告訴秘書說你要開會,那麼秘書就會把開會的時間,地點,人員都安排好。你就省去了這些瑣事,這有點類似于sping的面向切面。

當添加一個商品時,商品服務隻需要告訴消息中間件MQ,MQ便去通知其它服務做各自該做的事情,比如通知搜尋服務去同步索引庫,通知redis服務去同步緩存,通知生成靜态頁面等等。

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

1.2常見的mq種類

mq的也被叫做中間件,種類有ActiveMQ,RabbitMQ,Kafka等,功能都差不多,這裡我們學習ActiveMQ.

1,3什麼是ActiveMQ?

它是Apache出品,最流行的,能力最強勁的開源消息總線。它完全支援JMS1.1和J2EE1.4規範的JMS Provider實作。

再者mq也可稱為分布式消息隊列,因為在mq的訂閱式中有多個消費者異步處理多個請求,這就已經達到了分布式處理的目的。

1.4特點

(1)多種語言和協定編寫用戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協定: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

(2) 完全支援JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)

(3) 對Spring的支援,ActiveMQ可以很容易内嵌到使用Spring的系統裡面去,而且也支援Spring2.0的特性

(4)通過了常見J2EE伺服器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何相容J2EE 1.4 商業伺服器上

(5) 支援多種傳送協定:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

(6) 支援通過JDBC和journal提供高速的消息持久化

(7) 從設計上保證了高性能的叢集,用戶端-伺服器,點對點

(8)支援Ajax

(9)支援與Axis的整合

(10) 可以很容易得調用内嵌JMS provider,進行測試

ActiveMQ的消息形式

對于消息的傳遞有兩種類型:

一種是點對點的,即一個生産者和一個消費者一一對應;

另一種是釋出/訂閱模式,即一個生産者産生消息并進行發送後,可以由多個消費者進行接收。

JMS定義了五種不同的消息正文格式,以及調用的消息類型,允許你發送并接收以一些不同形式的資料,提供現有消息格式的一些級别的相容性。我們用的最多的也就是TextMessage而已。

  · StreamMessage — Java原始值的資料流

  · MapMessage–一套名稱-值對

  · TextMessage–一個字元串對象

  · ObjectMessage–一個序列化的 Java對象

  · BytesMessage–一個位元組的資料流

我們可以通過下面一張圖來加深了解,圖上半部分是”釋出/訂閱者”模式,兩個釋出者各自釋出了一條消息,每條消息都可以被多個Consumer接收到。圖下半部分是”面對面”模式,兩個釋出者各自釋出了一條消息,壓入隊列當中,隊列的特點是先進先出,一旦有某個消費者拿走了一條消息,隊列中就少了一條消息,剩下的消費者就不可能再消費那條消息了,是以也就做到了一對一。

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

二 安裝ActiveMQ

我這裡把mq安裝在虛拟機上,當然虛拟機要能上網,還有jdk啥的這裡就不說了。下載下傳就到apache官網下載下傳,位址:

http://activemq.apache.org/activemq-5112-release.html

找到如下圖位置,點選下載下傳:

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

下載下傳好後上傳,解壓,

完了後我們到active目錄下看到如下内容:

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

我們可以看到有一個名為activemq-all-5.12.0.jar的jar包,這個jar包,如果不與spring結合,隻是簡單用來當做activemq用戶端的話,可以使用。如果要将activemq與spring整合的話,不要使用這個jar包,因為這個jar包當中包含了spring的包結構,而且裡面的類與spring裡面的類名稱是一樣的,但是方法不全,當我們将spring和activemq結合的時候,如果系統使用的是activemq的jar包當中的spring的類的話就會報錯,啟動都啟動不了,而且錯誤還隐藏的特别深,難以捉摸其原因。是以整合的話,不要用這個jar包!!!activemq有一個版本5.11.2,裡面沒有spring的包結構,我們可以使用。

我們看下bin目錄下的檔案清單,如下圖所示,其中activemq檔案是用來啟動activemq的。

conf目錄存放的是一些配置檔案,我們不用動,data目錄存放的是服務端的緩存資料

webapps提供了管理的背景,如下所示。

3,不用做改動,直接啟動mq

xiaoye@ubuntu3:~$ ./activemq/bin/activemq start

INFO: Loading ‘/home/xiaoye/activemq/bin/env’

INFO: Using java ‘/usr/lib/jvm/java-1.8.0-openjdk-amd64/bin/java’

INFO: Starting – inspect logfiles specified in logging.properties and log4j.properties to get details

INFO: pidfile created : ‘/home/xiaoye/activemq/data/activemq.pid’ (pid ‘1699’)

這樣就啟動成功了;

web通路一下,預設端口是8161

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

上面這個界面是沒有使用者登入的界面。下面我們用admin登入,預設的賬号和密碼都是admin

http://192.168.72.133:8161/admin/

打開這個連結會彈出輸入賬号密碼的框,填進去就行了。

點選Quenes如下,這個是點對點消息發送界面

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

再點選topic是釋出/訂閱模式界面

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

在Send中可以測試發送點對點或釋出/訂閱兩種消息,如下圖所示。

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

三,代碼測試ActiveMQ

下面我們要寫java代碼測試了。

建立一個maven工程。打開eclipse ,右鍵建立maven project –》

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

finish

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

修改pom.xml添加maven依賴,依賴我們到apeche,maven官網去找。可直接百度關鍵詞,active maven,圖下:

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

點開找到,我們下載下傳active版本

把這個maven依賴拷過來:

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.11.2</version>
</dependency>           

複制

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

這樣maven就能夠自動幫我們下載下傳active的jar包了。

下面建立一個junit測試類:

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。
第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。
第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

finish即可

在類中加入一下内容做簡單測試,報錯,我們

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

如下,滑鼠防止@Test上,給提示導入Junit包,導入後,就沒問題了。右鍵運作也是OK的。

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

下面我們就來寫測試類,先來測試queue點對點的消息發送方式:

package com.xiaoyexinxin.activeMQTest;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import junit.framework.TestCase;

/**
 * 
 * @author liuxin
 * @date   2018年4月10日
 */
public class TestActiveMq extends TestCase {
	
	@Test
	public void testQueueProducer() throws JMSException{
		//建立一個連結工廠connectionFactory對象,需要指定mq服務ip和端口,注意brokerURL的開頭是  
        //tcp://而不是我們通常的http://,端口是61616而不是我們通路activemq背景管理頁面所使用的8161  
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
		//使用connectionFactory 連結一個connection對象
		Connection connection=connectionFactory.createConnection();
		
		//開啟連結,調用connection 對象的start方法
		connection.start();
		//使用connection 建立一個session對象
		//第一個參數是是否開啟事務,一般不使用分布式事務,因為它特别消耗性能,而且顧客體驗特别差,現在網際網路的  
        //做法是保證資料的最終一緻(也就是允許暫時資料不一緻),比如顧客下單購買東西,一旦訂單生成完就立刻響應給使用者  
        //下單成功。至于下單後一系列的操作,比如通知會計記賬、通知物流發貨、商品數量同步等等都先不用管,隻需要  
        //發送一條消息到消息隊列,消息隊列來告知各子產品進行相應的操作,一次告知不行就兩次,直到完成所有相關操作為止,這  
        //也就做到了資料的最終一緻性。如果第一個參數為true,那麼第二個參數将會被忽略掉。如果第一個參數為false,那麼  
        //第二個參數為消息的應答模式,常見的有手動和自動兩種模式,我們一般使用自動模式。  
		Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//使用session對象建立一個Destination(目标)對象,兩站形式,queue,topic兩種,這裡我們使用queue
		//參數就是消息隊列的名稱
		Queue queue=session.createQueue("test-queue");
		//建立生成者,
		MessageProducer producer=session.createProducer(queue);
		//建立消息内容
		//有兩種方式,第一種方式:  
//      TextMessage textMessage = new ActiveMQTextMessage();  
//      textMessage.setText("hello,activemq!!!");  
        //第二種方式:  
        TextMessage textMessage = session.createTextMessage("hello,activemq!!");  
         //發送消息
        producer.send(textMessage);
        //關閉資源,由内而外的關閉
        producer.close();
        session.close();
        connection.close();
	}

}           

複制

右鍵運作程式,成功後,在ActiveMQ的背景管理系統,點選”Queues”,可以看到我們剛才發送的那條消息”test-queue”。我們點選”test-queue”

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

點選test-queue,如下:Persistence為永久儲存,priority優先級是4 ,Redelivered是否重複投遞消息,這裡是否,

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

接着點選,長串的id

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

儲存了,打開日志看看是啥錯;

xiaoye@ubuntu3:~/activemq cd data xiaoye@ubuntu3:~/activemq/data ls activemq.log activemq.pid audit.log kahadb

xiaoye@ubuntu3:~/activemq/data$ tail -200 activemq.log

看到日志有下圖的錯:

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

這個是activemq不支援jdk1.8造成的,這裡我把虛拟機的jdk換成1.7的試試

官網下載下傳後上傳,我這裡用的是谷歌浏覽器下載下傳,谷歌浏覽器下載下傳的jdk不知道為何,應該是tar.gz結尾的jdk,卻隻有gz.于是百度一圈,直接修改壓縮包字尾,改為.tar結尾解壓

xiaoye@ubuntu3:~/activemq/data cd ~ xiaoye@ubuntu3:~ cd Downloads/ xiaoye@ubuntu3:~/Downloads

xiaoye@ubuntu3:~/Downloads$ ls

apache-activemq-5.11.2-bin.tar.gz apache-activemq-5.15.3-bin.tar.gz hbase-1.0.0-cdh5.5.1.tar.gz jdk-7u80-linux-x64.tar.gz

apache-activemq-5.12.0-bin.tar.gz hadoop-2.5.0-cdh5.2.0.tar.gz hive-0.13.1-cdh5.2.0.tar.gz sqoop-1.4.6-cdh5.5.4.tar.gz

解壓到目前目錄。

xiaoye@ubuntu3:~ cd Downloads/ xiaoye@ubuntu3:~/Downloads ls apache-activemq-5.11.2-bin.tar.gz hive-0.13.1-cdh5.2.0.tar.gz apache-activemq-5.12.0-bin.tar.gz jdk1.7.0_79 apache-activemq-5.15.3-bin.tar.gz jdk-7u79-linux-x64.tar.gz hadoop-2.5.0-cdh5.2.0.tar.gz sqoop-1.4.6-cdh5.5.4.tar.gz

hbase-1.0.0-cdh5.5.1.tar.gz

下面設定環境變量。

切換到root使用者。

root@ubuntu3:~# vim /etc/profile

export JAVA_HOME=/home/xiaoye/Downloads/jdk1.7.0_79

修改儲存後,soruce /etc/profile

這個改完後,在修改目前使用者下的環境變量。

xiaoye@ubuntu3:~$ vim .bashrc

export JAVA_HOME=/home/xiaoye/Downloads/jdk1.7.0_79

export CLASSPATH=${JAVA_HOME}/lib

export PATH={JAVA_HOME}/bin:PATH

修改儲存後,source .bashrc

啟動activemq

xiaoye@ubuntu3:~$ ./activemq/bin/activemq start

INFO: Loading ‘/home/xiaoye/activemq/bin/env’

INFO: Using java ‘/home/xiaoye/Downloads/jdk1.7.0_79/bin/java’

INFO: Process with pid ‘1454’ is already running

顯示還在運作,那就kill -9 1454

再次重新開機,就好了。

這樣在點選ID的時候就不會報錯了。

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

四,消費者

下面我們寫消費者方法,寫在同生産者一個類裡面;

内容如下:

/*
	 * 消費者
	 */
	@Test
	public void testQueueConsumer() throws Exception{
		//跟建立生産者一樣,先連接配接mq
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
		//連接配接一個connection 對象
		Connection connection=connectionFactory.createConnection();
		//開啟連結
		connection.start();
		//建立一個session會話對象
		Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//使用session對象建立一個Destination對象,兩種形式queue,topic ,這裡我們使用queue
		//參數是消息隊列的名稱
		Queue queue=session.createQueue("test-queue");
		//使用session建立一個consumer對象
		MessageConsumer consumer=session.createConsumer(queue);
		//向Consumer對象中設定一個Messagelistener對象,用來接受消息
		consumer.setMessageListener(new MessageListener() {
			public void onMessage(Message arg0) {
				// TODO Auto-generated method stub
				if(arg0 instanceof TextMessage){
					TextMessage textMessage=(TextMessage) arg0;
					String text;
					try {
						text = textMessage.getText();
						System.out.println(text);
					} catch (JMSException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}
		});
		//程式等待接收使用者結束操作  
        //程式自己并不知道什麼時候有消息,也不知道什麼時候不再發送消息了,這就需要手動幹預,  
        //當我們想停止接收消息時,可以在控制台輸入任意鍵,然後回車即可結束接收操作(也可以直接按回車)。 
		System.in.read();
		//關閉資源
		consumer.close();
		session.close();
		connection.close();
	}           

複制

右鍵方法運作消費者方法,會在控制台看到生産者發送的hello world消息,如下:

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

這裡執行消費者方法後并沒有停止運作,還在等待新新的消息進來,那麼我們右鍵生産者方法再次運作會發現有兩個hello輸出。

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

我們修改一下生産者内容,再次運作。

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

發現有一個語句輸出,說明沒有問題。

我們到activeMQ背景管理頁面看看

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

說一下這幾個标簽的含義,number of pending messages 待發送消息數

Number Of Consumers 消費者消息數

Messages Enqueued 壓入隊列的消息數量

Messages Dequeued 出隊列的消息數量,也就是被消費的消息數

五,topic 釋出/訂閱模式

生産者代碼:

/*
	 * 訂閱模式的生産者
	 */
	@Test
	public void testTopicProducer() throws JMSException{
		//建立工程連接配接mq
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
		//使用工程建立一個連接配接對象
		Connection connection=connectionFactory.createConnection();
		//開啟連結
		connection.start();
		//使用連結建立一個會話
		Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//使用會話建立一個目标對象
		Topic topic=session.createTopic("test-topic");
		//建立一個生産者
		MessageProducer producer=session.createProducer(topic);
		 //7.建立一個TextMessage對象  ,并寫入要傳輸的消息内容
        //有兩種方式,第一種方式:  
//      TextMessage textMessage = new ActiveMQTextMessage();  
//      textMessage.setText("hello,activemq!!!");  
        //第二種方式:  
		TextMessage textMessage=session.createTextMessage("hello topic!");
		//發送消息
		producer.send(textMessage);
		//關閉資源
		producer.close();
		session.close();
		connection.close();
		
	}           

複制

運作上面的測試方法,運作成功後,我們通路activemq的管理背景頁面,點選”Topics”,可以看到有”test-topic”這一行,壓入消息隊列一條消息,但由于沒有消費者,是以沒有消費掉該消息。

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

點開test-topic發現:消息體裡并沒有我們發送的内容。

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

而queue就不同,queue有持久化一欄,發送的消息會被儲存下來。這樣的話,就會有個問題,那就是如果發送topic消息時沒有消費者,那麼這條消息便不存在了,不會再被消費了。是以我們要想消息不會被遺失掉,我們要先打開消費者,然後再發送topic消息。

我們來寫消費topic消息的方法,如下圖所示,該方法與我們上面學習的消費隊列消息的方法不同的是建立Destination的時候不一樣,同時為了模拟多個消費者,在該方法中添加一條輸出資訊,标明該方法是第幾個消費者。

消費者代碼:

/*
	 * 訂閱模式的消費者
	 */
	@Test
	public void testTopicConsumer() throws JMSException{
		//建立工廠,連結mq
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
		//使用工程建立一個連結
		Connection connection=connectionFactory.createConnection();
		//打開連結
		connection.start();
		//使用連結建立一個會話
		Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//建立消息隊列
		Topic topic=session.createTopic("test-topic");
		//消費者
		MessageConsumer consumer=session.createConsumer(topic);
		//消費者設定監聽器,監聽傳來的消息
		consumer.setMessageListener(new MessageListener() {
			
			public void onMessage(Message arg0) {
				// TODO Auto-generated method stub
				if(arg0 instanceof TextMessage){
					TextMessage textMessage=(TextMessage) arg0;
					try {
						String text=textMessage.getText();
						System.out.println(text);
					} catch (JMSException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}
		});
		
		//程式等待接收使用者結束操作  
        //程式自己并不知道什麼時候有消息,也不知道什麼時候不再發送消息了,這就需要手動幹預,  
        //當我們想停止接收消息時,可以在控制台輸入任意鍵,然後回車即可結束接收操作(也可以直接按回車)。  
        System.out.println("topic消費者1111。。。。。");  
        try {
			System.in.read();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}  
        //9.關閉資源  
        consumer.close();  
        session.close();  
        connection.close(); 
		
	}           

複制

右鍵運作消費者方法:

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

修改為2222.。。。。。

再次運作消費者方法。

修改為33333.。。。。

再次運作消費者方法

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

發現這裡消費者的數量是3 了

啟動了三個消費者後,我們再發送一次topic消息,發完之後,我們看各個控制台的資訊。如下圖所示。可以看到都列印出了我們發送的topic資訊。

三個程序控制台都有列印生産者消息。

第一章:activeMQ原理,安裝,queue,topic以及topic持久化方式介紹,包括修改ubuntu的jdk環境變量。

六,topic消息持久化

topic消息沒有持久化,也就意味着,如果消息發送者發送消息的時候,如果消費者沒有運作的話,它将無法消費這個消息了(即使它啟動也無法再接收到那條topic消息了),這樣問題就來了,如果那條消息非常重要呢?我們不能容忍接收不到消息的情況。

生産者代碼:

/*
	 * 訂閱釋出式 可持久化生産者
	 */
	@Test
	public void TestTopicPersistenceProducer() throws JMSException{
		//建立工程連接配接mq
		ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
		//設定異步發送消息可顯著提高發送性能
		connectionFactory.setUseAsyncSend(true);
		//使用工程建立一個連接配接對象
		Connection connection=connectionFactory.createConnection();
		//對每個生産者來說其clientID值必須唯一
		connection.setClientID("producerTopic2");
		//開啟連結
		connection.start();
		//使用連結建立一個會話
		Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//使用會話建立一個目标對象
		Topic topic=session.createTopic("test-topic");
		//建立一個生産者
		MessageProducer producer=session.createProducer(topic);
		//DelieveryMode設定為PERSISTENCE(持久化)
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
		 //建立一個TextMessage對象  ,并寫入要傳輸的消息内容
        //有兩種方式,第一種方式:  
//		      TextMessage textMessage = new ActiveMQTextMessage();  
//		      textMessage.setText("hello,activemq!!!");  
        //第二種方式:  
		TextMessage textMessage=session.createTextMessage("hello topic!persistence2");
		//發送消息
		producer.send(textMessage);
		//關閉資源
		producer.close();
		session.close();
		connection.close();
	}           

複制

消費者代碼:

/*
	 * 訂閱釋出式 可持久化消費者
	 */

	@Test
	public void TestTopicPersistenceConsumer() throws JMSException{
		//建立工廠,連結mq
		ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
		//設定異步接受消息,可提高接受性能
		connectionFactory.setUseAsyncSend(true);
		//使用工程建立一個連結
		Connection connection=connectionFactory.createConnection();
		//設定每個消費者id,每個都要不同
		connection.setClientID("consumer1");
		//打開連結
		connection.start();
		//使用連結建立一個會話
		Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//建立消息隊列
		Topic topic=session.createTopic("test-topic");
		//消費者
		MessageConsumer consumer=session.createDurableSubscriber(topic, "consumer1");
		//消費者設定監聽器,監聽傳來的消息
		consumer.setMessageListener(new MessageListener() {
			
			public void onMessage(Message arg0) {
				// TODO Auto-generated method stub
				if(arg0 instanceof TextMessage){
					TextMessage textMessage=(TextMessage) arg0;
					try {
						String text=textMessage.getText();
						System.out.println(text);
					} catch (JMSException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}
		});
		
		//程式等待接收使用者結束操作  
        //程式自己并不知道什麼時候有消息,也不知道什麼時候不再發送消息了,這就需要手動幹預,  
        //當我們想停止接收消息時,可以在控制台輸入任意鍵,然後回車即可結束接收操作(也可以直接按回車)。  
        System.out.println("topic消費者3333。。。。。");  
        try {
			System.in.read();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}  
        //9.關閉資源  
        consumer.close();  
        session.close();  
        connection.close(); 
	}           

複制

我們還需要配置下activemq的activemq.xml檔案,隻需要添加一句配置,就是在<broker的末尾添加一句關于持久化的配置persistent=”true”即可。如下:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="true">             

複制

然後重新啟動mq;

這樣設定持久化了就無所謂哪個先啟動了。

釋出者:全棧程式員棧長,轉載請注明出處:https://javaforall.cn/106159.html原文連結:https://javaforall.cn