RabbitMQ的topic主题通配符模式——通配符(最重要——最终模式!)

DBC 2.2K 0

消息生产者:

/**
 * Producer for the topic-exchange demo.
 *
 * <p>Declares the topic exchange {@code exchange_topic} and publishes three log
 * messages to it, each under its own routing key.
 */
public class Send {
    private static final String EXCHANGE_NAME = "exchange_topic";

    /**
     * Connects to the broker, publishes the three demo messages, and prints a
     * confirmation line.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring the exchange, or publishing fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("81.71.147.62");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/dev");
        connectionFactory.setUsername("");
        connectionFactory.setPassword("");

        // Each row is {routing key, payload}; rows are published in this order.
        String[][] outgoing = {
            {"order.log.error", "我是错误日志"},
            {"order.log.info", "我是info日志"},
            {"product.log.debug", "我是debug日志"},
        };

        // try-with-resources closes the channel and then the connection on exit.
        try (Connection conn = connectionFactory.newConnection();
             Channel ch = conn.createChannel()) {
            // Declare the exchange as a topic exchange before publishing to it.
            ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            for (String[] entry : outgoing) {
                String routingKey = entry[0];
                byte[] payload = entry[1].getBytes(StandardCharsets.UTF_8);
                ch.basicPublish(EXCHANGE_NAME, routingKey, null, payload);
            }

            System.out.println("消息发送成功");
        }
    }
}

消息消费者:两个(Recv1 绑定精确路由键 order.log.error,只会收到错误日志;Recv2 绑定通配符 *.log.*,其中 * 恰好匹配一个单词,因此三条消息都能收到)

/**
 * Consumer for the topic-exchange demo that binds with one exact routing key.
 *
 * <p>Binds its queue to {@code exchange_topic} using {@code order.log.error}
 * (no wildcard characters) and prints each message body it receives.
 */
public class Recv1 {

    private static final String EXCHANGE_NAME = "exchange_topic";

    /**
     * Connects to the broker, binds a freshly declared queue to the topic
     * exchange, and starts an auto-acknowledging consumer.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, binding, or subscribing fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("81.71.147.62");
        factory.setUsername("");
        factory.setPassword("");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        // Deliberately no try-with-resources: the connection and channel are left
        // open so the consumer can keep receiving deliveries.
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // The no-argument queueDeclare() asks the broker for a server-named,
        // exclusive, auto-delete, non-durable queue; keep the generated name.
        String queueName = channel.queueDeclare().getQueue();

        // Bind with an exact routing key, so only "order.log.error" messages
        // are routed to this queue.
        channel.queueBind(queueName, EXCHANGE_NAME, "order.log.error");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Decode with the charset constant, matching how Send encodes the body.
            String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };

        // autoAck = true: messages are acknowledged automatically on delivery.
        // The cancel callback is intentionally a no-op.
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

    }
}
/**
 * Consumer for the topic-exchange demo that binds with a wildcard pattern.
 *
 * <p>Binds its queue to {@code exchange_topic} using {@code *.log.*} and prints
 * each message body it receives.
 */
public class Recv2 {

    private static final String EXCHANGE_NAME = "exchange_topic";

    /**
     * Connects to the broker, binds a freshly declared queue to the topic
     * exchange, and starts an auto-acknowledging consumer.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, binding, or subscribing fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("81.71.147.62");
        factory.setUsername("");
        factory.setPassword("");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        // Deliberately no try-with-resources: the connection and channel are left
        // open so the consumer can keep receiving deliveries.
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // The no-argument queueDeclare() asks the broker for a server-named,
        // exclusive, auto-delete, non-durable queue; keep the generated name.
        String queueName = channel.queueDeclare().getQueue();

        // Bind with a wildcard pattern. In a topic exchange "*" stands for exactly
        // one dot-separated word, so "*.log.*" matches any three-word routing key
        // whose middle word is "log" (e.g. "order.log.info", "product.log.debug").
        channel.queueBind(queueName, EXCHANGE_NAME, "*.log.*");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Decode with the charset constant, matching how Send encodes the body.
            String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };

        // autoAck = true: messages are acknowledged automatically on delivery.
        // The cancel callback is intentionally a no-op.
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

    }
}

发表评论 取消回复
表情 图片 链接 代码

分享