RabbitMQ简单队列

DBC 1.6K 0

消息生产者

public class Send {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.211.55.13");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        try (   //JDK7语法 或自动关闭 connnection和channel
                //创建连接
                Connection connection = factory.newConnection();
                //创建信道
                Channel channel = connection.createChannel()) {
            /**
             * 队列名称
             * 持久化配置:mq重启后还在
             * 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占
             * 自动删除: 当没有消费者的时候,自动删除掉,一般是false
             * 其他参数
             *
             * 队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            /**
             * 参数说明:
             * 交换机名称:不写则是默认的交换机,那路由健需要和队列名称一样才可以被路由,
             * 路由健名称
             * 配置信息
             * 发送的消息数据:字节数组
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消息消费者

public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.211.55.13");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/xdclass1");
        factory.setPort(5672);

        //消费者一般不增加自动关闭
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        //回调方法,下面两种都行
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1
                System.out.println("consumerTag消息标识="+consumerTag);
                //可以获取交换机,路由健等
                System.out.println("envelope元数据="+envelope);

                System.out.println("properties配置信息="+properties);

                System.out.println("body="+new String(body,"utf-8"));
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consumer);

//        DeliverCallback deliverCallback = (consumerTag, envelop, delivery,properties, msg) -> {
//            String message = new String(msg, "UTF-8");
//            System.out.println(" [x] Received '" + message + "'");
//        };

        //自动确认消息
        //channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

发表评论 取消回复
表情 图片 链接 代码

分享