本文主要以官网(https://www.rabbitmq.com/getstarted.html)的例子为参考,介绍使用Java客户端来操作RabbitMQ,文中使用到的软件版本:RabbitMQ 3.8.9、Java 1.8.0_191。
1、准备
1.1、引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
1.2、编写工具类
package com.abc.demo.general.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtil {
private static ConnectionFactory factory;
public static Connection getConnection() throws Exception {
if (factory == null) {
factory = new ConnectionFactory();
factory.setHost("10.49.196.10");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
}
return factory.newConnection();
}
public static void close(Connection connection) {
if (connection != null) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void close(Channel channel) {
if (channel != null) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
2、场景例子
2.1、Hello World
2.1.1、场景描述
最简单的场景,一个生产者,一个消费者。
2.1.2、代码样例
生产者:
package com.abc.demo.general.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* HelloWorld,一个生产者,一个消费者
*/
public class HelloWorldProducer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
/**
* 声明队列,如果队列不存在则创建;如果已存在则设置的参数值需跟原队列一致,否则会保持
* 默认绑定到默认队列,routingKey就是队列名称
*
* 是否持久化: 如果为false,则重启rabbit后,队列会消失
* 是否排他: 即只允许该channel访问该队列,一般等于true的话用于一个队列只能有一个消费者来消费的场景
* 是否自动删除: 消费完消息删除该队列
* 其他属性:x-queue-type(quorum、classic),默认为classic
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World,曹操!";
//消息的routingKey就是队列名称
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
RabbitMQUtil.close(channel);
RabbitMQUtil.close(connection);
}
}
}
消费者:
package com.abc.demo.general.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
/**
* java客户端操作rabbitmq;HelloWorld,一个生产者,一个消费者
*/
public class HelloWorldConsumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
2.2、Work Queues
2.2.1、场景描述
Work Queues(工作队列模式),一个生产者,多个消费者,一条消息只能被一个消费者消费。
2.2.2、代码样例
生产者:
package com.abc.demo.general.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
/**
* Work Queues(工作队列模式),一个生产者,多个消费者,一条消息只能被一个消费者消费
*/
public class WorkQueuesProducer {
private final static String QUEUE_NAME = "work_queues";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
//队列持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 0; i < 20; i++) {
String message = "消息-" + i;
//发送的消息持久化,重启rabbitmq消息也不会丢失
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
RabbitMQUtil.close(channel);
RabbitMQUtil.close(connection);
}
}
}
消费者:
package com.abc.demo.general.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
/**
* Work Queues(工作队列模式),一个生产者,多个消费者,一条消息只能被一个消费者消费
*/
public class WorkQueuesConsumer {
private final static String QUEUE_NAME = "work_queues";
public static void main(String[] args) throws Exception {
//模拟三个消费者,rabbitmq默认会把消息轮询推给每个消费者
for (int i = 0; i < 3; i++) {
Connection connection = RabbitMQUtil.getConnection();
//new Thread(new Worker(connection, i)).start();
new Thread(new Worker2(connection, i)).start();
//new Thread(new Worker3(connection, i)).start();
}
}
/**
* 自动确认Worker
*/
static class Worker implements Runnable {
private Connection connection;
private int index;
public Worker(Connection connection, int index) {
this.connection = connection;
this.index = index;
}
@Override
public void run() {
try {
System.out.println("消费者-" + index + " 开始接受消息。");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者-" + index + " Received '" + message + "'");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//自动确认,如果业务处理失败或该消费者宕机,发送到该消费者的消息都会被删除
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 手动确认Worker
*/
static class Worker2 implements Runnable {
private Connection connection;
private int index;
public Worker2(Connection connection, int index) {
this.connection = connection;
this.index = index;
}
@Override
public void run() {
try {
System.out.println("消费者-" + index + " 开始接受消息。");
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicQos(1);//一次只接受一条消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者-" + index + " Received '" + message + "'");
//业务处理...
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
//在业务处理完成后手动确认;避免一个消费者宕机等导致消息丢失
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
//autoAck设为false
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 拉模式消费
*/
static class Worker3 implements Runnable {
private Connection connection;
private int index;
public Worker3(Connection connection, int index) {
this.connection = connection;
this.index = index;
}
@Override
public void run() {
try {
System.out.println("消费者-" + index + " 开始接受消息。");
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
while (true) {
GetResponse response = channel.basicGet(QUEUE_NAME, false);
if (response == null) {
continue;
}
String message = new String(response.getBody());
System.out.println("消费者-" + index + " Received '" + message + "'");
//业务处理...
Thread.sleep(1000);
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
2.3、Publish/Subscribe
2.3.1、场景描述
publish/Subscribe(发布/订阅模式),一条消息被发送到多个队列。
2.3.2、代码样例
生产者:
package com.abc.demo.general.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* Publish/Subscribe(发布/订阅模式),一条消息被发送到多个队列
*/
public class PublishSubscribeProducer {
private static final String EXCHANGE_NAME = "logs";
private final static String QUEUE_NAME_1 = "destination_terminal";
private final static String QUEUE_NAME_2 = "destination_file";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
//声明exchange,类型为fanout,消息路由到所有绑定的队列
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//声明队列
channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
//绑定
channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "");
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "");
for (int i = 0; i < 20; i++) {
String message = "消息-" + i;
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
RabbitMQUtil.close(channel);
RabbitMQUtil.close(connection);
}
}
}
消费者:
package com.abc.demo.general.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
/**
* Publish/Subscribe(发布/订阅模式),一条消息被发送到多个队列
*/
public class PublishSubscribeConsumer {
private final static String QUEUE_NAME_1 = "destination_terminal";
private final static String QUEUE_NAME_2 = "destination_file";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
System.out.println("开始接受消息...");
new Thread(() -> {
try {
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("destination_terminal Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME_1, true, deliverCallback, consumerTag -> { });
} catch (IOException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("destination_file Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME_2, true, deliverCallback, consumerTag -> { });
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
2.4、Routing
2.4.1、场景描述
Routing(路由模式),队列通过bingingKey绑定到Exchange,发送消息时指定routingKey,Exchange根据routingKey精确匹配bindingKey来路由消息.
2.4.2、代码样例
生产者:
package com.abc.demo.general.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* Routing(路由模式),队列通过bingingKey绑定到Exchange,发送消息时指定routingKey,Exchange根据routingKey精确匹配bindingKey来路由消息
*/
public class RoutingProducer {
private static final String EXCHANGE_NAME = "direct_logs";
private final static String QUEUE_NAME_1 = "direct_destination_terminal";
private final static String QUEUE_NAME_2 = "direct_destination_file";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
//声明exchange,类型为direct,根据routingKey精确匹配bindingKey来路由消息
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//声明队列
channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
//队列1接受info、warn、error日志
channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "warn");
channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "error");
//队列2只接受error日志
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "error");
for (int i = 0; i < 20; i++) {
String message = "消息-" + i;
String routingKey = "";
if (i % 3 == 0) {
routingKey = "info";
} else if (i % 3 == 1) {
routingKey = "warn";
} else if (i % 3 == 2) {
routingKey = "error";
}
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + ":" + message + "'");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
RabbitMQUtil.close(channel);
RabbitMQUtil.close(connection);
}
}
}
消费者:
package com.abc.demo.general.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
/**
* Routing(路由模式),队列通过bingingKey绑定到Exchange,发送消息时指定routingKey,Exchange根据routingKey精确匹配bindingKey来路由消息
*/
public class RoutingConsumer {
private final static String QUEUE_NAME_1 = "direct_destination_terminal";
private final static String QUEUE_NAME_2 = "direct_destination_file";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
System.out.println("开始接受消息...");
new Thread(() -> {
try {
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("direct_destination_terminal Received '" + delivery.getEnvelope().getRoutingKey() + ":" + message + "'");
};
channel.basicConsume(QUEUE_NAME_1, true, deliverCallback, consumerTag -> { });
} catch (IOException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("direct_destination_file Received '" + delivery.getEnvelope().getRoutingKey() + ":" + message + "'");
};
channel.basicConsume(QUEUE_NAME_2, true, deliverCallback, consumerTag -> { });
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
2.5、Topics
2.5.1、场景描述
Topics(主题模式),队列通过bingingKey绑定到Exchange,发送消息时指定routingKey,Exchange根据根据routingKey匹配bindingKey模式来路由消息. bingingKey模式用"."分隔,"*"代表一个单词,"#"代表0或多个单词.
2.5.2、代码样例
生产者:
package com.abc.demo.general.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* Topics(主题模式),队列通过bingingKey绑定到Exchange,发送消息时指定routingKey,Exchange根据根据routingKey匹配bindingKey模式来路由消息
* bingingKey模式用"."分隔,"*"代表一个单词,"#"代表0或多个单词
*/
public class TopicsProducer {
private static final String EXCHANGE_NAME = "topics_logs";
private final static String QUEUE_NAME_1 = "topics_destination_terminal";
private final static String QUEUE_NAME_2 = "topics_destination_file";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
//声明exchange,类型为topic,根据routingKey匹配bindingKey模式来路由消息
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明队列
channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
//队列1接受模块A的所有日志
channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "moduleA.*");
//队列2接受所有error的日志
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "*.error");
for (int i = 0; i < 20; i++) {
String message = "消息-" + i;
String routingKey = "";
if (i % 2 == 0) {
routingKey = "moduleA";
} else {
routingKey = "moduleB";
}
routingKey += ".";
if (i % 3 == 0) {
routingKey += "info";
} else if (i % 3 == 1) {
routingKey += "warn";
} else if (i % 3 == 2) {
routingKey += "error";
}
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + ":" + message + "'");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
RabbitMQUtil.close(channel);
RabbitMQUtil.close(connection);
}
}
}
消费者:
package com.abc.demo.general.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
/**
* Topics(路由模式),队列通过bingingKey绑定到Exchange,发送消息时指定routingKey,Exchange根据routingKey匹配bindingKey模式来路由消息
* bingingKey模式用"."分隔,"*"代表一个单词,"#"代表0或多个单词
*/
public class TopicsConsumer {
private final static String QUEUE_NAME_1 = "topics_destination_terminal";
private final static String QUEUE_NAME_2 = "topics_destination_file";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
System.out.println("开始接受消息...");
new Thread(() -> {
try {
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("topics_destination_terminal Received '" + delivery.getEnvelope().getRoutingKey() + ":" + message + "'");
};
channel.basicConsume(QUEUE_NAME_1, true, deliverCallback, consumerTag -> { });
} catch (IOException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("topics_destination_file Received '" + delivery.getEnvelope().getRoutingKey() + ":" + message + "'");
};
channel.basicConsume(QUEUE_NAME_2, true, deliverCallback, consumerTag -> { });
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
2.6、Topics
2.6.1、场景描述
RPC(远程调用)
1.客户端把消息发送到rpc队列
2.服务端从rpc队列获取消息,并把获得到的消息作为参数来调用函数,然后把结果通过回调队列发送给客户端
3.客户端从回调队列获取返回结果
2.6.2、代码样例
服务端:
package com.abc.demo.general.rabbit;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
/**
* RPC(远程调用)
* 1.客户端把消息发送到rpc队列
* 2.服务端从rpc队列获取消息,并把获得到的消息作为参数来调用函数,然后把结果通过回调队列发送给客户端
* 3.客户端从回调队列获取返回结果
*/
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
//发送结果到回调队列
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized (monitor) {
monitor.notify();
}
}
};
//从rpc队列中获取客户端发过来的消息
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static int fib(int n) {
if (n == 0) {
return 0;
}
if (n == 1) {
return 1;
}
return fib(n - 1) + fib(n - 2);
}
}
客户端:
package com.abc.demo.general.rabbit;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* RPC(远程调用)
* 1.客户端把消息发送到rpc队列
* 2.服务端从rpc队列获取消息,并把获得到的消息作为参数来调用函数,然后把结果通过回调队列发送给客户端
* 3.客户端从回调队列获取返回结果
*/
public class RPCClient {
private String requestQueueName = "rpc_queue";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
RPCClient client = new RPCClient();
for (int i = 0; i < 10; i++) {
System.out.println(" [x] Requesting fib(" + i + ")");
String response = client.call(channel, i + "");
System.out.println(" [.] Got '" + response + "'");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
RabbitMQUtil.close(channel);
RabbitMQUtil.close(connection);
}
}
private String call(Channel channel, String message) throws Exception {
final String corrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});
String result = response.take();
channel.basicCancel(ctag);
return result;
}
}
2.7、Publisher Confirms
2.7.1、场景描述
Publisher Confirms(消息发送确认),发送消息的时候对发送的消息进行确认,对发送成功的消息进行确认,对发送失败的消息可以进行进一步的处理(如重新发送).
2.7.2、代码样例
package com.abc.demo.general.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import java.time.Duration;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.BooleanSupplier;
/**
* Publisher Confirms(消息发送确认),发送消息的时候对发送的消息进行确认,对发送失败的消息可以进行进一步的处理
*/
public class PublisherConfirms {
private final static String QUEUE_NAME = "publisher_confirms";
private static final int MESSAGE_COUNT = 50_000;
public static void main(String[] args) {
publishMessagesIndividually();
publishMessagesInBatch();
handlePublishConfirmsAsynchronously();
}
/**
* 单条消息确认,速度较慢,吞吐量不高
* @throws Exception
*/
private static void publishMessagesIndividually() {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
//开启消息确认,默认时关闭的
channel.confirmSelect();
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
channel.basicPublish("", QUEUE_NAME, null, body.getBytes());
//超时或发送失败抛出异常
channel.waitForConfirmsOrDie(5_000);
}
long end = System.nanoTime();
System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
} catch (Exception e) {
e.printStackTrace();
} finally {
RabbitMQUtil.close(channel);
RabbitMQUtil.close(connection);
}
}
/**
* 批量确认,速度较快,吞吐量较大;但如果确认失败不知道那条消息出问题了
* @throws Exception
*/
private static void publishMessagesInBatch() {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
channel.confirmSelect();
int batchSize = 100;
int outstandingMessageCount = 0;
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
channel.basicPublish("", QUEUE_NAME, null, body.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
channel.waitForConfirmsOrDie(5_000);
}
long end = System.nanoTime();
System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
} catch (Exception e) {
e.printStackTrace();
} finally {
RabbitMQUtil.close(channel);
RabbitMQUtil.close(connection);
}
}
/**
* 异步确认,最佳性能和资源使用,但编码有些复杂
*/
private static void handlePublishConfirmsAsynchronously() {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
channel.confirmSelect();
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber,true);
confirmed.clear();
} else {
outstandingConfirms.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String body = outstandingConfirms.get(sequenceNumber);
System.err.format("Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n", body, sequenceNumber, multiple);
/*
*消息发送失败时这边再次调用发送成功的处理方法,也可以把失败的消息(获取失败消息的方法同ackCallback里方法)重新发送,不过不能在这里发送消息(rabbitmq不支持),
*可以把失败的消息发送到ConcurrentLinkedQueue,发送消息的线程从该ConcurrentLinkedQueue取数据来发送消息
*/
ackCallback.handle(sequenceNumber, multiple);
};
channel.addConfirmListener(ackCallback, nackCallback);
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
channel.basicPublish("", QUEUE_NAME, null, body.getBytes());
}
if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
}
long end = System.nanoTime();
System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
} catch (Exception e) {
e.printStackTrace();
} finally {
RabbitMQUtil.close(channel);
RabbitMQUtil.close(connection);
}
}
private static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException {
int waited = 0;
while (!condition.getAsBoolean() && waited < timeout.toMillis()) {
Thread.sleep(100L);
waited += 100;
}
return condition.getAsBoolean();
}
/**
* 异步确认简单测试
*/
private static void asynchronousTest() {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
//开启消息确认,默认时关闭的
channel.confirmSelect();
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
System.out.println("ackCallback,sequenceNumber=" + sequenceNumber + ",multiple=" + multiple);
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
System.out.println("nackCallback,sequenceNumber=" + sequenceNumber + ",multiple=" + multiple);
};
channel.addConfirmListener(ackCallback, nackCallback);
for (int i = 0; i < 100; i++) {
String body = String.valueOf(i);
channel.basicPublish("", QUEUE_NAME, null, body.getBytes());
}
Thread.sleep(1000 * 30);
} catch (Exception e) {
e.printStackTrace();
} finally {
RabbitMQUtil.close(channel);
RabbitMQUtil.close(connection);
}
}
}