一、簡介:JMS即 Java消息服務 (Java Message Service)應用程式接口,是一個 Java平台 中關于面向 消息中間件 (MOM)的 API ,用于在兩個應用程式之間,或 分布式系統
中發送消息,進行異步通信。Java消息服務是一個與具體平台無關的API,絕大多數MOM提供商都對JMS提供支援。
二、JMS對象模型包含如下幾個要素:
1)連接配接工廠。連接配接工廠(ConnectionFactory)是由管理者建立,并綁定到
JNDI樹中。用戶端使用JNDI查找連接配接工廠,然後利用連接配接工廠建立一個JMS連接配接。
2)JMS連接配接。JMS連接配接(Connection)表示JMS用戶端和伺服器端之間的一個活動的連接配接,是由用戶端通過調用連接配接工廠的方法建立的。
3)JMS會話。JMS會話(Session)表示JMS客戶與JMS伺服器之間的會話狀态。JMS會話建立在JMS連接配接上,表示客戶與伺服器之間的一個會話線程。
4)JMS目的。JMS目的(Destination),又稱為
消息隊列,是實際的消息源。
5)JMS生産者和消費者。生産者(Message Producer)和消費者(Message Consumer)對象由Session對象建立,用于發送和接收消息。
6)JMS消息通常有兩種類型:
① 點對點(Point-to-Point)。在點對點的消息系統中,消息分發給一個單獨的使用者。點對點消息往往與隊列(javax.jms.Queue)相關聯。
② 釋出/訂閱(Publish/Subscribe)。釋出/訂閱消息系統支援一個事件驅動模型,消息生産者和消費者都參與消息的傳遞。生産者釋出事件,而使用者訂閱感興趣的事件,并使用事件。該類型消息一般與特定的主題(javax.jms.Topic)關聯。
三、JMS的五種消息
StreamMessage -- Java原始值的資料流
MapMessage--一套名稱-值對
TextMessage--一個字元串對象
ObjectMessage--一個序列化的 Java對象
BytesMessage--一個未解釋位元組的資料流
四、常用應用類
1)ConnectionFactory 接口(連接配接工廠):使用者用來建立到JMS提供者的連接配接的被管對象。JMS客戶通過可移植的接口通路連接配接,這樣當下層的實作改變時,代碼不需要進行修改。管理者在JNDI名字空間中配置連接配接工廠,這樣,JMS客戶才能夠查找到它們。根據消息類型的不同,使用者将使用隊列連接配接工廠,或者主題連接配接工廠。
2)Connection 接口(連接配接):連接配接代表了應用程式和
消息伺服器之間的通信鍊路。在獲得了連接配接工廠後,就可以建立一個與JMS提供者的連接配接。根據不同的連接配接類型,連接配接允許使用者建立會話,以發送和接收隊列和主題到目标。
3)Destination 接口(目标):目标是一個包裝了消息目标
辨別符的被管對象,消息目标是指消息釋出和接收的地點,或者是隊列,或者是主題。JMS管理者建立這些對象,然後使用者通過JNDI發現它們。和連接配接工廠一樣,管理者可以建立兩種類型的目标,點對點模型的隊列,以及釋出者/訂閱者模型的主題。
4)Session 接口(會話):表示一個單線程的上下文,用于發送和接收消息。由于會話是單線程的,是以消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支援
事務。如果使用者選擇了事務支援,會話上下文将儲存一組消息,直到事務被送出才發送這些消息。在送出事務之前,使用者可以使用復原操作取消這些消息。一個會話允許使用者建立消息,生産者來發送消息,消費者來接收消息。
5)MessageConsumer 接口(消息消費者):由會話建立的對象,用于接收發送到目标的消息。消費者可以同步地(阻塞模式),或(非阻塞)接收隊列和主題類型的消息。
6)MessageProducer 接口(消息生産者):由會話建立的對象,用于發送消息到目标。使用者可以建立某個目标的發送者,也可以建立一個通用的發送者,在發送消息時指定目标。
7)Message 接口(消息):是在消費者和生産者之間傳送的對象,也就是說從一個應用程式傳送到另一個應用程式。一個消息有三個主要部分:
a、消息頭(必須):包含用于識别和為消息尋找路由的操作設定。
b、一組消息屬性(可選):包含額外的屬性,支援其他提供者和使用者的相容。可以建立定制的字段和過濾器(消息選擇器)。
c、一個消息體(可選):允許使用者建立五種類型的消息(文本消息,映射消息,位元組消息,流消息和對象消息)。
d、消息接口非常靈活,并提供了許多方式來定制消息的内容。
五、這裡主要講解activemq的實作topic和queue的消息傳輸機制
1)首先還是需要中間件(不管是單獨應用部署,還是嵌入式的服務)
服務啟動可以參考:
https://www.cnblogs.com/ll409546297/p/9190992.html 2)activemq工廠連接配接模式 //初始化工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
//設定連結位址
activeMQConnectionFactory.setBrokerURL("tcp://localhost:61616");
//建立連結
Connection connection = activeMQConnectionFactory.createConnection();
//啟動服務
connection.start();
3)我這裡簡單寫了一個activemq的具體實作過程,供參考
a、目錄結構和依賴包
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.4</version>
</dependency>
b、核心功能,消息進行中心
package com.pinnet.center;
import com.pinnet.consumer.queue.Queue;
import com.pinnet.consumer.topic.Topic1;
import com.pinnet.consumer.topic.Topic2;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQObjectMessage;
import javax.jms.*;
import java.io.Serializable;
public class MessageCenter {
private static Session session;
public static void init() throws JMSException {
initConnection();
initRegister();
}
private static void initConnection() throws JMSException {
//初始化工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
//設定連結位址
activeMQConnectionFactory.setBrokerURL("tcp://localhost:61616");
//建立連結
Connection connection = activeMQConnectionFactory.createConnection();
//建立會話
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//啟動服務
connection.start();
}
/**
* 用于注冊對應消息隊列
* @throws JMSException
*/
public static void initRegister() throws JMSException {
registerQueue(QueueType.QUEUE, new Queue());
registerTopic(TopicType.TOPIC, new Topic1());
registerTopic(TopicType.TOPIC, new Topic2());
}
/**
* 注冊topic監聽
* @param topicType
* @param messageListener
* @throws JMSException
*/
public static void registerTopic(TopicType topicType, MessageListener messageListener) throws JMSException {
//将會話轉成topic
TopicSession topicSession = (TopicSession) session;
//建立訂閱者
TopicSubscriber topicSubscriber = topicSession.createSubscriber(topicSession.createTopic(topicType.name()));
//設定監聽
topicSubscriber.setMessageListener(messageListener);
}
/**
* 注冊queue監聽
* @param queueType
* @param messageListener
* @throws JMSException
*/
public static void registerQueue(QueueType queueType, MessageListener messageListener) throws JMSException {
//将會話轉成queue
QueueSession queueSession = (QueueSession) session;
//建立接收者
QueueReceiver queueReceiver = queueSession.createReceiver(queueSession.createQueue(queueType.name()));
//設定監聽
queueReceiver.setMessageListener(messageListener);
}
/**
* 發送topic消息
* @param topicType
* @param serializable
* @throws JMSException
*/
public static void sendMessageTopic(TopicType topicType, Serializable serializable) throws JMSException {
TopicSession topicSession = (TopicSession) session;
//建立釋出者
TopicPublisher topicPublisher = topicSession.createPublisher(topicSession.createTopic(topicType.name()));
//這裡資料釋出形式采用ObjectMessage
ActiveMQObjectMessage activeMQObjectMessage = new ActiveMQObjectMessage();
activeMQObjectMessage.setObject(serializable);
//釋出消息
topicPublisher.publish(activeMQObjectMessage);
}
/**
* 發送queue消息
* @param queueType
* @param serializable
* @throws JMSException
*/
public static void sendMessageQueue(QueueType queueType, Serializable serializable) throws JMSException {
QueueSession queueSession = (QueueSession) session;
//建立發送者
QueueSender queueSender = queueSession.createSender(queueSession.createQueue(queueType.name()));
ActiveMQObjectMessage activeMQObjectMessage = new ActiveMQObjectMessage();
activeMQObjectMessage.setObject(serializable);
//發送消息
queueSender.send(activeMQObjectMessage);
}
//使用枚舉的目的是更好的管理
public enum TopicType {
TOPIC
}
public enum QueueType {
QUEUE
}
}
c、消息處理抽象類
package com.pinnet.consumer;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import java.io.Serializable;
/**
* 抽象類的目的是,讓編寫者,隻管處理消息,不用管理中間過程
*/
public abstract class MessageConsumer implements MessageListener {
public void onMessage(Message message) {
try {
ObjectMessage objectMessage = (ObjectMessage) message;
handleMessage(objectMessage.getObject());
} catch (JMSException e) {
e.printStackTrace();
}
}
//實作類處理對應消息
public abstract void handleMessage (Serializable serializable);
}
d、2個實作類型
package com.pinnet.consumer.queue;
import com.pinnet.consumer.MessageConsumer;
import java.io.Serializable;
public class Queue extends MessageConsumer {
/**
* 消息處理
* @param serializable
*/
public void handleMessage(Serializable serializable) {
System.out.println(serializable+"queue");
}
}
package com.pinnet.consumer.topic;
import com.pinnet.consumer.MessageConsumer;
import java.io.Serializable;
public class Topic1 extends MessageConsumer {
/**
* 消息處理
* @param serializable
*/
public void handleMessage(Serializable serializable) {
System.out.println(serializable+"topic1");
}
}
package com.pinnet.consumer.topic;
import com.pinnet.consumer.MessageConsumer;
import java.io.Serializable;
public class Topic2 extends MessageConsumer {
/**
* 消息處理
* @param serializable
*/
public void handleMessage(Serializable serializable) {
System.out.println(serializable+"topic2");
}
}
e、測試:
package com.pinnet;
import com.pinnet.center.MessageCenter;
import javax.jms.JMSException;
public class Main {
public static void main(String[] args) throws JMSException, InterruptedException {
MessageCenter.init();
while (true) {
MessageCenter.sendMessageQueue(MessageCenter.QueueType.QUEUE, "queue");
MessageCenter.sendMessageTopic(MessageCenter.TopicType.TOPIC, "topic");
Thread.sleep(5000);
}
}
}
六、我這裡是采用純代碼的方式,在spring中jmsTemplate的應用,就像session一樣,就可以。
springmvc的配置方式參考:
https://www.cnblogs.com/ll409546297/p/6898155.html配置方式也可以采用activemq的工廠連接配接模式一樣實作
七、例子源碼:
https://pan.baidu.com/s/1Wo0QuihBMluRevGmk-p-aA