天天看點

消息中間件系列三、JMS和activeMQ的簡單使用二、activeMQ的簡單使用三、activeMQ與spring整合實戰

前言:這是中間件一個系列的文章之一,有需要的朋友可以看看這個系列的其他文章: 消息中間件系列一、消息中間件的基本了解 消息中間件系列二、Windows下的activeMQ和rabbitMQ的安裝 消息中間件系列三、JMS和activeMQ的簡單使用 消息中間件系列四、認識AMQP和RabbiyMq的簡單使用 消息中間件系列五、rabbit消息的确認機制 消息中間件系列六,rabbit與spring內建實戰

一、JMS

1、什麼是JMS

  JMS(JAVA Message Service,java消息服務)本質是API,Java平台消息中間件的規範,java應用程式之間進行消息交換。并且通過提供标準的産生、發送、接收消息的接口簡化企業應用的開發。它使分布式通信耦合度更低,消息服務更加可靠以及異步性。

2、JMS規範中的點對點 (P2P) 模式:

P2P模式包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。每個消息都被發送到一個特定的隊列,接收者從隊列中擷取消息。隊列保留着消息,直到他們被消費或逾時。

消息中間件系列三、JMS和activeMQ的簡單使用二、activeMQ的簡單使用三、activeMQ與spring整合實戰

P2P的特點:

  • 每個消息隻有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中)
  • 發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之後,不管接收者有沒有正在運作,它不會影響到消息被發送到隊列
  • 接收者在成功接收消息之後需向隊列應答成功

如果希望發送的每個消息都會被成功處理的話,那麼需要P2P模式。

3、JMS規範中的主題模式(Pub/sub釋出訂閱):

包含三個角色主題(Topic),釋出者(Publisher),訂閱者(Subscriber) 多個釋出者将消息發送到Topic,系統将這些消息傳遞給多個訂閱者。

消息中間件系列三、JMS和activeMQ的簡單使用二、activeMQ的簡單使用三、activeMQ與spring整合實戰

Pub/Sub的特點

  • 每個消息可以有多個消費者
  • 釋出者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者之後,才能消費釋出者的消息
  • 為了消費消息,訂閱者必須保持運作的狀态,如果消息生産者釋出了消息之後,訂閱者沒有運作,則會錯過消息,當訂閱者再次運作也接受不到消息了。

為了緩和這樣嚴格的時間相關性,JMS允許訂閱者建立一個可持久化的訂閱。這樣,即使訂閱者沒有被激活(運作),它也能接收到釋出者的消息。

如果希望發送的消息可以不被做任何處理、或者隻被一個消息者處理、或者可以被多個消費者處理的話,那麼可以采用Pub/Sub模型。

4、消息消費

在JMS中,消息的産生和消費都是異步的。對于消費來說,JMS的消息者可以通過兩種方式來消費消息。

(1)同步

訂閱者或接收者通過receive方法來接收消息,receive方法在接收到消息之前(或逾時之前)将一直阻塞;

(2)異步

訂閱者或接收者可以注冊為一個消息監聽器。當消息到達之後,系統自動調用監聽器的onMessage方法。

5、JMS對象模型包含如下幾個要素:

1)連接配接工廠:建立一個JMs連接配接

  建立Connection對象的工廠,針對兩種不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory兩種。

2)JMS連接配接:用戶端和伺服器之間的一個連接配接。

  Connection表示在用戶端和JMS系統之間建立的連結(對TCP/IP socket的包裝)。Connection可以産生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。

3)JMS會話:客戶和伺服器會話的狀态,建立在連接配接之上的

  Session是操作消息的接口。可以通過session建立生産者、消費者、消息等。Session提供了事務的功能。當需要使用session發送/接收多個消息時,可以将這些發送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。

4)JMS目的Destination:消息隊列

  Destination的意思是消息生産者的消息發送目标或者說消息消費者的消息來源。對于消息生産者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對于消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。

5)JMS生産者:消息的生成

  消息生産者由Session建立,并用于将消息發送到Destination。同樣,消息生産者分兩種類型:QueueSender和TopicPublisher。可以調用消息生産者的方法(send或publish方法)發送消息。

6)JMS消費者:接收消息

  消息消費者由Session建立,用于接收被發送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分别通過session的createReceiver(Queue)或createSubscriber(Topic)來建立。當然,也可以session的creatDurableSubscriber方法來建立持久化的訂閱者。

7)Broker

  簡單來說就是消息隊列伺服器實體。

6、JMS規範中的消息類型

  1. TextMessage
  2. MapMessage
  3. ObjectMessage
  4. BytesMessage
  5. StreamMessage

二、activeMQ的簡單使用

activeMQ是JMS規範中的一種消息中間件。

下面先實作一個最簡單的消息提供者和消息消費者,熟悉一下JMS規範。

說明一下:原生點對點和釋出訂閱模式,隻要在session建立隊列的時候改一下即可,其他都可以不變,下面代碼會有注釋。

首先通過maven引入activemq包

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.8.0</version>
</dependency>           

消息生成者代碼

package com.dongnaoedu;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsProducer {

    //預設連接配接使用者名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //預設連接配接密碼
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //預設連接配接位址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    //發送的消息數量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;
        Connection connection = null;
        Session session;
        Destination destination;
        MessageProducer messageProducer;

        connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);//連接配接工程

        try {
            connection = connectionFactory.createConnection();//連接配接
            connection.start();
            /*
            createSession參數取值
            * 1、為true表示啟用事務
            * 2、消息的确認模式
            * AUTO_ACKNOWLEDGE  自動簽收
            * CLIENT_ACKNOWLEDGE 用戶端自行調用acknowledge方法簽收
            * DUPS_OK_ACKNOWLEDGE 不是必須簽收,消費可能會重複發送
            * 在第二次重新傳送消息的時候,消息
               頭的JmsDelivered會被置為true标示目前消息已經傳送過一次,
               用戶端需要進行消息的重複處理控制。
            * */
            session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);//會話
            destination = session.createQueue("HelloWAM");//點對點消息隊列,如果要建立釋出訂閱模式,是要改成session.createTopic("HelloWAM");,整個類其他地方都不用變
            messageProducer = session.createProducer(destination);//消息生産者
            for(int i=0;i<SENDNUM;i++){
                String msg = "發送消息"+i+" "+System.currentTimeMillis();
                TextMessage message = session.createTextMessage(msg);
                System.out.println("發送消息:"+msg);
                messageProducer.send(message);
            }
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
           

将發送的消息列印出來:

消息中間件系列三、JMS和activeMQ的簡單使用二、activeMQ的簡單使用三、activeMQ與spring整合實戰

在可視化界面可以看到已經生成了隊列,并且已經有了消息。

消息中間件系列三、JMS和activeMQ的簡單使用二、activeMQ的簡單使用三、activeMQ與spring整合實戰

消息消費者代碼:

package com.dongnaoedu;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//預設連接配接使用者名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設連接配接密碼
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//預設連接配接位址

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//連接配接工廠
        Connection connection = null;//連接配接

        Session session;//會話 接受或者發送消息的線程
        Destination destination;//消息的目的地

        MessageConsumer messageConsumer;//消息的消費者

        //執行個體化連接配接工廠
        connectionFactory = new ActiveMQConnectionFactory(JmsConsumer.USERNAME,
                JmsConsumer.PASSWORD, JmsConsumer.BROKEURL);

        try {
            //通過連接配接工廠擷取連接配接
            connection = connectionFactory.createConnection();
            //啟動連接配接
            connection.start();
            //建立session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //建立一個連接配接HelloWorld的消息隊列
            destination = session.createQueue("HelloWAM");//點對點消息隊列,如果要建立釋出訂閱模式,是要改成session.createTopic("HelloWAM");,整個類其他地方都不用變

            //建立消息消費者
            messageConsumer = session.createConsumer(destination);

            //讀取消息
            while(true){
                TextMessage textMessage = (TextMessage)messageConsumer.receive(10000);
                if(textMessage != null){
                    System.out.println("Accept msg : "+textMessage.getText());
                }else{
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}           

接收到的消息:

消息中間件系列三、JMS和activeMQ的簡單使用二、activeMQ的簡單使用三、activeMQ與spring整合實戰

可以看到消息已經被處理了

消息中間件系列三、JMS和activeMQ的簡單使用二、activeMQ的簡單使用三、activeMQ與spring整合實戰

三、activeMQ與spring整合實戰

消息生産者項目

1、用maven引入相關的包,pom檔案:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.dongnaoedu</groupId>
  <artifactId>am_spring_producer</artifactId>
  <packaging>war</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>am_spring_producer Maven Webapp</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>javax</groupId>
      <artifactId>javaee-web-api</artifactId>
      <version>7.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webmvc</artifactId>
      <version>4.3.11.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>jstl</artifactId>
      <version>1.2</version>
    </dependency>

    <!--日志-->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.5</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.16</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>jcl-over-slf4j</artifactId>
      <version>1.7.5</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.0.13</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <version>1.0.13</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-access</artifactId>
      <version>1.0.13</version>
    </dependency>

    <!--JSON-->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.7.4</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.7.4</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.7.4</version>
    </dependency>

    <!-- xbean -->
    <dependency>
      <groupId>org.apache.xbean</groupId>
      <artifactId>xbean-spring</artifactId>
      <version>3.16</version>
    </dependency>
    <dependency>
      <groupId>com.thoughtworks.xstream</groupId>
      <artifactId>xstream</artifactId>
      <version>1.3.1</version>
    </dependency>

    <!--ActiveMq-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>4.3.11.RELEASE</version>
    </dependency>

  </dependencies>
  <build>
    <finalName>am_spring_producer</finalName>
    <resources>
      <resource>
        <directory>${basedir}/src/main/java</directory>
        <includes>
          <include>**/*.xml</include>
        </includes>
      </resource>
    </resources>
  </build>
</project>           

2、web.xml配置

<!DOCTYPE web-app PUBLIC
 "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
 "http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
         version="3.0">
  <display-name>ActiveMQSpringProducer</display-name>

  <servlet-mapping>
    <servlet-name>default</servlet-name>
    <url-pattern>*.js</url-pattern>
  </servlet-mapping>

  <!-- Spring 編碼過濾器 start -->
  <filter>
    <filter-name>characterEncoding</filter-name>
    <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
    <init-param>
      <param-name>encoding</param-name>
      <param-value>UTF-8</param-value>
    </init-param>
    <init-param>
      <param-name>forceEncoding</param-name>
      <param-value>true</param-value>
    </init-param>
  </filter>
  <filter-mapping>
    <filter-name>characterEncoding</filter-name>
    <url-pattern>/*</url-pattern>
  </filter-mapping>
  <!-- Spring 編碼過濾器 End -->

  <!-- Spring Application Context Listener Start -->
  <context-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>classpath:applicationContext.xml</param-value>
  </context-param>
  <listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  </listener>
  <!-- Spring Application Context Listener End -->


  <!-- Spring MVC Config Start -->
  <servlet>
    <servlet-name>SpringMVC</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
      <param-name>contextConfigLocation</param-name>
      <param-value>classpath:spring-mvc.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
  </servlet>
  <servlet-mapping>
    <servlet-name>SpringMVC</servlet-name>
    <!-- Filter all resources -->
    <url-pattern>/</url-pattern>
  </servlet-mapping>
  <!-- Spring MVC Config End -->

</web-app>           

3、spring配置檔案:applicationContext.xml:

注意:與spring整合的時候要加上命名空間:

xmlns:amq="http://activemq.apache.org/schema/core"

xmlns:jms=

http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xs

d"

JMS對象模型的連接配接工廠、連接配接、消費者、生産者等可以通過配置檔案注入spring容器。

<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 通路 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
     
     <!-- 配置掃描路徑 -->
     <context:component-scan base-package="com.dongnaoedu">
         <context:exclude-filter type="annotation"
              expression="org.springframework.stereotype.Controller"/>
     </context:component-scan>

    <!-- ActiveMQ 連接配接工廠 -->
    <amq:connectionFactory id="amqConnectionFactory"
             brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />

    <!-- Spring Caching連接配接工廠 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connection"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <property name="sessionCacheSize" value="100"></property>
    </bean>


    <!-- Spring JmsTemplate 的消息生産者 start-->
    <!-- 定義JmsTemplate的Queue類型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connection"></constructor-arg>
        <!-- 隊列模式-->
        <property name="pubSubDomain" value="false"></property>
    </bean>


    <!-- 定義JmsTemplate的Topic類型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connection"></constructor-arg>
        <!-- true:釋出訂閱模式; false:隊列模式-->
        <property name="pubSubDomain" value="true"></property>
    </bean>

    <!--Spring JmsTemplate 的消息生産者 end-->

    <!--接收消費者應答的監聽器-->
    <!--
        消息監聽器。如果注冊了消息監聽器,一旦消息到達,将自動調用監聽器的onMessage方法。
        EJB中的MDB(Message-Driven Bean)就是一種MessageListener。
    -->
    <jms:listener-container destination-type="queue" container-type="default"
                            connection-factory="connection" acknowledge="auto">
        <jms:listener destination="tempqueue" ref="getResponse"></jms:listener>
    </jms:listener-container>
</beans>             

4、做成網頁實戰,需要配置springMVC;spring-MVC.xml

<?xml version="1.0" encoding="UTF-8"?>  
<!-- 查找最新的schemaLocation 通路 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"   
       xmlns:aop="http://www.springframework.org/schema/aop"   
       xmlns:context="http://www.springframework.org/schema/context"  
       xmlns:mvc="http://www.springframework.org/schema/mvc"   
       xmlns:tx="http://www.springframework.org/schema/tx"   
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
       xsi:schemaLocation="http://www.springframework.org/schema/aop   
        http://www.springframework.org/schema/aop/spring-aop-4.0.xsd   
        http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd   
        http://www.springframework.org/schema/mvc   
        http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd   
        http://www.springframework.org/schema/tx   
        http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">

<!--    <mvc:default-servlet-handler />-->
    <mvc:resources mapping="/js/**" location="/js/"/>
    <mvc:annotation-driven
            content-negotiation-manager="contentNegotiationManager" />

    <context:component-scan base-package="com.dongnaoedu">
        <context:include-filter type="annotation"
                                expression="org.springframework.stereotype.Controller" />
    </context:component-scan>


    <bean id="stringHttpMessageConverter"
          class="org.springframework.http.converter.StringHttpMessageConverter">
        <property name="supportedMediaTypes">
            <list>
                <bean class="org.springframework.http.MediaType">
                    <constructor-arg index="0" value="text" />
                    <constructor-arg index="1" value="plain" />
                    <constructor-arg index="2" value="UTF-8" />
                </bean>
            </list>
        </property>
    </bean>
    <bean id="mappingJackson2HttpMessageConverter"
          class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" />

    <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter">
        <property name="messageConverters">
            <list>
                <ref bean="stringHttpMessageConverter" />
                <ref bean="mappingJackson2HttpMessageConverter" />
            </list>
        </property>
    </bean>

    <bean id="contentNegotiationManager"
          class="org.springframework.web.accept.ContentNegotiationManagerFactoryBean">
        <property name="mediaTypes">
            <map>
                <entry key="html" value="text/html" />
                <entry key="pdf" value="application/pdf" />
                <entry key="xsl" value="application/vnd.ms-excel" />
                <entry key="xml" value="application/xml" />
                <entry key="json" value="application/json" />
            </map>
        </property>
        <property name="defaultContentType" value="text/html" />
    </bean>

    <bean id="viewResolver"
          class="org.springframework.web.servlet.view.ContentNegotiatingViewResolver">
        <property name="order" value="0" />
        <property name="contentNegotiationManager" ref="contentNegotiationManager" />

        <property name="viewResolvers">
            <list>
                <bean class="org.springframework.web.servlet.view.BeanNameViewResolver" />
                <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
                    <property name="viewClass"
                              value="org.springframework.web.servlet.view.JstlView" />
                    <property name="prefix" value="/WEB-INF/pages/" />
                    <property name="suffix" value=".jsp"></property>
                </bean>
            </list>
        </property>

        <property name="defaultViews">
            <list>
                <bean  class="org.springframework.web.servlet.view.json.MappingJackson2JsonView">
                    <property name="extractValueFromSingleKeyModel" value="true" />
                </bean>
            </list>
        </property>
    </bean>
    
</beans>             

5、監聽接口:

package com.dongnaoedu.mq.producer.queue;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 動腦學院-Mark老師
 * 建立日期:2017/11/02
 * 建立時間: 22:19
 * 接收消費者應答的類
 */
@Component
public class GetResponse implements MessageListener {

    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("GetResponse accept response : "+textMsg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
           

6、釋出模式的簡單消息生産者:

package com.dongnaoedu.mq.producer.topic;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * 
 * @author lgy
 * @description   Topic生産者發送消息到Topic
 * 
 */

@Component("topicSender")
public class TopicSender {

    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTemplate;

    public void send(String queueName,final String message){
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage(message);
                return msg;
            }
        });
    }
}
           

7、點對點模式消息生産者:

package com.dongnaoedu.mq.producer.queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.jms.*;
import java.awt.peer.SystemTrayPeer;

/**
 * 
 * @author lgy
 * @description  隊列消息生産者,發送消息到隊列
 * 
 */
@Component("queueSender")
public class QueueSender {

    @Autowired
    @Qualifier("jmsQueueTemplate")//在xml配置好的隊列模式
    private JmsTemplate jmsTemplate;

    @Autowired
    private GetResponse getResponse;

    public void send(String queueName,final String message){
        //jmsTemplate的send方法傳遞兩個參數,第一個傳隊列名,第二個傳消息建立接口的實作對象
        jmsTemplate.send(queueName, new MessageCreator() {
            //消息建立接口隻有這麼一個方法,架構會把session會話對象傳給方法,此方法需要傳回一個Message消息對象,架構會發送消息到隊列
            public Message createMessage(Session session) throws JMSException {
                //建立會話消息
                Message msg = session.createTextMessage(message);
                //配置消費者應答相關内容
                //擷取臨時的消息隊列
                Destination tempDest = session.createTemporaryQueue();
                MessageConsumer responseConsumer = session.createConsumer(tempDest);//擷取應答的消息消費者
                responseConsumer.setMessageListener(getResponse);//設定消費者傳回消息時的監聽接口
                msg.setJMSReplyTo(tempDest);//告訴消費者應答消息發送到臨時隊列
                //消費者應答的id,發送出的消息和應答消息進行比對
                String uid = System.currentTimeMillis()+"";
                msg.setJMSCorrelationID(uid);

                return msg;
            }
        });
    }
}           

8、controller:

package com.dongnaoedu.controller;

import com.dongnaoedu.mq.producer.queue.QueueSender;
import com.dongnaoedu.mq.producer.topic.TopicSender;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;


/**
 * 
 * @author lgy
 * @description controller測試
 */
@Controller
@RequestMapping("/activemq")
public class ActivemqController {
    
    @Resource
    QueueSender queueSender;
    @Resource
    TopicSender topicSender;
    
    /**
     * 發送消息到隊列
     * Queue隊列:僅有一個訂閱者會收到消息,消息一旦被處理就不會存在隊列中
     * @param message
     * @return String
     */
    @ResponseBody
    @RequestMapping("queueSender")
    public String queueSender(@RequestParam("message")String message){
        String opt="";
        try {
            queueSender.send("test.queue",message);
            opt = "suc";
        } catch (Exception e) {
            opt = e.getCause().toString();
        }
        return opt;
    }
    
    /**
     * 發送消息到主題
     * Topic主題 :放入一個消息,所有訂閱者都會收到 
     * 這個是主題目的地是一對多的
     * @param message
     * @return String
     */
    @ResponseBody
    @RequestMapping("topicSender")
    public String topicSender(@RequestParam("message")String message){
        String opt = "";
        try {
            topicSender.send("test.topic",message);
            opt = "suc";
        } catch (Exception e) {
            opt = e.getCause().toString();
        }
        return opt;
    }
    
}
           
消息中間件系列三、JMS和activeMQ的簡單使用二、activeMQ的簡單使用三、activeMQ與spring整合實戰

調用不同的接口即可釋出消息到對應的隊列中。

消息消費者項目

項目的pom檔案和web.xml以及spring-mvc.xml基本和生産者一樣,且不是重點,這裡不再浪費篇幅,就不貼出來了。

1、applicationContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 通路 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
     
     <!-- 配置掃描路徑 -->
     <context:component-scan base-package="com.dongnaoedu">
         <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
     </context:component-scan>

    <!-- ActiveMQ 連接配接工廠 -->
    <amq:connectionFactory id="amqConnectionFactory"
                           brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin"  />

    <!-- Spring Caching連接配接工廠 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- 消息消費者 start-->

    <!-- 定義Queue監聽器 -->
    <jms:listener-container destination-type="queue" container-type="default"
                            connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener>
        <jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener>
    </jms:listener-container>


    <!-- 定義Topic監聽器 -->
    <jms:listener-container destination-type="topic" container-type="default"
                            connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener>
        <jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener>
    </jms:listener-container>

    <!-- 消息消費者 end -->

    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory"></constructor-arg>
        <!-- 隊列模式-->
        <property name="pubSubDomain" value="false"></property>
    </bean>

</beans>           

2、釋出訂閱模式的消息監聽:

package com.dongnaoedu.mq.consumer.topic;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 
 * @author lgy
 * @description  Topic消息監聽器
 * 
 */
@Component
public class TopicReceiver2 implements MessageListener {

    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("TopicReceiver2 accept msg : "+textMsg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}
           

可以同時設定多個監聽,有消息釋出時,多個監聽器都能監聽到消息

package com.dongnaoedu.mq.consumer.topic;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 
 * @author lgy
 * @description  Topic消息監聽器
 * 
 */
@Component
public class TopicReceiver1 implements MessageListener {


    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("TopicReceiver1 accept msg : "+textMsg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    
}
           

在消息生産者項目的前端界面輸入消息内容:“123456”,在點選發送Topic消息,以釋出訂閱模式釋出消息;消費項目這邊的兩個釋出訂閱模式監聽器都能收到消息并列印出來:

消息中間件系列三、JMS和activeMQ的簡單使用二、activeMQ的簡單使用三、activeMQ與spring整合實戰

3、點對點模式的消息監聽

package com.dongnaoedu.mq.consumer.queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * 
 * @author Mark
 * @description  隊列消息監聽器
 * 
 */
@Component
public class QueueReceiver1 implements MessageListener {

    @Autowired
    private ReplyTo replyTo;

    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("QueueReceiver1 accept msg : "+textMsg);
            //do my 業務工作
            replyTo.send(textMsg,message);//執行回複
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}           

可以給消息生産者回複消息:

package com.dongnaoedu.mq.consumer.queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * 負責向消息提供者發送應答資訊
 */
@Component
public class ReplyTo {

    @Autowired
    private JmsTemplate jmsTemplate;

    public void send(final String consumerMsg, Message produerMessage) throws JMSException {
        //消息回複,getJMSReplyTo回去回複的臨時隊列
        jmsTemplate.send(produerMessage.getJMSReplyTo(), new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage("QueueReceiver1 accept msg"
                        +consumerMsg);
                return msg;
            }
        });

    }

}           

可以有多個點對點隊列監聽器,當同一隊列有多個監聽器時,會輪詢給監聽器發消息

package com.dongnaoedu.mq.consumer.queue;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 
 * @author Mark
 * @description  隊列消息監聽器
 * 
 */
@Component
public class QueueReceiver2 implements MessageListener {

    public void onMessage(Message message) {
        try {
            String textMsg = ((TextMessage)message).getText();
            System.out.println("QueueReceiver2 accept msg : "+textMsg);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}           

在生産項目連續釋出6條點對點消息,消費項目這邊接收到的消息列印結果為:

消息中間件系列三、JMS和activeMQ的簡單使用二、activeMQ的簡單使用三、activeMQ與spring整合實戰

上圖是兩個監聽器輪詢接收到消息并列印出來的結果。