天天看點

ActiveMQ概述ActiveMQ安裝JMS可靠消息機制實作點對點通訊實作釋出訂閱

概述

        是Apache下的一個子項目。 類似于ZeroMQ,它能夠以代理人和點對點的技術實作隊列。同時類似于RabbitMQ,它少量代碼就可以高效地實作進階應用場景。RabbitMQ、ZeroMQ、ActiveMQ均支援常用的多種語言用戶端 C++、Java、.Net,、Python、 Php、 Ruby等。

ActiveMQ安裝

1、下載下傳ActiveMQ

      去官方網站下載下傳:http://activemq.apache.org/activemq-5152-release.html。

2、運作ActiveMQ

     下載下傳好後,解壓壓縮包到一個目錄。進入bin目錄,發現有win32和win64兩個檔案夾,這2個檔案夾分别對應windows32位和windows64位作業系統的啟動腳本。根據自己的系統,選擇合适的路徑,其中activemq.bat便是啟動腳本,輕按兩下啟動。

3、登入ActiveMQ    

       ActiveMQ預設啟動到8161端口,啟動完了後在浏覽器位址欄輸入:http://localhost:8161/admin要求輸入使用者名密碼,預設使用者名密碼為admin、admin,這個使用者名密碼是在conf/users.properties中配置的。輸入使用者名密碼後便可看到如下圖的ActiveMQ控制台界面了。

ActiveMQ概述ActiveMQ安裝JMS可靠消息機制實作點對點通訊實作釋出訂閱

JMS可靠消息機制

消息确認機制

       JMS消息隻有在被确認之後,才認為已經被成功的消費了,消息的成功消費通常包含三個階段:客戶接收消息,客戶處理消息和消息被确認。

       在事務性會話中,當一個事務被送出的時候,确認自動發生。在非事務性會話中,消息何時被确認取決于建立會話時的應答模式。改參數有三個可選值:

   ① Session.AUTO_ACKNOWLEDGE:當客戶成功的從receive方法傳回的時候,或者從MessageListener.onMessage方法成功傳回的時候,會話自動确認客戶收到的消息。

   ②Session.CLIENT_ACKNOWLEDGE:客戶通過調用消息的acknowledge方法确認消息。需要注意的是,在這種模式中,确認是在會話層上進行,确認一個被消費的消息,将自動确認所有已被會話消費的消息。例如,如果一個消息消費者消費了10個消息,然後确認第5個消息,那麼所有10個消息都被确認。

   ③Session.DUPS_ACKNOWLEDGE:該選擇隻是會話遲鈍的确認消息的送出。如果JMS Provider失敗,那麼可能會導緻一些重複的消息。如果是重複的消息,那麼JMS provider必須把消息頭的JMSRedelivered字段設定為true

持久話機制

PERSISTENT:訓示JMS provider持久儲存消息,以保證消息不會因為JMS provider的失敗而丢失

NON_PERSISTENT:不要求JMS provider持久儲存消息

// 設定消息持久化

producer.setDeliveryMode(DeliveryMode.PERSISTENT);
           

實作點對點通訊

使用ActiveMQ完成點對點(p2p)通訊模式

生産者

public class Produce{
	// mq通訊位址
	private static String url = "tcp://127.0.0.1:61616";
	// 隊列名稱
	private static String queueName = "my_queue";

	public static void main(String[] args) throws JMSException {
		// 1.建立連接配接工廠 嗎,密碼采用預設密碼admin 和admin
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
		// 2.建立連接配接
		Connection connection = factory.createConnection();
		// 3.建立會話 參數1 設定是否需要以事務方式送出 參數2 消息方式 采用自動簽收
		connection.start();// 啟動連接配接
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 4.建立目标(隊列)
		Queue queue = session.createQueue(queueName);
		// 5.建立生産者
		MessageProducer producer = session.createProducer(queue);
		for (int i = 1; i <= 10; i++) {
			// 6.建立 消息
			TextMessage textMessage = session.createTextMessage("消息内容i:" + i);
			// 7.發送消息
			producer.send(textMessage);

		}
		// 8.關閉連接配接
		connection.close();
		System.out.println("消息發送完畢!");
	}
}
           

檢視ActiveMQ

ActiveMQ概述ActiveMQ安裝JMS可靠消息機制實作點對點通訊實作釋出訂閱

消費者

public class Consumer {
	// mq通訊位址
	private static String url = "tcp://127.0.0.1:61616";
	// 隊列名稱
	private static String queueName = "my_queue";

	public static void main(String[] args) throws JMSException {
		System.out.println("我是消費者002");
		// 1.建立連接配接工廠 嗎,密碼采用預設密碼admin 和admin
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
		// 2.建立連接配接
		Connection connection = factory.createConnection();
		// 3.建立會話 參數1 設定是否需要以事務方式送出 參數2 消息方式 采用自動簽收
		connection.start();// 啟動連接配接
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 4.建立目标(隊列)
		Queue queue = session.createQueue(queueName);
		// 5.建立消費者
		MessageConsumer consumer = session.createConsumer(queue);
		// 6.啟動監聽 監聽消息
		consumer.setMessageListener(new MessageListener() {

			public void onMessage(Message message) {
				try {
					TextMessage textMessage = (TextMessage) message;
					System.out.println("消費者消息生産者内容:" + textMessage.getText());
				} catch (Exception e) {
					// TODO: handle exception
				}
			}
		});
		// 不要關閉連接配接
	}
}
           
ActiveMQ概述ActiveMQ安裝JMS可靠消息機制實作點對點通訊實作釋出訂閱

注意:點對點方式中,消費者如果叢集,預設采用均攤方式消費,但是還是確定一條消息隻有一個消費者消費。

實作釋出訂閱

要先啟動消費者,訂閱主題,再啟動生産者,才能接收到消息。

這裡我們采用SpringBoot整合ActiveMQ。

生産者

1、pom檔案引入

<!-- 管理依賴 -->
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Finchley.M7</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>
	<dependencies>
		<!-- SpringBoot整合Web元件 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<!-- SpringBoot Activemq -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>

	</dependencies>
           

2、配置檔案

spring:
  activemq:
  ###MQ連接配接通訊位址
    broker-url: tcp://127.0.0.1:61616
  ###賬号
    user: admin
  ###密碼  
    password: admin
###自定義主題
topic: my-topic
server:
  port: 8082
           

 3、定義主題

@Component
public class TopicConfig {
	@Value("${topic}")
	private String topicName;
	@Bean
	public Topic topic() {
		return new ActiveMQTopic(topicName);
	}
}
           

4、定義生産者

@Component
public class TopicProducer {
	@Autowired
	private JmsMessagingTemplate jmsMessagingTemplate;
	@Autowired
	private Topic topic;

	@Scheduled(fixedDelay = 5000)
	public void send() {
		String msg = System.currentTimeMillis() + "";
		System.out.println("采用釋出訂閱方式,生産者向消費者發送内容:" + msg);
		jmsMessagingTemplate.convertAndSend(topic, msg);
	}
}
           

 5、啟動項

@SpringBootApplication
@EnableScheduling
public class AppProducer {
	public static void main(String[] args) {
		SpringApplication.run(AppProducer.class, args);
	}
}
           

消費者

1、pom檔案引入 

<!-- 管理依賴 -->
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Finchley.M7</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>
	<dependencies>
		<!-- SpringBoot整合Web元件 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<!-- SpringBoot Activemq -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>

	</dependencies>
           

2、配置檔案

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
#### 開啟釋出訂閱   
  jms: 
    pub-sub-domain: true
topic: spring-topic
server:
  port: 8082
           

 3、定義消費者

@Component
public class TopicConsumer {
	@JmsListener(destination = "spring-topic")
	public void receive(String msg) {
		System.out.println("釋出與訂閱消費者接受,生産者内容:" + msg);
	}
}
           

啟動項

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}