Springboot2整合RocketMQ4.x实战消费消息 DBC 2021-07-20 1.7K 0 Springboot2.x整合RocketMQ4.x实战,开发消费者代码,常见问题处理 小例子——老规矩看一下结构图 PayConsumer package net.xdclass.xdclassmq.jms; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; import java.util.List; @Component public class PayConsumer { private DefaultMQPushConsumer consumer; private String consumerGroup = "pay_consumer_group"; public PayConsumer() throws MQClientException { //构造函数优先创建 consumer = new DefaultMQPushConsumer(consumerGroup); //地址 consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); //消费的策略 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //订阅一个主题,监听主题的什么标签 consumer.subscribe(JmsConfig.TOPIC, "*"); //当有消息到来马上进行消费 // consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { // try { // Message msg = msgs.get(0); // System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); // String topic = msg.getTopic(); // String body = new String(msg.getBody(), "utf-8"); // String tags = msg.getTags(); // String keys = msg.getKeys(); // System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body); // return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // } catch (UnsupportedEncodingException e) { // e.printStackTrace(); // return ConsumeConcurrentlyStatus.RECONSUME_LATER; // } // }); //上面是用JDK8的语法,不懂可以看我的博客 有相应的文章 consumer.registerMessageListener( new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { //获取第一条消息进行消费 Message msg = msgs.get(0); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); //msg里面有很多的属性,具体可以看官方的文档 String topic = msg.getTopic(); //转换成为字符串 String body = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); String keys = msg.getKeys(); System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body); //告诉消息 我们已经成功消费了 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (UnsupportedEncodingException e) { e.printStackTrace(); //稍后进行消费 这里可以设置重复消费几次之后都没有成功,那么可以写入数据库,然后由人工进行审核 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); System.out.println("consumer start ..."); } } 控制台输出 温馨提示 可以看到,已经消费成功! 常见问题 1、Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10911> failed 2、com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [1647]ms, Topic: TopicTest1, BrokersSent: [broker-a, null, null] 3、org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [Book-Air.local, MacBook-Air.local, MacBook-Air.local] 解决:多网卡问题处理 1、设置producer: producer.setVipChannelEnabled(false); 2、编辑ROCKETMQ 配置文件:broker.conf(下列ip为自己的ip) namesrvAddr = 192.168.0.101:9876 brokerIP1 = 192.168.0.101 4、DESC: service not available now, maybe disk full, CL: 解决:修改启动脚本runbroker.sh,在里面增加一句话即可: JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98" (磁盘保护的百分比设置成98%,只有磁盘空间使用率达到98%时才拒绝接收producer消息) 常见问题处理 https://blog.csdn.net/sqzhao/article/details/54834761 https://blog.csdn.net/mayifan0/article/details/67633729 https://blog.csdn.net/a906423355/article/details/78192828 本文作者为DBC,转载请注明。 0人点赞 打赏