前言:這是中間件一個系列的文章之一,有需要的朋友可以看看這個系列的其他文章: 消息中間件系列一、消息中間件的基本了解 消息中間件系列二、Windows下的activeMQ和rabbitMQ的安裝 消息中間件系列三、JMS和activeMQ的簡單使用 消息中間件系列四、認識AMQP和RabbiyMq的簡單使用 消息中間件系列五、rabbit消息的确認機制 消息中間件系列六,rabbit與spring內建實戰
一、JMS
1、什麼是JMS
JMS(JAVA Message Service,java消息服務)本質是API,Java平台消息中間件的規範,java應用程式之間進行消息交換。并且通過提供标準的産生、發送、接收消息的接口簡化企業應用的開發。它使分布式通信耦合度更低,消息服務更加可靠以及異步性。
2、JMS規範中的點對點 (P2P) 模式:
P2P模式包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。每個消息都被發送到一個特定的隊列,接收者從隊列中擷取消息。隊列保留着消息,直到他們被消費或逾時。
P2P的特點:
- 每個消息隻有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中)
- 發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之後,不管接收者有沒有正在運作,它不會影響到消息被發送到隊列
- 接收者在成功接收消息之後需向隊列應答成功
如果希望發送的每個消息都會被成功處理的話,那麼需要P2P模式。
3、JMS規範中的主題模式(Pub/sub釋出訂閱):
包含三個角色主題(Topic),釋出者(Publisher),訂閱者(Subscriber) 多個釋出者将消息發送到Topic,系統将這些消息傳遞給多個訂閱者。
Pub/Sub的特點
- 每個消息可以有多個消費者
- 釋出者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者之後,才能消費釋出者的消息
- 為了消費消息,訂閱者必須保持運作的狀态,如果消息生産者釋出了消息之後,訂閱者沒有運作,則會錯過消息,當訂閱者再次運作也接受不到消息了。
為了緩和這樣嚴格的時間相關性,JMS允許訂閱者建立一個可持久化的訂閱。這樣,即使訂閱者沒有被激活(運作),它也能接收到釋出者的消息。
如果希望發送的消息可以不被做任何處理、或者隻被一個消息者處理、或者可以被多個消費者處理的話,那麼可以采用Pub/Sub模型。
4、消息消費
在JMS中,消息的産生和消費都是異步的。對于消費來說,JMS的消息者可以通過兩種方式來消費消息。
(1)同步
訂閱者或接收者通過receive方法來接收消息,receive方法在接收到消息之前(或逾時之前)将一直阻塞;
(2)異步
訂閱者或接收者可以注冊為一個消息監聽器。當消息到達之後,系統自動調用監聽器的onMessage方法。
5、JMS對象模型包含如下幾個要素:
1)連接配接工廠:建立一個JMs連接配接
建立Connection對象的工廠,針對兩種不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory兩種。
2)JMS連接配接:用戶端和伺服器之間的一個連接配接。
Connection表示在用戶端和JMS系統之間建立的連結(對TCP/IP socket的包裝)。Connection可以産生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。
3)JMS會話:客戶和伺服器會話的狀态,建立在連接配接之上的
Session是操作消息的接口。可以通過session建立生産者、消費者、消息等。Session提供了事務的功能。當需要使用session發送/接收多個消息時,可以将這些發送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。
4)JMS目的Destination:消息隊列
Destination的意思是消息生産者的消息發送目标或者說消息消費者的消息來源。對于消息生産者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對于消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。
5)JMS生産者:消息的生成
消息生産者由Session建立,并用于将消息發送到Destination。同樣,消息生産者分兩種類型:QueueSender和TopicPublisher。可以調用消息生産者的方法(send或publish方法)發送消息。
6)JMS消費者:接收消息
消息消費者由Session建立,用于接收被發送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分别通過session的createReceiver(Queue)或createSubscriber(Topic)來建立。當然,也可以session的creatDurableSubscriber方法來建立持久化的訂閱者。
7)Broker
簡單來說就是消息隊列伺服器實體。
6、JMS規範中的消息類型
- TextMessage
- MapMessage
- ObjectMessage
- BytesMessage
- StreamMessage
二、activeMQ的簡單使用
activeMQ是JMS規範中的一種消息中間件。
下面先實作一個最簡單的消息提供者和消息消費者,熟悉一下JMS規範。
說明一下:原生點對點和釋出訂閱模式,隻要在session建立隊列的時候改一下即可,其他都可以不變,下面代碼會有注釋。
首先通過maven引入activemq包
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.8.0</version>
</dependency>
消息生成者代碼
package com.dongnaoedu;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProducer {
//預設連接配接使用者名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//預設連接配接密碼
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//預設連接配接位址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
//發送的消息數量
private static final int SENDNUM = 10;
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageProducer messageProducer;
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);//連接配接工程
try {
connection = connectionFactory.createConnection();//連接配接
connection.start();
/*
createSession參數取值
* 1、為true表示啟用事務
* 2、消息的确認模式
* AUTO_ACKNOWLEDGE 自動簽收
* CLIENT_ACKNOWLEDGE 用戶端自行調用acknowledge方法簽收
* DUPS_OK_ACKNOWLEDGE 不是必須簽收,消費可能會重複發送
* 在第二次重新傳送消息的時候,消息
頭的JmsDelivered會被置為true标示目前消息已經傳送過一次,
用戶端需要進行消息的重複處理控制。
* */
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);//會話
destination = session.createQueue("HelloWAM");//點對點消息隊列,如果要建立釋出訂閱模式,是要改成session.createTopic("HelloWAM");,整個類其他地方都不用變
messageProducer = session.createProducer(destination);//消息生産者
for(int i=0;i<SENDNUM;i++){
String msg = "發送消息"+i+" "+System.currentTimeMillis();
TextMessage message = session.createTextMessage(msg);
System.out.println("發送消息:"+msg);
messageProducer.send(message);
}
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}finally {
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
将發送的消息列印出來:
在可視化界面可以看到已經生成了隊列,并且已經有了消息。
消息消費者代碼:
package com.dongnaoedu;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//預設連接配接使用者名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設連接配接密碼
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//預設連接配接位址
public static void main(String[] args) {
ConnectionFactory connectionFactory;//連接配接工廠
Connection connection = null;//連接配接
Session session;//會話 接受或者發送消息的線程
Destination destination;//消息的目的地
MessageConsumer messageConsumer;//消息的消費者
//執行個體化連接配接工廠
connectionFactory = new ActiveMQConnectionFactory(JmsConsumer.USERNAME,
JmsConsumer.PASSWORD, JmsConsumer.BROKEURL);
try {
//通過連接配接工廠擷取連接配接
connection = connectionFactory.createConnection();
//啟動連接配接
connection.start();
//建立session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立一個連接配接HelloWorld的消息隊列
destination = session.createQueue("HelloWAM");//點對點消息隊列,如果要建立釋出訂閱模式,是要改成session.createTopic("HelloWAM");,整個類其他地方都不用變
//建立消息消費者
messageConsumer = session.createConsumer(destination);
//讀取消息
while(true){
TextMessage textMessage = (TextMessage)messageConsumer.receive(10000);
if(textMessage != null){
System.out.println("Accept msg : "+textMessage.getText());
}else{
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
接收到的消息:
可以看到消息已經被處理了
三、activeMQ與spring整合實戰
消息生産者項目
1、用maven引入相關的包,pom檔案:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dongnaoedu</groupId>
<artifactId>am_spring_producer</artifactId>
<packaging>war</packaging>
<version>1.0-SNAPSHOT</version>
<name>am_spring_producer Maven Webapp</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-web-api</artifactId>
<version>7.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>4.3.11.RELEASE</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
<version>1.2</version>
</dependency>
<!--日志-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.13</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.0.13</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-access</artifactId>
<version>1.0.13</version>
</dependency>
<!--JSON-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.7.4</version>
</dependency>
<!-- xbean -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
<version>1.3.1</version>
</dependency>
<!--ActiveMq-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.8.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.11.RELEASE</version>
</dependency>
</dependencies>
<build>
<finalName>am_spring_producer</finalName>
<resources>
<resource>
<directory>${basedir}/src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
</resource>
</resources>
</build>
</project>
2、web.xml配置
<!DOCTYPE web-app PUBLIC
"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"http://java.sun.com/dtd/web-app_2_3.dtd" >
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
<display-name>ActiveMQSpringProducer</display-name>
<servlet-mapping>
<servlet-name>default</servlet-name>
<url-pattern>*.js</url-pattern>
</servlet-mapping>
<!-- Spring 編碼過濾器 start -->
<filter>
<filter-name>characterEncoding</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>characterEncoding</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<!-- Spring 編碼過濾器 End -->
<!-- Spring Application Context Listener Start -->
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:applicationContext.xml</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<!-- Spring Application Context Listener End -->
<!-- Spring MVC Config Start -->
<servlet>
<servlet-name>SpringMVC</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring-mvc.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>SpringMVC</servlet-name>
<!-- Filter all resources -->
<url-pattern>/</url-pattern>
</servlet-mapping>
<!-- Spring MVC Config End -->
</web-app>
3、spring配置檔案:applicationContext.xml:
注意:與spring整合的時候要加上命名空間:
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms=
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"
JMS對象模型的連接配接工廠、連接配接、消費者、生産者等可以通過配置檔案注入spring容器。
<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 通路 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
<!-- 配置掃描路徑 -->
<context:component-scan base-package="com.dongnaoedu">
<context:exclude-filter type="annotation"
expression="org.springframework.stereotype.Controller"/>
</context:component-scan>
<!-- ActiveMQ 連接配接工廠 -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />
<!-- Spring Caching連接配接工廠 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connection"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="100"></property>
</bean>
<!-- Spring JmsTemplate 的消息生産者 start-->
<!-- 定義JmsTemplate的Queue類型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connection"></constructor-arg>
<!-- 隊列模式-->
<property name="pubSubDomain" value="false"></property>
</bean>
<!-- 定義JmsTemplate的Topic類型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connection"></constructor-arg>
<!-- true:釋出訂閱模式; false:隊列模式-->
<property name="pubSubDomain" value="true"></property>
</bean>
<!--Spring JmsTemplate 的消息生産者 end-->
<!--接收消費者應答的監聽器-->
<!--
消息監聽器。如果注冊了消息監聽器,一旦消息到達,将自動調用監聽器的onMessage方法。
EJB中的MDB(Message-Driven Bean)就是一種MessageListener。
-->
<jms:listener-container destination-type="queue" container-type="default"
connection-factory="connection" acknowledge="auto">
<jms:listener destination="tempqueue" ref="getResponse"></jms:listener>
</jms:listener-container>
</beans>
4、做成網頁實戰,需要配置springMVC;spring-MVC.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 通路 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">
<!-- <mvc:default-servlet-handler />-->
<mvc:resources mapping="/js/**" location="/js/"/>
<mvc:annotation-driven
content-negotiation-manager="contentNegotiationManager" />
<context:component-scan base-package="com.dongnaoedu">
<context:include-filter type="annotation"
expression="org.springframework.stereotype.Controller" />
</context:component-scan>
<bean id="stringHttpMessageConverter"
class="org.springframework.http.converter.StringHttpMessageConverter">
<property name="supportedMediaTypes">
<list>
<bean class="org.springframework.http.MediaType">
<constructor-arg index="0" value="text" />
<constructor-arg index="1" value="plain" />
<constructor-arg index="2" value="UTF-8" />
</bean>
</list>
</property>
</bean>
<bean id="mappingJackson2HttpMessageConverter"
class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" />
<bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter">
<property name="messageConverters">
<list>
<ref bean="stringHttpMessageConverter" />
<ref bean="mappingJackson2HttpMessageConverter" />
</list>
</property>
</bean>
<bean id="contentNegotiationManager"
class="org.springframework.web.accept.ContentNegotiationManagerFactoryBean">
<property name="mediaTypes">
<map>
<entry key="html" value="text/html" />
<entry key="pdf" value="application/pdf" />
<entry key="xsl" value="application/vnd.ms-excel" />
<entry key="xml" value="application/xml" />
<entry key="json" value="application/json" />
</map>
</property>
<property name="defaultContentType" value="text/html" />
</bean>
<bean id="viewResolver"
class="org.springframework.web.servlet.view.ContentNegotiatingViewResolver">
<property name="order" value="0" />
<property name="contentNegotiationManager" ref="contentNegotiationManager" />
<property name="viewResolvers">
<list>
<bean class="org.springframework.web.servlet.view.BeanNameViewResolver" />
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
<property name="viewClass"
value="org.springframework.web.servlet.view.JstlView" />
<property name="prefix" value="/WEB-INF/pages/" />
<property name="suffix" value=".jsp"></property>
</bean>
</list>
</property>
<property name="defaultViews">
<list>
<bean class="org.springframework.web.servlet.view.json.MappingJackson2JsonView">
<property name="extractValueFromSingleKeyModel" value="true" />
</bean>
</list>
</property>
</bean>
</beans>
5、監聽接口:
package com.dongnaoedu.mq.producer.queue;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* 動腦學院-Mark老師
* 建立日期:2017/11/02
* 建立時間: 22:19
* 接收消費者應答的類
*/
@Component
public class GetResponse implements MessageListener {
public void onMessage(Message message) {
try {
String textMsg = ((TextMessage)message).getText();
System.out.println("GetResponse accept response : "+textMsg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
6、釋出模式的簡單消息生産者:
package com.dongnaoedu.mq.producer.topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
/**
*
* @author lgy
* @description Topic生産者發送消息到Topic
*
*/
@Component("topicSender")
public class TopicSender {
@Autowired
@Qualifier("jmsTopicTemplate")
private JmsTemplate jmsTemplate;
public void send(String queueName,final String message){
jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message msg = session.createTextMessage(message);
return msg;
}
});
}
}
7、點對點模式消息生産者:
package com.dongnaoedu.mq.producer.queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import javax.jms.*;
import java.awt.peer.SystemTrayPeer;
/**
*
* @author lgy
* @description 隊列消息生産者,發送消息到隊列
*
*/
@Component("queueSender")
public class QueueSender {
@Autowired
@Qualifier("jmsQueueTemplate")//在xml配置好的隊列模式
private JmsTemplate jmsTemplate;
@Autowired
private GetResponse getResponse;
public void send(String queueName,final String message){
//jmsTemplate的send方法傳遞兩個參數,第一個傳隊列名,第二個傳消息建立接口的實作對象
jmsTemplate.send(queueName, new MessageCreator() {
//消息建立接口隻有這麼一個方法,架構會把session會話對象傳給方法,此方法需要傳回一個Message消息對象,架構會發送消息到隊列
public Message createMessage(Session session) throws JMSException {
//建立會話消息
Message msg = session.createTextMessage(message);
//配置消費者應答相關内容
//擷取臨時的消息隊列
Destination tempDest = session.createTemporaryQueue();
MessageConsumer responseConsumer = session.createConsumer(tempDest);//擷取應答的消息消費者
responseConsumer.setMessageListener(getResponse);//設定消費者傳回消息時的監聽接口
msg.setJMSReplyTo(tempDest);//告訴消費者應答消息發送到臨時隊列
//消費者應答的id,發送出的消息和應答消息進行比對
String uid = System.currentTimeMillis()+"";
msg.setJMSCorrelationID(uid);
return msg;
}
});
}
}
8、controller:
package com.dongnaoedu.controller;
import com.dongnaoedu.mq.producer.queue.QueueSender;
import com.dongnaoedu.mq.producer.topic.TopicSender;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
/**
*
* @author lgy
* @description controller測試
*/
@Controller
@RequestMapping("/activemq")
public class ActivemqController {
@Resource
QueueSender queueSender;
@Resource
TopicSender topicSender;
/**
* 發送消息到隊列
* Queue隊列:僅有一個訂閱者會收到消息,消息一旦被處理就不會存在隊列中
* @param message
* @return String
*/
@ResponseBody
@RequestMapping("queueSender")
public String queueSender(@RequestParam("message")String message){
String opt="";
try {
queueSender.send("test.queue",message);
opt = "suc";
} catch (Exception e) {
opt = e.getCause().toString();
}
return opt;
}
/**
* 發送消息到主題
* Topic主題 :放入一個消息,所有訂閱者都會收到
* 這個是主題目的地是一對多的
* @param message
* @return String
*/
@ResponseBody
@RequestMapping("topicSender")
public String topicSender(@RequestParam("message")String message){
String opt = "";
try {
topicSender.send("test.topic",message);
opt = "suc";
} catch (Exception e) {
opt = e.getCause().toString();
}
return opt;
}
}
調用不同的接口即可釋出消息到對應的隊列中。
消息消費者項目
項目的pom檔案和web.xml以及spring-mvc.xml基本和生産者一樣,且不是重點,這裡不再浪費篇幅,就不貼出來了。
1、applicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 通路 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
<!-- 配置掃描路徑 -->
<context:component-scan base-package="com.dongnaoedu">
<context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
</context:component-scan>
<!-- ActiveMQ 連接配接工廠 -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />
<!-- Spring Caching連接配接工廠 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="100" />
</bean>
<!-- 消息消費者 start-->
<!-- 定義Queue監聽器 -->
<jms:listener-container destination-type="queue" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener>
<jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener>
</jms:listener-container>
<!-- 定義Topic監聽器 -->
<jms:listener-container destination-type="topic" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener>
<jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener>
</jms:listener-container>
<!-- 消息消費者 end -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"></constructor-arg>
<!-- 隊列模式-->
<property name="pubSubDomain" value="false"></property>
</bean>
</beans>
2、釋出訂閱模式的消息監聽:
package com.dongnaoedu.mq.consumer.topic;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
*
* @author lgy
* @description Topic消息監聽器
*
*/
@Component
public class TopicReceiver2 implements MessageListener {
public void onMessage(Message message) {
try {
String textMsg = ((TextMessage)message).getText();
System.out.println("TopicReceiver2 accept msg : "+textMsg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
可以同時設定多個監聽,有消息釋出時,多個監聽器都能監聽到消息
package com.dongnaoedu.mq.consumer.topic;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
*
* @author lgy
* @description Topic消息監聽器
*
*/
@Component
public class TopicReceiver1 implements MessageListener {
public void onMessage(Message message) {
try {
String textMsg = ((TextMessage)message).getText();
System.out.println("TopicReceiver1 accept msg : "+textMsg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
在消息生産者項目的前端界面輸入消息内容:“123456”,在點選發送Topic消息,以釋出訂閱模式釋出消息;消費項目這邊的兩個釋出訂閱模式監聽器都能收到消息并列印出來:
3、點對點模式的消息監聽
package com.dongnaoedu.mq.consumer.queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
*
* @author Mark
* @description 隊列消息監聽器
*
*/
@Component
public class QueueReceiver1 implements MessageListener {
@Autowired
private ReplyTo replyTo;
public void onMessage(Message message) {
try {
String textMsg = ((TextMessage)message).getText();
System.out.println("QueueReceiver1 accept msg : "+textMsg);
//do my 業務工作
replyTo.send(textMsg,message);//執行回複
} catch (JMSException e) {
e.printStackTrace();
}
}
}
可以給消息生産者回複消息:
package com.dongnaoedu.mq.consumer.queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
/**
* 負責向消息提供者發送應答資訊
*/
@Component
public class ReplyTo {
@Autowired
private JmsTemplate jmsTemplate;
public void send(final String consumerMsg, Message produerMessage) throws JMSException {
//消息回複,getJMSReplyTo回去回複的臨時隊列
jmsTemplate.send(produerMessage.getJMSReplyTo(), new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message msg = session.createTextMessage("QueueReceiver1 accept msg"
+consumerMsg);
return msg;
}
});
}
}
可以有多個點對點隊列監聽器,當同一隊列有多個監聽器時,會輪詢給監聽器發消息
package com.dongnaoedu.mq.consumer.queue;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
*
* @author Mark
* @description 隊列消息監聽器
*
*/
@Component
public class QueueReceiver2 implements MessageListener {
public void onMessage(Message message) {
try {
String textMsg = ((TextMessage)message).getText();
System.out.println("QueueReceiver2 accept msg : "+textMsg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
在生産項目連續釋出6條點對點消息,消費項目這邊接收到的消息列印結果為:
上圖是兩個監聽器輪詢接收到消息并列印出來的結果。