RabbitMQ的路由模式——直连

DBC 1.6K 0

消息生产者:

/**
 * Producer for the direct-exchange routing example.
 *
 * <p>Opens a connection and channel, declares the direct exchange named by
 * {@code EXCHANGE_NAME}, and publishes one message for each of the routing keys
 * {@code errorRoutingKey}, {@code infoRoutingKey} and {@code debugRoutingKey}.
 */
public class Send {
    private static final String EXCHANGE_NAME = "exchange_direct";

    /**
     * Publishes the three sample log messages and prints a confirmation line.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring the exchange or publishing fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("81.71.147.62");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/dev");
        connectionFactory.setUsername("");
        connectionFactory.setPassword("");

        // {routing key, message text} pairs, listed in the order they are published.
        String[][] routedMessages = {
                {"errorRoutingKey", "我是错误日志"},
                {"infoRoutingKey", "我是info日志"},
                {"debugRoutingKey", "我是debug日志"},
        };

        // try-with-resources closes the channel and then the connection automatically.
        try (Connection conn = connectionFactory.newConnection();
             Channel ch = conn.createChannel()) {
            // Declare the exchange as a direct exchange.
            ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            for (String[] routed : routedMessages) {
                String routingKey = routed[0];
                byte[] payload = routed[1].getBytes(StandardCharsets.UTF_8);
                ch.basicPublish(EXCHANGE_NAME, routingKey, null, payload);
            }

            System.out.println("消息发送成功");
        }
    }
}

消息消费者(两个节点):Recv1 绑定 errorRoutingKey、infoRoutingKey、debugRoutingKey 三个路由键,Recv2 只绑定 errorRoutingKey。

/**
 * Consumer node 1 of the direct-exchange routing example.
 *
 * <p>Declares the direct exchange named by {@code EXCHANGE_NAME}, obtains a queue from
 * {@code queueDeclare()}, binds it with the routing keys {@code errorRoutingKey},
 * {@code infoRoutingKey} and {@code debugRoutingKey}, and prints every delivered message.
 */
public class Recv1 {

    private final static String EXCHANGE_NAME = "exchange_direct";

    /**
     * Connects to the broker and starts consuming with automatic acknowledgement.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("81.71.147.62");
        factory.setUsername("");
        factory.setPassword("");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        // Consumers normally do not use try-with-resources: the connection and
        // channel are intentionally left open after main returns.
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange as a DIRECT exchange (routing by exact routing key).
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // Declare a queue with the no-argument form and read back its name.
        String queueName = channel.queueDeclare().getQueue();

        // Bind the queue to the exchange once per routing key this node listens to.
        channel.queueBind(queueName, EXCHANGE_NAME, "errorRoutingKey");
        channel.queueBind(queueName, EXCHANGE_NAME, "infoRoutingKey");
        channel.queueBind(queueName, EXCHANGE_NAME, "debugRoutingKey");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Decode with the charset constant, matching the UTF-8 encoding used by the producer.
            String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };

        // autoAck = true: deliveries are acknowledged automatically.
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

    }
}
/**
 * Consumer node 2 of the direct-exchange routing example.
 *
 * <p>Declares the direct exchange named by {@code EXCHANGE_NAME}, obtains a queue from
 * {@code queueDeclare()}, binds it with the single routing key {@code errorRoutingKey},
 * and prints every delivered message.
 */
public class Recv2 {

    private final static String EXCHANGE_NAME = "exchange_direct";

    /**
     * Connects to the broker and starts consuming with automatic acknowledgement.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("81.71.147.62");
        factory.setUsername("");
        factory.setPassword("");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        // Consumers normally do not use try-with-resources: the connection and
        // channel are intentionally left open after main returns.
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange as a DIRECT exchange (routing by exact routing key).
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // Declare a queue with the no-argument form and read back its name.
        String queueName = channel.queueDeclare().getQueue();

        // This node binds only the errorRoutingKey routing key.
        channel.queueBind(queueName, EXCHANGE_NAME, "errorRoutingKey");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Decode with the charset constant, matching the UTF-8 encoding used by the producer.
            String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };

        // autoAck = true: deliveries are acknowledged automatically.
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

    }
}

发表评论 取消回复
表情 图片 链接 代码

分享