消息生产者:
public class Send {
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("81.71.147.62");
factory.setUsername("");
factory.setPassword("");
factory.setVirtualHost("/dev");
factory.setPort(5672);
try ( //JDK7语法 或自动关闭 connnection和channel
//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel()) {
//绑定交换机,直连交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String error = "我是错误日志";
String info = "我是info日志";
String debug = "我是debug日志";
channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, error.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, info.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debug.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送成功");
}
}
}
消息消费者(两个节点)
public class Recv1 {
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("81.71.147.62");
factory.setUsername("");
factory.setPassword("");
factory.setVirtualHost("/dev");
factory.setPort(5672);
//消费者一般不增加自动关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机,fanout扇形,即广播类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,另外一个节点只绑定一个 errorRoutingKey
channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey");
channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey");
channel.queueBind(queueName,EXCHANGE_NAME,"debugRoutingKey");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//自动确认消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
public class Recv2 {
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("81.71.147.62");
factory.setUsername("");
factory.setPassword("");
factory.setVirtualHost("/dev");
factory.setPort(5672);
//消费者一般不增加自动关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机,fanout扇形,即广播类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,另外一个节点只绑定一个 errorRoutingKey
channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//自动确认消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
本文作者为DBC,转载请注明。