消费生产者
public class Send { private final static String QUEUE_NAME = "work_mq_rr"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("81.71.147.62"); factory.setUsername(""); factory.setPassword(""); factory.setVirtualHost("/dev"); factory.setPort(5672); try ( //JDK7语法 或自动关闭 connnection和channel //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel()) { /** * 队列名称 * 持久化配置:mq重启后还在 * 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占 * 自动删除: 当没有消费者的时候,自动删除掉,一般是false * 其他参数 * * 队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i=0;i<10;i++){ String message = "Hello World!"; /** * 参数说明: * 交换机名称:不写则是默认的交换机,那路由健需要和队列名称一样才可以被路由, * 路由健名称 * 配置信息 * 发送的消息数据:字节数组 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } } }
消费者代码
public class Recv1 { private final static String QUEUE_NAME = "work_mq_fair"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("81.71.147.62"); factory.setUsername(""); factory.setPassword(""); factory.setVirtualHost("/dev"); factory.setPort(5672); //消费者一般不增加自动关闭 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*]Waiting for messages. To exit press CTRL+C"); /** * 限制消费者每次消费1个,消费完成后再进行下一个 */ channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { //模拟消费缓慢 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(delivery.getBody(), "UTF-8"); System.out.println("[x] Received '" + message + "'"); //手工确认消息消费,不是多条确认 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; //关闭自动确认消息 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } } public class Recv2 { private final static String QUEUE_NAME = "work_mq_fair"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("81.71.147.62"); factory.setUsername(""); factory.setPassword(""); factory.setVirtualHost("/dev"); factory.setPort(5672); //消费者一般不增加自动关闭 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*]Waiting for messages. To exit press CTRL+C"); /** * 限制消费者每次消费1个,消费完成后再进行下一个 */ channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { //模拟消费缓慢 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(delivery.getBody(), "UTF-8"); System.out.println("[x] Received '" + message + "'"); //手工确认消息消费,不是多条确认 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; //关闭自动确认消息 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
本文作者为DBC,转载请注明。