天天看点

ActiveMQ概述ActiveMQ安装JMS可靠消息机制实现点对点通讯实现发布订阅

概述

        是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。RabbitMQ、ZeroMQ、ActiveMQ均支持常用的多种语言客户端 C++、Java、.Net,、Python、 Php、 Ruby等。

ActiveMQ安装

1、下载ActiveMQ

      去官方网站下载:http://activemq.apache.org/activemq-5152-release.html。

2、运行ActiveMQ

     下载好后,解压压缩包到一个目录。进入bin目录,发现有win32和win64两个文件夹,这2个文件夹分别对应windows32位和windows64位操作系统的启动脚本。根据自己的系统,选择合适的路径,其中activemq.bat便是启动脚本,双击启动。

3、登录ActiveMQ    

       ActiveMQ默认启动到8161端口,启动完了后在浏览器地址栏输入:http://localhost:8161/admin要求输入用户名密码,默认用户名密码为admin、admin,这个用户名密码是在conf/users.properties中配置的。输入用户名密码后便可看到如下图的ActiveMQ控制台界面了。

ActiveMQ概述ActiveMQ安装JMS可靠消息机制实现点对点通讯实现发布订阅

JMS可靠消息机制

消息确认机制

       JMS消息只有在被确认之后,才认为已经被成功的消费了,消息的成功消费通常包含三个阶段:客户接收消息,客户处理消息和消息被确认。

       在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式。改参数有三个可选值:

   ① Session.AUTO_ACKNOWLEDGE:当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。

   ②Session.CLIENT_ACKNOWLEDGE:客户通过调用消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行,确认一个被消费的消息,将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。

   ③Session.DUPS_ACKNOWLEDGE:该选择只是会话迟钝的确认消息的提交。如果JMS Provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true

持久话机制

PERSISTENT:指示JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失

NON_PERSISTENT:不要求JMS provider持久保存消息

// 设置消息持久化

producer.setDeliveryMode(DeliveryMode.PERSISTENT);
           

实现点对点通讯

使用ActiveMQ完成点对点(p2p)通讯模式

生产者

public class Produce{
	// mq通讯地址
	private static String url = "tcp://127.0.0.1:61616";
	// 队列名称
	private static String queueName = "my_queue";

	public static void main(String[] args) throws JMSException {
		// 1.创建连接工厂 吗,密码采用默认密码admin 和admin
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
		// 2.创建连接
		Connection connection = factory.createConnection();
		// 3.创建会话 参数1 设置是否需要以事务方式提交 参数2 消息方式 采用自动签收
		connection.start();// 启动连接
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 4.创建目标(队列)
		Queue queue = session.createQueue(queueName);
		// 5.创建生产者
		MessageProducer producer = session.createProducer(queue);
		for (int i = 1; i <= 10; i++) {
			// 6.创建 消息
			TextMessage textMessage = session.createTextMessage("消息内容i:" + i);
			// 7.发送消息
			producer.send(textMessage);

		}
		// 8.关闭连接
		connection.close();
		System.out.println("消息发送完毕!");
	}
}
           

查看ActiveMQ

ActiveMQ概述ActiveMQ安装JMS可靠消息机制实现点对点通讯实现发布订阅

消费者

public class Consumer {
	// mq通讯地址
	private static String url = "tcp://127.0.0.1:61616";
	// 队列名称
	private static String queueName = "my_queue";

	public static void main(String[] args) throws JMSException {
		System.out.println("我是消费者002");
		// 1.创建连接工厂 吗,密码采用默认密码admin 和admin
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
		// 2.创建连接
		Connection connection = factory.createConnection();
		// 3.创建会话 参数1 设置是否需要以事务方式提交 参数2 消息方式 采用自动签收
		connection.start();// 启动连接
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 4.创建目标(队列)
		Queue queue = session.createQueue(queueName);
		// 5.创建消费者
		MessageConsumer consumer = session.createConsumer(queue);
		// 6.启动监听 监听消息
		consumer.setMessageListener(new MessageListener() {

			public void onMessage(Message message) {
				try {
					TextMessage textMessage = (TextMessage) message;
					System.out.println("消费者消息生产者内容:" + textMessage.getText());
				} catch (Exception e) {
					// TODO: handle exception
				}
			}
		});
		// 不要关闭连接
	}
}
           
ActiveMQ概述ActiveMQ安装JMS可靠消息机制实现点对点通讯实现发布订阅

注意:点对点方式中,消费者如果集群,默认采用均摊方式消费,但是还是确保一条消息只有一个消费者消费。

实现发布订阅

要先启动消费者,订阅主题,再启动生产者,才能接收到消息。

这里我们采用SpringBoot整合ActiveMQ。

生产者

1、pom文件引入

<!-- 管理依赖 -->
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Finchley.M7</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>
	<dependencies>
		<!-- SpringBoot整合Web组件 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<!-- SpringBoot Activemq -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>

	</dependencies>
           

2、配置文件

spring:
  activemq:
  ###MQ连接通讯地址
    broker-url: tcp://127.0.0.1:61616
  ###账号
    user: admin
  ###密码  
    password: admin
###自定义主题
topic: my-topic
server:
  port: 8082
           

 3、定义主题

@Component
public class TopicConfig {
	@Value("${topic}")
	private String topicName;
	@Bean
	public Topic topic() {
		return new ActiveMQTopic(topicName);
	}
}
           

4、定义生产者

@Component
public class TopicProducer {
	@Autowired
	private JmsMessagingTemplate jmsMessagingTemplate;
	@Autowired
	private Topic topic;

	@Scheduled(fixedDelay = 5000)
	public void send() {
		String msg = System.currentTimeMillis() + "";
		System.out.println("采用发布订阅方式,生产者向消费者发送内容:" + msg);
		jmsMessagingTemplate.convertAndSend(topic, msg);
	}
}
           

 5、启动项

@SpringBootApplication
@EnableScheduling
public class AppProducer {
	public static void main(String[] args) {
		SpringApplication.run(AppProducer.class, args);
	}
}
           

消费者

1、pom文件引入 

<!-- 管理依赖 -->
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Finchley.M7</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>
	<dependencies>
		<!-- SpringBoot整合Web组件 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<!-- SpringBoot Activemq -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>

	</dependencies>
           

2、配置文件

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
#### 开启发布订阅   
  jms: 
    pub-sub-domain: true
topic: spring-topic
server:
  port: 8082
           

 3、定义消费者

@Component
public class TopicConsumer {
	@JmsListener(destination = "spring-topic")
	public void receive(String msg) {
		System.out.println("发布与订阅消费者接受,生产者内容:" + msg);
	}
}
           

启动项

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}