核心知识之RocketMQ4.X生产和消费消息重试及处理

DBC 1.7K 0
  • 生产者Producer重试(异步和SendOneWay下配置无效)
    • 消息重投(保证数据的高可靠性),本身内部支持重试,默认次数是2,
    • 如果网络情况比较差,或者跨集群则建改多几次
    • 核心知识之RocketMQ4.X生产和消费消息重试及处理插图

    • //生产者投递消息重试次数
      producer.setRetryTimesWhenSendFailed(3);
  • 消费端重试
    • 原因:消息处理异常、broker端到consumer端各种问题,如网络原因闪断,消费处理失败,ACK返回失败等等问题。
    • 注意:
      • 重试间隔时间配置 ,默认每条消息最多重试 16 次
      • messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  • 超过重试次数人工补偿
  • 小例子
  • package net.xdclass.xdclassmq.jms;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    import org.springframework.stereotype.Component;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    @Component
    public class PayConsumer {
    
    
        private DefaultMQPushConsumer consumer;
    
        private String consumerGroup = "pay_consumer_group";
    
        public  PayConsumer() throws MQClientException {
    
            consumer = new DefaultMQPushConsumer(consumerGroup);
            consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    
            //默认是集群方式,可以更改为广播,但是广播方式不支持重试
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.subscribe(JmsConfig.TOPIC, "*");
    
    
            consumer.registerMessageListener( new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    MessageExt msg = msgs.get(0);
                    int times = msg.getReconsumeTimes();
                    System.out.println("重试次数="+times);
    
                    try {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
    
                    String topic = msg.getTopic();
                    String body = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    String keys = msg.getKeys();
    
                    if(keys.equalsIgnoreCase("6688")){
                        throw new Exception();
                    }
    
                    System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
    
                    System.out.println("消费异常");
                    //如果重试2次不成功,则记录,人工介入
                    if(times >= 2){
                        System.out.println("重试次数大于2,记录数据库,发短信通知开发人员或者运营人员");
                        //TODO 记录数据库,发短信通知开发人员或者运营人员
                        //告诉broker,消息成功
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                }
            });
    
            consumer.start();
            System.out.println("consumer start ...");
        }
    
    }
    
  • 消费端去重
  • 一条消息无论重试多少次,这些重试消息的 Message ID,key 不会改变。
  • 消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息

发表评论 取消回复
表情 图片 链接 代码

分享