概述
是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。RabbitMQ、ZeroMQ、ActiveMQ均支持常用的多种语言客户端 C++、Java、.Net,、Python、 Php、 Ruby等。
ActiveMQ安装
1、下载ActiveMQ
去官方网站下载:http://activemq.apache.org/activemq-5152-release.html。
2、运行ActiveMQ
下载好后,解压压缩包到一个目录。进入bin目录,发现有win32和win64两个文件夹,这2个文件夹分别对应windows32位和windows64位操作系统的启动脚本。根据自己的系统,选择合适的路径,其中activemq.bat便是启动脚本,双击启动。
3、登录ActiveMQ
ActiveMQ默认启动到8161端口,启动完了后在浏览器地址栏输入:http://localhost:8161/admin要求输入用户名密码,默认用户名密码为admin、admin,这个用户名密码是在conf/users.properties中配置的。输入用户名密码后便可看到如下图的ActiveMQ控制台界面了。
JMS可靠消息机制
消息确认机制
JMS消息只有在被确认之后,才认为已经被成功的消费了,消息的成功消费通常包含三个阶段:客户接收消息,客户处理消息和消息被确认。
在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式。改参数有三个可选值:
① Session.AUTO_ACKNOWLEDGE:当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
②Session.CLIENT_ACKNOWLEDGE:客户通过调用消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行,确认一个被消费的消息,将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。
③Session.DUPS_ACKNOWLEDGE:该选择只是会话迟钝的确认消息的提交。如果JMS Provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true
持久话机制
PERSISTENT:指示JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失
NON_PERSISTENT:不要求JMS provider持久保存消息
// 设置消息持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
实现点对点通讯
使用ActiveMQ完成点对点(p2p)通讯模式
生产者
public class Produce{
// mq通讯地址
private static String url = "tcp://127.0.0.1:61616";
// 队列名称
private static String queueName = "my_queue";
public static void main(String[] args) throws JMSException {
// 1.创建连接工厂 吗,密码采用默认密码admin 和admin
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
// 2.创建连接
Connection connection = factory.createConnection();
// 3.创建会话 参数1 设置是否需要以事务方式提交 参数2 消息方式 采用自动签收
connection.start();// 启动连接
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目标(队列)
Queue queue = session.createQueue(queueName);
// 5.创建生产者
MessageProducer producer = session.createProducer(queue);
for (int i = 1; i <= 10; i++) {
// 6.创建 消息
TextMessage textMessage = session.createTextMessage("消息内容i:" + i);
// 7.发送消息
producer.send(textMessage);
}
// 8.关闭连接
connection.close();
System.out.println("消息发送完毕!");
}
}
查看ActiveMQ
消费者
public class Consumer {
// mq通讯地址
private static String url = "tcp://127.0.0.1:61616";
// 队列名称
private static String queueName = "my_queue";
public static void main(String[] args) throws JMSException {
System.out.println("我是消费者002");
// 1.创建连接工厂 吗,密码采用默认密码admin 和admin
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
// 2.创建连接
Connection connection = factory.createConnection();
// 3.创建会话 参数1 设置是否需要以事务方式提交 参数2 消息方式 采用自动签收
connection.start();// 启动连接
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目标(队列)
Queue queue = session.createQueue(queueName);
// 5.创建消费者
MessageConsumer consumer = session.createConsumer(queue);
// 6.启动监听 监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("消费者消息生产者内容:" + textMessage.getText());
} catch (Exception e) {
// TODO: handle exception
}
}
});
// 不要关闭连接
}
}
注意:点对点方式中,消费者如果集群,默认采用均摊方式消费,但是还是确保一条消息只有一个消费者消费。
实现发布订阅
要先启动消费者,订阅主题,再启动生产者,才能接收到消息。
这里我们采用SpringBoot整合ActiveMQ。
生产者
1、pom文件引入
<!-- 管理依赖 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Finchley.M7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- SpringBoot整合Web组件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- SpringBoot Activemq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
2、配置文件
spring:
activemq:
###MQ连接通讯地址
broker-url: tcp://127.0.0.1:61616
###账号
user: admin
###密码
password: admin
###自定义主题
topic: my-topic
server:
port: 8082
3、定义主题
@Component
public class TopicConfig {
@Value("${topic}")
private String topicName;
@Bean
public Topic topic() {
return new ActiveMQTopic(topicName);
}
}
4、定义生产者
@Component
public class TopicProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
@Scheduled(fixedDelay = 5000)
public void send() {
String msg = System.currentTimeMillis() + "";
System.out.println("采用发布订阅方式,生产者向消费者发送内容:" + msg);
jmsMessagingTemplate.convertAndSend(topic, msg);
}
}
5、启动项
@SpringBootApplication
@EnableScheduling
public class AppProducer {
public static void main(String[] args) {
SpringApplication.run(AppProducer.class, args);
}
}
消费者
1、pom文件引入
<!-- 管理依赖 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Finchley.M7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- SpringBoot整合Web组件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- SpringBoot Activemq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
2、配置文件
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
#### 开启发布订阅
jms:
pub-sub-domain: true
topic: spring-topic
server:
port: 8082
3、定义消费者
@Component
public class TopicConsumer {
@JmsListener(destination = "spring-topic")
public void receive(String msg) {
System.out.println("发布与订阅消费者接受,生产者内容:" + msg);
}
}
启动项
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}