生产者代码
package cn.enjoyedu.myexchange.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Publishes one demo message per routing key to a RabbitMQ direct exchange.
 *
 * <p>Connects to a broker on {@code localhost}, declares the {@link #EXCHANGE_NAME}
 * direct exchange, publishes the messages, then closes the channel and the connection.
 */
public class DirectProducer {

    /** Name of the direct exchange; {@code NormalConsumer} declares the same exchange. */
    public static final String EXCHANGE_NAME = "direct_logs";

    /**
     * Publishes a "hello rabbitmq" message for each routing key and exits.
     *
     * @param args unused
     * @throws IOException if an operation on the connection or channel fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection factory pointing at the local broker.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        // try-with-resources closes the channel first and then the connection,
        // even when declaring the exchange or publishing throws.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the direct exchange the messages are published to.
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            // One message per routing key; the payload carries the key and a 1-based index.
            String[] routeKeys = {"meimei", "nannan", "someBody"};
            for (int i = 0; i < routeKeys.length; i++) {
                String routeKey = routeKeys[i];
                String message = "hello rabbitmq " + routeKey + (i + 1);
                // Encode explicitly as UTF-8 (the consumer decodes as UTF-8) instead of
                // relying on the platform default charset.
                channel.basicPublish(EXCHANGE_NAME, routeKey, null,
                        message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
        }
    }
}
消费者代码
package cn.enjoyedu.myexchange.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Consumes messages from the {@code queue-meimei} queue bound to the direct exchange
 * declared by {@link DirectProducer} and prints each message with its routing key.
 */
public class NormalConsumer {

    /**
     * Connects to the broker on {@code localhost}, declares and binds the queue, and
     * registers an auto-acknowledging consumer on it.
     *
     * @param args unused
     * @throws IOException if an operation on the connection or channel fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection factory pointing at the local broker.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        // The connection and channel are intentionally left open: this method only
        // registers the consumer, and deliveries arrive afterwards via the callback.
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // Declare the queue (non-durable, non-exclusive, no auto-delete, no extra arguments).
        String queueName = "queue-meimei";
        channel.queueDeclare(queueName, false, false, false, null);

        // Bind the queue to the exchange; without a binding a direct exchange routes
        // nothing to this queue and the consumer would never receive a message.
        // NOTE(review): routing key "meimei" is inferred from the queue name and is one
        // of the keys DirectProducer publishes with - confirm it is the intended key.
        String routingKeyBinding = "meimei";
        channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME, routingKeyBinding);

        System.out.println("waiting for message ......");

        // Consumer callback: print the routing key and the UTF-8 decoded payload.
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String routingKey = envelope.getRoutingKey();
                String mes = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("routingKey[" + routingKey + "] " + mes);
            }
        };

        // Start consuming from the queue. The second argument is the auto-ack flag:
        // true means deliveries are acknowledged automatically.
        channel.basicConsume(queueName, true, consumer);
    }
}