天天看点

rabbitMQ简单的生产、消费者模式-----direct(路由完全匹配)

生产者代码

package cn.enjoyedu.myexchange.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DirectProducer {
    //交换机的名字
    public final static String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接,连接到rabbitMQ
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置工厂的地址
        connectionFactory.setHost("localhost");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //设置信道的交换器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明路由,消息体
        String[] routeKeys = {"meimei","nannan","someBody"};
        for (int i = 0; i < routeKeys.length; i++) {
            String routekey = routeKeys[i%3];
            String mes = "hello rabbitmq "+ routekey+(i+1);
            //发布消息
            channel.basicPublish(EXCHANGE_NAME,routekey,null,mes.getBytes());
        }
        //关闭资源
        channel.close();
        connection.close();

    }
}
           

消费者代码

package cn.enjoyedu.myexchange.direct;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class NormalConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接的工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接mq的地址
        connectionFactory.setHost("localhost");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明队列
        String queueName = "queue-meimei";
        channel.queueDeclare(queueName,false,false,false,null);
        System.out.println("waiting for message ......");

        //声明消费者
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String routingKey = envelope.getRoutingKey();
                String mes = new String(body,"utf-8");
                System.out.println("routingKey["+routingKey+"] "+mes);

            }
        };
        //消息者正是开始在指定队列上消费。(queue-king)
        //TODO 这里第二个参数是自动确认参数,如果是true则是自动确认
        channel.basicConsume(queueName,true,consumer);


    }
}