天天看點

java之JMS

  一、簡介:JMS即 Java消息服務 (Java Message Service)應用程式接口,是一個 Java平台 中關于面向 消息中間件 (MOM)的 API ,用于在兩個應用程式之間,或 分布式系統

中發送消息,進行異步通信。Java消息服務是一個與具體平台無關的API,絕大多數MOM提供商都對JMS提供支援。

  二、JMS對象模型包含如下幾個要素: 

  1)連接配接工廠。連接配接工廠(ConnectionFactory)是由管理者建立,并綁定到

JNDI

樹中。用戶端使用JNDI查找連接配接工廠,然後利用連接配接工廠建立一個JMS連接配接。

  2)JMS連接配接。JMS連接配接(Connection)表示JMS用戶端和伺服器端之間的一個活動的連接配接,是由用戶端通過調用連接配接工廠的方法建立的。

  3)JMS會話。JMS會話(Session)表示JMS客戶與JMS伺服器之間的會話狀态。JMS會話建立在JMS連接配接上,表示客戶與伺服器之間的一個會話線程。

  4)JMS目的。JMS目的(Destination),又稱為

消息隊列

,是實際的消息源。

  5)JMS生産者和消費者。生産者(Message Producer)和消費者(Message Consumer)對象由Session對象建立,用于發送和接收消息。

  6)JMS消息通常有兩種類型:

    ① 點對點(Point-to-Point)。在點對點的消息系統中,消息分發給一個單獨的使用者。點對點消息往往與隊列(javax.jms.Queue)相關聯。

    ② 釋出/訂閱(Publish/Subscribe)。釋出/訂閱消息系統支援一個事件驅動模型,消息生産者和消費者都參與消息的傳遞。生産者釋出事件,而使用者訂閱感興趣的事件,并使用事件。該類型消息一般與特定的主題(javax.jms.Topic)關聯。

  三、JMS的五種消息

  StreamMessage -- Java原始值的資料流

  MapMessage--一套名稱-值對

  TextMessage--一個字元串對象

  ObjectMessage--一個序列化的 Java對象

  BytesMessage--一個未解釋位元組的資料流

  四、常用應用類

  1)ConnectionFactory 接口(連接配接工廠):使用者用來建立到JMS提供者的連接配接的被管對象。JMS客戶通過可移植的接口通路連接配接,這樣當下層的實作改變時,代碼不需要進行修改。管理者在JNDI名字空間中配置連接配接工廠,這樣,JMS客戶才能夠查找到它們。根據消息類型的不同,使用者将使用隊列連接配接工廠,或者主題連接配接工廠。

  2)Connection 接口(連接配接):連接配接代表了應用程式和

消息伺服器

之間的通信鍊路。在獲得了連接配接工廠後,就可以建立一個與JMS提供者的連接配接。根據不同的連接配接類型,連接配接允許使用者建立會話,以發送和接收隊列和主題到目标。

  3)Destination 接口(目标):目标是一個包裝了消息目标

辨別符

的被管對象,消息目标是指消息釋出和接收的地點,或者是隊列,或者是主題。JMS管理者建立這些對象,然後使用者通過JNDI發現它們。和連接配接工廠一樣,管理者可以建立兩種類型的目标,點對點模型的隊列,以及釋出者/訂閱者模型的主題。

  4)Session 接口(會話):表示一個單線程的上下文,用于發送和接收消息。由于會話是單線程的,是以消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支援

事務

。如果使用者選擇了事務支援,會話上下文将儲存一組消息,直到事務被送出才發送這些消息。在送出事務之前,使用者可以使用復原操作取消這些消息。一個會話允許使用者建立消息,生産者來發送消息,消費者來接收消息。

  5)MessageConsumer 接口(消息消費者):由會話建立的對象,用于接收發送到目标的消息。消費者可以同步地(阻塞模式),或(非阻塞)接收隊列和主題類型的消息。

  6)MessageProducer 接口(消息生産者):由會話建立的對象,用于發送消息到目标。使用者可以建立某個目标的發送者,也可以建立一個通用的發送者,在發送消息時指定目标。

  7)Message 接口(消息):是在消費者和生産者之間傳送的對象,也就是說從一個應用程式傳送到另一個應用程式。一個消息有三個主要部分:

    a、消息頭(必須):包含用于識别和為消息尋找路由的操作設定。

    b、一組消息屬性(可選):包含額外的屬性,支援其他提供者和使用者的相容。可以建立定制的字段和過濾器(消息選擇器)。

    c、一個消息體(可選):允許使用者建立五種類型的消息(文本消息,映射消息,位元組消息,流消息和對象消息)。

    d、消息接口非常靈活,并提供了許多方式來定制消息的内容。

  五、這裡主要講解activemq的實作topic和queue的消息傳輸機制

  1)首先還是需要中間件(不管是單獨應用部署,還是嵌入式的服務)

    服務啟動可以參考:

https://www.cnblogs.com/ll409546297/p/9190992.html   2)activemq工廠連接配接模式 

     //初始化工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        //設定連結位址
        activeMQConnectionFactory.setBrokerURL("tcp://localhost:61616");
        //建立連結
        Connection connection = activeMQConnectionFactory.createConnection();
        //啟動服務
        connection.start();      

  3)我這裡簡單寫了一個activemq的具體實作過程,供參考

  a、目錄結構和依賴包

java之JMS
    <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.4</version>
        </dependency>      

  b、核心功能,消息進行中心

package com.pinnet.center;

import com.pinnet.consumer.queue.Queue;
import com.pinnet.consumer.topic.Topic1;
import com.pinnet.consumer.topic.Topic2;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQObjectMessage;

import javax.jms.*;
import java.io.Serializable;

public class MessageCenter {

    private static Session session;

    public static void init() throws JMSException {
        initConnection();
        initRegister();
    }

    private static void initConnection() throws JMSException {
        //初始化工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        //設定連結位址
        activeMQConnectionFactory.setBrokerURL("tcp://localhost:61616");
        //建立連結
        Connection connection = activeMQConnectionFactory.createConnection();
        //建立會話
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //啟動服務
        connection.start();
    }

    /**
     * 用于注冊對應消息隊列
     * @throws JMSException
     */
    public static void initRegister() throws JMSException {
        registerQueue(QueueType.QUEUE, new Queue());
        registerTopic(TopicType.TOPIC, new Topic1());
        registerTopic(TopicType.TOPIC, new Topic2());
    }

    /**
     * 注冊topic監聽
     * @param topicType
     * @param messageListener
     * @throws JMSException
     */
    public static void registerTopic(TopicType topicType, MessageListener messageListener) throws JMSException {
        //将會話轉成topic
        TopicSession topicSession = (TopicSession) session;
        //建立訂閱者
        TopicSubscriber topicSubscriber = topicSession.createSubscriber(topicSession.createTopic(topicType.name()));
        //設定監聽
        topicSubscriber.setMessageListener(messageListener);
    }

    /**
     * 注冊queue監聽
     * @param queueType
     * @param messageListener
     * @throws JMSException
     */
    public static void registerQueue(QueueType queueType, MessageListener messageListener) throws JMSException {
        //将會話轉成queue
        QueueSession queueSession = (QueueSession) session;
        //建立接收者
        QueueReceiver queueReceiver = queueSession.createReceiver(queueSession.createQueue(queueType.name()));
        //設定監聽
        queueReceiver.setMessageListener(messageListener);
    }

    /**
     * 發送topic消息
     * @param topicType
     * @param serializable
     * @throws JMSException
     */
    public static void sendMessageTopic(TopicType topicType, Serializable serializable) throws JMSException {
        TopicSession topicSession = (TopicSession) session;
        //建立釋出者
        TopicPublisher topicPublisher = topicSession.createPublisher(topicSession.createTopic(topicType.name()));
        //這裡資料釋出形式采用ObjectMessage
        ActiveMQObjectMessage activeMQObjectMessage = new ActiveMQObjectMessage();
        activeMQObjectMessage.setObject(serializable);
        //釋出消息
        topicPublisher.publish(activeMQObjectMessage);
    }

    /**
     * 發送queue消息
     * @param queueType
     * @param serializable
     * @throws JMSException
     */
    public static void sendMessageQueue(QueueType queueType, Serializable serializable) throws JMSException {
        QueueSession queueSession = (QueueSession) session;
        //建立發送者
        QueueSender queueSender = queueSession.createSender(queueSession.createQueue(queueType.name()));
        ActiveMQObjectMessage activeMQObjectMessage = new ActiveMQObjectMessage();
        activeMQObjectMessage.setObject(serializable);
        //發送消息
        queueSender.send(activeMQObjectMessage);
    }

    //使用枚舉的目的是更好的管理
    public enum TopicType {
        TOPIC
    }

    public enum QueueType {
        QUEUE
    }
}      

  c、消息處理抽象類

package com.pinnet.consumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import java.io.Serializable;

/**
 * 抽象類的目的是,讓編寫者,隻管處理消息,不用管理中間過程
 */
public abstract class MessageConsumer implements MessageListener {

    public void onMessage(Message message) {
        try {
            ObjectMessage objectMessage = (ObjectMessage) message;
            handleMessage(objectMessage.getObject());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    //實作類處理對應消息
    public abstract void handleMessage (Serializable serializable);
}      

  d、2個實作類型

package com.pinnet.consumer.queue;

import com.pinnet.consumer.MessageConsumer;

import java.io.Serializable;

public class Queue extends MessageConsumer {

    /**
     * 消息處理
     * @param serializable
     */
    public void handleMessage(Serializable serializable) {
        System.out.println(serializable+"queue");
    }
}      
package com.pinnet.consumer.topic;

import com.pinnet.consumer.MessageConsumer;

import java.io.Serializable;

public class Topic1 extends MessageConsumer {

    /**
     * 消息處理
     * @param serializable
     */
    public void handleMessage(Serializable serializable) {
        System.out.println(serializable+"topic1");
    }
}      
package com.pinnet.consumer.topic;

import com.pinnet.consumer.MessageConsumer;

import java.io.Serializable;

public class Topic2 extends MessageConsumer {

    /**
     * 消息處理
     * @param serializable
     */
    public void handleMessage(Serializable serializable) {
        System.out.println(serializable+"topic2");
    }
}      

  e、測試:

package com.pinnet;

import com.pinnet.center.MessageCenter;

import javax.jms.JMSException;

public class Main {

    public static void main(String[] args) throws JMSException, InterruptedException {
        MessageCenter.init();
        while (true) {
            MessageCenter.sendMessageQueue(MessageCenter.QueueType.QUEUE, "queue");
            MessageCenter.sendMessageTopic(MessageCenter.TopicType.TOPIC, "topic");
            Thread.sleep(5000);
        }
    }
}      

  

java之JMS

  六、我這裡是采用純代碼的方式,在spring中jmsTemplate的應用,就像session一樣,就可以。

    springmvc的配置方式參考:

https://www.cnblogs.com/ll409546297/p/6898155.html

    配置方式也可以采用activemq的工廠連接配接模式一樣實作

   七、例子源碼:

https://pan.baidu.com/s/1Wo0QuihBMluRevGmk-p-aA