消息生产者:
public class Send { private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("81.71.147.62"); factory.setUsername(""); factory.setPassword(""); factory.setVirtualHost("/dev"); factory.setPort(5672); try ( //JDK7语法 或自动关闭 connnection和channel //创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel()) { //绑定交换机,topic交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String error = "我是错误日志"; String info = "我是info日志"; String debug = "我是debug日志"; channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, error.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "product.log.debug", null, debug.getBytes(StandardCharsets.UTF_8)); System.out.println("消息发送成功"); } } }
消息消费者:两个
public class Recv1 { private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("81.71.147.62"); factory.setUsername(""); factory.setPassword(""); factory.setVirtualHost("/dev"); factory.setPort(5672); //消费者一般不增加自动关闭 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定队列和交换机,另外一个节点只绑定一个 errorRoutingKey channel.queueBind(queueName,EXCHANGE_NAME,"order.log.error"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; //自动确认消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
public class Recv2 { private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("81.71.147.62"); factory.setUsername(""); factory.setPassword(""); factory.setVirtualHost("/dev"); factory.setPort(5672); //消费者一般不增加自动关闭 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定队列和交换机,另外一个节点只绑定一个 errorRoutingKey channel.queueBind(queueName,EXCHANGE_NAME,"*.log.*"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; //自动确认消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
本文作者为DBC,转载请注明。