概述
是Apache下的一個子項目。 類似于ZeroMQ,它能夠以代理人和點對點的技術實作隊列。同時類似于RabbitMQ,它少量代碼就可以高效地實作進階應用場景。RabbitMQ、ZeroMQ、ActiveMQ均支援常用的多種語言用戶端 C++、Java、.Net,、Python、 Php、 Ruby等。
ActiveMQ安裝
1、下載下傳ActiveMQ
去官方網站下載下傳:http://activemq.apache.org/activemq-5152-release.html。
2、運作ActiveMQ
下載下傳好後,解壓壓縮包到一個目錄。進入bin目錄,發現有win32和win64兩個檔案夾,這2個檔案夾分别對應windows32位和windows64位作業系統的啟動腳本。根據自己的系統,選擇合适的路徑,其中activemq.bat便是啟動腳本,輕按兩下啟動。
3、登入ActiveMQ
ActiveMQ預設啟動到8161端口,啟動完了後在浏覽器位址欄輸入:http://localhost:8161/admin要求輸入使用者名密碼,預設使用者名密碼為admin、admin,這個使用者名密碼是在conf/users.properties中配置的。輸入使用者名密碼後便可看到如下圖的ActiveMQ控制台界面了。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL90ERPJTU610dRRVT3V1MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLzMTM0UjNwcTMwIDNwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
JMS可靠消息機制
消息确認機制
JMS消息隻有在被确認之後,才認為已經被成功的消費了,消息的成功消費通常包含三個階段:客戶接收消息,客戶處理消息和消息被确認。
在事務性會話中,當一個事務被送出的時候,确認自動發生。在非事務性會話中,消息何時被确認取決于建立會話時的應答模式。改參數有三個可選值:
① Session.AUTO_ACKNOWLEDGE:當客戶成功的從receive方法傳回的時候,或者從MessageListener.onMessage方法成功傳回的時候,會話自動确認客戶收到的消息。
②Session.CLIENT_ACKNOWLEDGE:客戶通過調用消息的acknowledge方法确認消息。需要注意的是,在這種模式中,确認是在會話層上進行,确認一個被消費的消息,将自動确認所有已被會話消費的消息。例如,如果一個消息消費者消費了10個消息,然後确認第5個消息,那麼所有10個消息都被确認。
③Session.DUPS_ACKNOWLEDGE:該選擇隻是會話遲鈍的确認消息的送出。如果JMS Provider失敗,那麼可能會導緻一些重複的消息。如果是重複的消息,那麼JMS provider必須把消息頭的JMSRedelivered字段設定為true
持久話機制
PERSISTENT:訓示JMS provider持久儲存消息,以保證消息不會因為JMS provider的失敗而丢失
NON_PERSISTENT:不要求JMS provider持久儲存消息
// 設定消息持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
實作點對點通訊
使用ActiveMQ完成點對點(p2p)通訊模式
生産者
public class Produce{
// mq通訊位址
private static String url = "tcp://127.0.0.1:61616";
// 隊列名稱
private static String queueName = "my_queue";
public static void main(String[] args) throws JMSException {
// 1.建立連接配接工廠 嗎,密碼采用預設密碼admin 和admin
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
// 2.建立連接配接
Connection connection = factory.createConnection();
// 3.建立會話 參數1 設定是否需要以事務方式送出 參數2 消息方式 采用自動簽收
connection.start();// 啟動連接配接
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.建立目标(隊列)
Queue queue = session.createQueue(queueName);
// 5.建立生産者
MessageProducer producer = session.createProducer(queue);
for (int i = 1; i <= 10; i++) {
// 6.建立 消息
TextMessage textMessage = session.createTextMessage("消息内容i:" + i);
// 7.發送消息
producer.send(textMessage);
}
// 8.關閉連接配接
connection.close();
System.out.println("消息發送完畢!");
}
}
檢視ActiveMQ
消費者
public class Consumer {
// mq通訊位址
private static String url = "tcp://127.0.0.1:61616";
// 隊列名稱
private static String queueName = "my_queue";
public static void main(String[] args) throws JMSException {
System.out.println("我是消費者002");
// 1.建立連接配接工廠 嗎,密碼采用預設密碼admin 和admin
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
// 2.建立連接配接
Connection connection = factory.createConnection();
// 3.建立會話 參數1 設定是否需要以事務方式送出 參數2 消息方式 采用自動簽收
connection.start();// 啟動連接配接
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.建立目标(隊列)
Queue queue = session.createQueue(queueName);
// 5.建立消費者
MessageConsumer consumer = session.createConsumer(queue);
// 6.啟動監聽 監聽消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("消費者消息生産者内容:" + textMessage.getText());
} catch (Exception e) {
// TODO: handle exception
}
}
});
// 不要關閉連接配接
}
}
注意:點對點方式中,消費者如果叢集,預設采用均攤方式消費,但是還是確定一條消息隻有一個消費者消費。
實作釋出訂閱
要先啟動消費者,訂閱主題,再啟動生産者,才能接收到消息。
這裡我們采用SpringBoot整合ActiveMQ。
生産者
1、pom檔案引入
<!-- 管理依賴 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Finchley.M7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- SpringBoot整合Web元件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- SpringBoot Activemq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
2、配置檔案
spring:
activemq:
###MQ連接配接通訊位址
broker-url: tcp://127.0.0.1:61616
###賬号
user: admin
###密碼
password: admin
###自定義主題
topic: my-topic
server:
port: 8082
3、定義主題
@Component
public class TopicConfig {
@Value("${topic}")
private String topicName;
@Bean
public Topic topic() {
return new ActiveMQTopic(topicName);
}
}
4、定義生産者
@Component
public class TopicProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
@Scheduled(fixedDelay = 5000)
public void send() {
String msg = System.currentTimeMillis() + "";
System.out.println("采用釋出訂閱方式,生産者向消費者發送内容:" + msg);
jmsMessagingTemplate.convertAndSend(topic, msg);
}
}
5、啟動項
@SpringBootApplication
@EnableScheduling
public class AppProducer {
public static void main(String[] args) {
SpringApplication.run(AppProducer.class, args);
}
}
消費者
1、pom檔案引入
<!-- 管理依賴 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Finchley.M7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- SpringBoot整合Web元件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- SpringBoot Activemq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
2、配置檔案
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
#### 開啟釋出訂閱
jms:
pub-sub-domain: true
topic: spring-topic
server:
port: 8082
3、定義消費者
@Component
public class TopicConsumer {
@JmsListener(destination = "spring-topic")
public void receive(String msg) {
System.out.println("釋出與訂閱消費者接受,生産者内容:" + msg);
}
}
啟動項
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}