消费生产者
public class Send {
private final static String QUEUE_NAME = "work_mq_rr";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("81.71.147.62");
factory.setUsername("");
factory.setPassword("");
factory.setVirtualHost("/dev");
factory.setPort(5672);
try ( //JDK7语法 或自动关闭 connnection和channel
//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel()) {
/**
* 队列名称
* 持久化配置:mq重启后还在
* 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占
* 自动删除: 当没有消费者的时候,自动删除掉,一般是false
* 其他参数
*
* 队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i=0;i<10;i++){
String message = "Hello World!";
/**
* 参数说明:
* 交换机名称:不写则是默认的交换机,那路由健需要和队列名称一样才可以被路由,
* 路由健名称
* 配置信息
* 发送的消息数据:字节数组
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
消费者代码
public class Recv1 {
private final static String QUEUE_NAME = "work_mq_rr";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("81.71.147.62");
factory.setUsername("");
factory.setPassword("");
factory.setVirtualHost("/dev");
factory.setPort(5672);
//消费者一般不增加自动关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*]Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//模拟消费缓慢
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received '" + message + "'");
//手工确认消息消费,不是多条确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
//关闭自动确认消息
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
public class Recv2 {
private final static String QUEUE_NAME = "work_mq_rr";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("81.71.147.62");
factory.setUsername("");
factory.setPassword("");
factory.setVirtualHost("/dev");
factory.setPort(5672);
//消费者一般不增加自动关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*]Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//模拟消费缓慢
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received '" + message + "'");
//手工确认消息消费,不是多条确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
//关闭自动确认消息
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
本文作者为DBC,转载请注明。