天天看點

MQ Demo 整合 Spring

本地安裝好active mq ,開啟服務。

附件是 mq消息隊列 接收請求,并将請求處理完成放到隊列的示例。

單獨使用MQ,示例如下:

package demo.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
	private static final int SEND_NUMBER = 5;

	public static void main(String[] args) {
		// ConnectionFactory :連接配接工廠,JMS 用它建立連接配接
		ConnectionFactory connectionFactory;
		// Connection :JMS 用戶端到JMS Provider 的連接配接
		Connection connection = null;
		// Session: 一個發送或接收消息的線程
		Session session;
		// Destination :消息的目的地;消息發送給誰.
		Destination destination;
		// MessageProducer:消息發送者
		MessageProducer producer;
		// TextMessage message;
		// 構造ConnectionFactory執行個體對象,此處采用ActiveMq的實作jar
		connectionFactory = new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
		try {
			// 構造從工廠得到連接配接對象
			connection = connectionFactory.createConnection();
			// 啟動
			connection.start();
			// 擷取操作連接配接
			session = connection.createSession(Boolean.TRUE,
					Session.AUTO_ACKNOWLEDGE);
			// 擷取session注意參數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置
			destination = session.createQueue("FirstQueue");
			// 得到消息生成者【發送者】
			producer = session.createProducer(destination);
			// 設定不持久化,此處學習,實際根據項目決定
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			// 構造消息,此處寫死,項目就是參數,或者方法擷取
			sendMessage(session, producer);
			session.commit();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				if (null != connection)
					connection.close();
			} catch (Throwable ignore) {
			}
		}
	}

	public static void sendMessage(Session session, MessageProducer producer)
			throws Exception {
		for (int i = 1; i <= SEND_NUMBER; i++) {
			TextMessage message = session.createTextMessage("ActiveMq 發送的消息"
					+ i);
			// 發送消息到目的地方
			System.out.println("發送消息:" + "ActiveMq 發送的消息" + i);
			producer.send(message);
		}
	}
}
           
package demo.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
	public static void main(String[] args) {
		// ConnectionFactory :連接配接工廠,JMS 用它建立連接配接
		ConnectionFactory connectionFactory;
		// Connection :JMS 用戶端到JMS Provider 的連接配接
		Connection connection = null;
		// Session: 一個發送或接收消息的線程
		Session session;
		// Destination :消息的目的地;消息發送給誰.
		Destination destination;
		// 消費者,消息接收者
		MessageConsumer consumer;
		connectionFactory = new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
		try {
			// 構造從工廠得到連接配接對象
			connection = connectionFactory.createConnection();
			// 啟動
			connection.start();
			// 擷取操作連接配接
			session = connection.createSession(Boolean.FALSE,
					Session.AUTO_ACKNOWLEDGE);
			// 擷取session注意參數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置
			destination = session.createQueue("FirstQueue");
			consumer = session.createConsumer(destination);
			while (true) {
				// 設定接收者接收消息的時間,為了便于測試,這裡誰定為100s
				TextMessage message = (TextMessage) consumer.receive(1000);
				if (null != message) {
					System.out.println("收到消息" + message.getText());
				} else {
					break;
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				if (null != connection)
					connection.close();
			} catch (Throwable ignore) {
			}
		}
	}
}