消息生产者:
public class Send {
private final static String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("81.71.147.62");
factory.setUsername("");
factory.setPassword("");
factory.setVirtualHost("/dev");
factory.setPort(5672);
try ( //JDK7语法 或自动关闭 connnection和channel
//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel()) {
//绑定交换机,topic交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String error = "我是错误日志";
String info = "我是info日志";
String debug = "我是debug日志";
channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, error.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "product.log.debug", null, debug.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送成功");
}
}
} 消息消费者:两个
public class Recv1 {
private final static String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("81.71.147.62");
factory.setUsername("");
factory.setPassword("");
factory.setVirtualHost("/dev");
factory.setPort(5672);
//消费者一般不增加自动关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,另外一个节点只绑定一个 errorRoutingKey
channel.queueBind(queueName,EXCHANGE_NAME,"order.log.error");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//自动确认消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
} public class Recv2 {
private final static String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("81.71.147.62");
factory.setUsername("");
factory.setPassword("");
factory.setVirtualHost("/dev");
factory.setPort(5672);
//消费者一般不增加自动关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,另外一个节点只绑定一个 errorRoutingKey
channel.queueBind(queueName,EXCHANGE_NAME,"*.log.*");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//自动确认消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
} 本文作者为DBC,转载请注明。