RabbitMQ工作队列 轮训策略

DBC 1.6K 0
温馨提示

轮询策略的区别就是:消费能力强和弱的都拿一样的来处理,可能会造成处理能力强的已经处理好了,而弱的还有很多!

消费生产者
public class Send {
    private final static String QUEUE_NAME = "work_mq_rr";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("81.71.147.62");
        factory.setUsername("");
        factory.setPassword("");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);
        try (   //JDK7语法 或自动关闭 connnection和channel
                //创建连接
                Connection connection = factory.newConnection();
                //创建信道
                Channel channel = connection.createChannel()) {
            /**
             * 队列名称
             * 持久化配置:mq重启后还在
             * 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占
             * 自动删除: 当没有消费者的时候,自动删除掉,一般是false
             * 其他参数
             *
             * 队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i=0;i<10;i++){
                String message = "Hello World!";
                /**
                 * 参数说明:
                 * 交换机名称:不写则是默认的交换机,那路由健需要和队列名称一样才可以被路由,
                 * 路由健名称
                 * 配置信息
                 * 发送的消息数据:字节数组
                 */
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }



        }
    }
}
消费者代码
public class Recv1 {
    private final static String QUEUE_NAME = "work_mq_rr";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("81.71.147.62");
        factory.setUsername("");
        factory.setPassword("");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        //消费者一般不增加自动关闭
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*]Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            //模拟消费缓慢
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("[x] Received '" + message + "'");

            //手工确认消息消费,不是多条确认
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };

        //关闭自动确认消息
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}


public class Recv2 {
    private final static String QUEUE_NAME = "work_mq_rr";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("81.71.147.62");
        factory.setUsername("");
        factory.setPassword("");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        //消费者一般不增加自动关闭
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*]Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            //模拟消费缓慢
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("[x] Received '" + message + "'");

            //手工确认消息消费,不是多条确认
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };

        //关闭自动确认消息
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}


发表评论 取消回复
表情 图片 链接 代码

分享