Rabbitmq入门
-
- 一、RabbitMQ简介
- 二、RabbitMQ安装运行
- 三、RabbitMQ基本配置
-
- 1、默认配置
- 2、RabbitMQ端口
- 四、RabbitMQ管理界面
-
- 1、 开启插件
- 2、 添加用户
- 3、 为用户分配操作权限
- 4、 为用户分配资源权限
- 五、rabbitmq角色
-
- 1、none
- 2、management
- 3、policymaker
- 4、monitoring
- 5、administrator
- 六、一个简单的Demo
-
- 1、引入依赖
- 2、代码
- 七、AMQP协议
-
- 1、简介
- 2、AMQP结构
- 3、AMQP生产者流转过程
- 4、AMQP消费者流转过程
- 八、rabbitmq核心概念
-
- 1、Producer
-
- (1)消息体(payload)
- (2)附加信息
- 2、Broker
- 3、Virtual Host
- 4、Channel
- 5、RoutingKey
- 6、Exchange
-
- (1)默认的exchange:
- (2)Fanout:扇型交换机
- (3)Direct:直连交换机
- (4)Topic:主题交换机
- (5)headers:头交换机
- 7、Queue
- 8、Binding
- 9、Consumer
- 九、运转流程
-
- 1、整个消息的运转流程
- 2、RabbitMQ的运转流程
一、RabbitMQ简介
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
二、RabbitMQ安装运行
环境准备:CentOS7、Erlang
需要安装erlang、socat、rabbitmq
三、RabbitMQ基本配置
1、默认配置
RabbitMQ有一套默认的配置,能够满足日常开发需求,如果需要修改,需要自己创建一个配置文件
touch/etc/rabbitmq/rabbitmq.conf
配置文件示例:
https://github.com/rabbitmq/rabbitmq-server/blob/master/docs/rabbitmq.conf.example
配置项说明:
https://www.rabbitmq.com/configure.html#config-items
2、RabbitMQ端口
rabbitmq会绑定一些端口,安装完后,需要将这些端口添加至防火墙。
4369
是Erlang的端口/节点名称映射程序,用来跟踪节点名称监听地址,在集群中起到一个类似DNS的作
5672,5671:
AMQP 0-9-1和1.0客户端端口,没有使用SSL和使用SSL的端口
25672(做管理用):
用于RabbitMQ节点间和CLI工具通信,配合4369使用。
15672:
HTTP_API端口,管理员用户才能访问,用于管理rabbitmq,需要启用management插件。
61613,61614:
当STOMP插件启用的时候打开,作为STOMP客户端端口(根据是否使用TLS选择)。
1883,8883:
当MQTT插件启用的时候打开,作为MQTT客户端端口(根据是否使用TLS选择)。
15674:
基于WebSocket的STOMP客户端端口(当插件Web STOMP启用的时候打开)。
15675:
基于Websocket的MQTT客户端端口(当插件Web MQTT启用的时候打开)。
四、RabbitMQ管理界面
1、 开启插件
rabbitmq-plugins enable rabbitmq_management
说明:rabbitmq有一个默认的guest用户,但只能通过localhost访问,所以需要添加一个能够远程访问的用户。
2、 添加用户
rabbitmqctl add_user admin admin
3、 为用户分配操作权限
rabbitmqctl set_user_tags admin administrator
4、 为用户分配资源权限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
五、rabbitmq角色
1、none
不能访问management plugin
2、management
用户可以通过AMQP做任何事,还可以做如下的事:
1、列出自己可以通过AMQP登入的virtual hosts
2、查看自己的virtual hosts中的queues,exchanges和bindings
3、查看和关闭自己的channels和connections
4、查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。
3、policymaker
可以做management做的任何事,还可以做如下的事:
1.查看、创建和删除自己的virtual hosts所属的policies和parameters
4、monitoring
可以做management做的任何事,还可以做如下的事:
1、列出所有virtual hosts,包括他们不能登录的virtual hosts
2、查看其他用户的channels和connections
3、查看节点级别的数据如clustering和memory使用情况
4、查看真正的关于所有virtual hosts的全局的统计信息
5、administrator
可以做policymaker和monitoring做的任何事,还可以做如下的事:
1、创建和删除virtual hosts
2、查看、创建和删除users
3、查看创建和删除permissions
4、关闭其他用户的connections
六、一个简单的Demo
1、引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.1</version>
</dependency>
2、代码
建立生产者类 Producer.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 简单队列生产者
* 使用RabbitMQ的默认交换器发送消息
*/
public class Producer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
//根据实际地址填写
factory.setHost("127.0.0.1");
//可以不设置,默认也是5672
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3、从连接工厂获取连接
connection = factory.newConnection("生产者");
// 4、从链接中创建通道
channel = connection.createChannel();
/**
* 5、声明(创建)队列
* 如果队列不存在,才会创建
* RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
*
* queueDeclare参数说明:
* @param queue 队列名称
* @param durable 队列是否持久化
* @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制
* @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
* @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
*/
channel.queueDeclare("queue1", false, false, false, null);
// 消息内容
String message = "Hello World!";
// 6、发送消息
channel.basicPublish("", "queue1", null, message.getBytes());
System.out.println("消息已发送!");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 7、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
建立消费者类 Consumer.java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 简单队列消费者
*/
public class Consumer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("127.0.0.1");
//端口可以不写,默认是5672
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3、从连接工厂获取连接
connection = factory.newConnection("消费者");
// 4、从链接中创建通道
channel = connection.createChannel();
/**
* 5、声明(创建)队列
* 如果队列不存在,才会创建
* RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
*
* queueDeclare参数说明:
* @param queue 队列名称
* @param durable 队列是否持久化
* @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,
* 并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制。
* 一般在队列和交换器绑定时使用
* @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
* @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
*/
channel.queueDeclare("queue1", false, false, false, null);
// 6、定义收到消息后的回调
DeliverCallback callback = new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
}
};
// 7、监听队列
channel.basicConsume("queue1", true, callback, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
}
});
System.out.println("开始接收消息");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 8、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 9、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
此示例的生产者和消费者都只是通过队列来发送和接收消息,并未用到Exchange,
原理:手动通过channel.queueDeclare创建队列时,后台会自动将这个队列绑定到一个名称为空的 Direct Exchange 上,绑定 RoutingKey 与队列名称相同。
所以在调用channel.basicPublish方法时,exchange传的是空字符串,而routingKey传的是队列名。
有了这个默认的交换机和绑定,使我们只关心队列这一层即可,这个比较适合做一些简单的应用。
在RabbitMQ的管理界面的Exchange中可以看到这个默认交换器:(AMQP default)
七、AMQP协议
1、简介
AMQP(Advanced Message Queuing Protocol),是一个提供统一消息服务的标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件而设计。
2、AMQP结构
3、AMQP生产者流转过程
4、AMQP消费者流转过程
八、rabbitmq核心概念
1、Producer
生产者,就是投递消息的一方,生产者创建消息,然后发布到rabbitmq中。
消息一般可以包含两个部分:消息体和附件信息。
(1)消息体(payload)
在实际应用中,消息体一般是一个带有业务逻辑结构的数据
eg:一个JSON字符串,当然可以进一步对这个消息体进行序列化操作
(2)附加信息
用来表述这条消息
eg:目标交换器的名称、路由键、一些自定义属性
2、Broker
消息中间件的服务节点
对于rabbitmq来说,一个rabbitmq Broker可以简单地看作一个rabbitmq服务节点,或者rabbitmq服务实例。
也可以将一个rabbitmq Broker看作一台rabbitmq服务器。
3、Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象
虚拟主机是共享相同的身份认证和加密环境的独立服务器域
每个vhost本质上就是一个mini版的rabbitmq服务器,拥有自己的队列、交换器、绑定和权限机制
vhost是AMQP概念的基础,必须在连接时指定,rabbitmq默认的vhost是/
4、Channel
频道或信道,是建立在connection连接之上的一种轻量级的连接
大部分的操作是在Channel这个接口中完成的,包括定义队列的声明queueDeclare、交换机的声明exchangeDeclare、队列的绑定queueBind、发布消息basicPublish、消费消息basicConsume等
如果把Connection比作一条光纤电缆的话,那么Channel信道就比作成光纤电缆中的其中一束光纤
一个Connection上可以创建任意数量的Channel
5、RoutingKey
路由键:生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。
RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用
在交换器类型和绑定键(BindingKey)固定的情况下,生产者可以在发送消息给交换器时,通过指定RoutingKey来决定消息流向哪里。
6、Exchange
交换器,生产者将消息发送到Exchange(交换器,通常也可以用大写的“X”来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或返回给生产者,或直接丢弃。
(1)默认的exchange:
根据消息指定的queue发送到指定的队列
(2)Fanout:扇型交换机
它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中(广播)
(3)Direct:直连交换机
它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中
(4)Topic:主题交换机
与direct类似,但它可以通过通配符进行模糊匹配
【.】表示单词的拆分方式
【*】表示一个单词的模糊匹配【只能是一个单词,多个单词就匹配不上】
如:路由键是*.apple.big则表示第一个单词可以是任意的,只要后边单词完全匹配,就可以
【#】 表示完全模糊匹配。
如:路由键是#.little,那么发送消息的路由键可以是green,apple,little,也就是说前面的单词是任意的
(5)headers:头交换机
不依赖于路由键的匹配规则,在绑定queue与exchange时指定一组键值对,只要消息头中含有该“键值对”【键、值都要匹配上】,就路由到对应的队列
**`direct、topic相对来说,要灵活一些,headers性能很差,也不实用 `**
7、Queue
队列,是rabbitmq的内部对象,用于存储消息
8、Binding
绑定,rabbitmq中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),
这样rabbitmq就知道如何正确地将消息路由到队列了
9、Consumer
消费者,就是接收消息的一方
消费者连接到rabbitmq服务器,并订阅到队列上
当消费者消费一条消息时,只是消费消息的消息体。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,
消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道
九、运转流程
1、整个消息的运转流程
2、RabbitMQ的运转流程
生产者发送消息的过程:
1.生产者连接到RabbitMQ Broker,简历一个连接(Connection),开启一个信道(Channel)
2.生产者声明一个交换器,并设置相关属性,比如交换机类型,是否持久化等
3.生产者声明一个队列并设置相关属性,比如是否排他,是否持久化,是否自动删除等