Springboot2整合RocketMQ4.x实战消费消息

DBC 1.6K 0

Springboot2.x整合RocketMQ4.x实战,开发消费者代码,常见问题处理

小例子——老规矩看一下结构图Springboot2整合RocketMQ4.x实战消费消息插图
PayConsumer
package net.xdclass.xdclassmq.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

@Component
public class PayConsumer {


    private DefaultMQPushConsumer consumer;

    private String consumerGroup = "pay_consumer_group";

    public  PayConsumer() throws MQClientException {

        //构造函数优先创建
        consumer = new DefaultMQPushConsumer(consumerGroup);
        //地址
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        //消费的策略
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //订阅一个主题,监听主题的什么标签
        consumer.subscribe(JmsConfig.TOPIC, "*");

        //当有消息到来马上进行消费
//        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//            try {
//                Message msg = msgs.get(0);
//                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
//                String topic = msg.getTopic();
//                String body = new String(msg.getBody(), "utf-8");
//                String tags = msg.getTags();
//                String keys = msg.getKeys();
//                System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
//                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//            } catch (UnsupportedEncodingException e) {
//                e.printStackTrace();
//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
//            }
//        });

        //上面是用JDK8的语法,不懂可以看我的博客 有相应的文章
        consumer.registerMessageListener( new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    //获取第一条消息进行消费
                Message msg = msgs.get(0);
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));

                //msg里面有很多的属性,具体可以看官方的文档
                String topic = msg.getTopic();
                //转换成为字符串
                String body = new String(msg.getBody(), "utf-8");
                String tags = msg.getTags();
                String keys = msg.getKeys();
                System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
                //告诉消息 我们已经成功消费了
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (UnsupportedEncodingException e) {

                e.printStackTrace();
                //稍后进行消费  这里可以设置重复消费几次之后都没有成功,那么可以写入数据库,然后由人工进行审核
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            }
        });

        consumer.start();
        System.out.println("consumer start ...");
    }

}
控制台输出Springboot2整合RocketMQ4.x实战消费消息插图2
温馨提示

可以看到,已经消费成功!

常见问题

1、Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10911> failed

2、com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [1647]ms, Topic: TopicTest1, BrokersSent: [broker-a, null, null]

3、org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [Book-Air.local, MacBook-Air.local, MacBook-Air.local]
解决:多网卡问题处理
1、设置producer: producer.setVipChannelEnabled(false);
2、编辑ROCKETMQ 配置文件:broker.conf(下列ip为自己的ip)
namesrvAddr = 192.168.0.101:9876
brokerIP1 = 192.168.0.101

4、DESC: service not available now, maybe disk full, CL:
解决:修改启动脚本runbroker.sh,在里面增加一句话即可:
JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
(磁盘保护的百分比设置成98%,只有磁盘空间使用率达到98%时才拒绝接收producer消息)

常见问题处理
https://blog.csdn.net/sqzhao/article/details/54834761
https://blog.csdn.net/mayifan0/article/details/67633729
https://blog.csdn.net/a906423355/article/details/78192828

发表评论 取消回复
表情 图片 链接 代码

分享