- 生产者Producer重试(异步和SendOneWay下配置无效)
- 消费端重试
- 原因:消息处理异常、broker端到consumer端各种问题,如网络原因闪断,消费处理失败,ACK返回失败等等问题。
- 注意:
- 重试间隔时间配置 ,默认每条消息最多重试 16 次
-
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- 超过重试次数人工补偿
- 小例子
-
package net.xdclass.xdclassmq.jms; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; import java.util.List; @Component public class PayConsumer { private DefaultMQPushConsumer consumer; private String consumerGroup = "pay_consumer_group"; public PayConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //默认是集群方式,可以更改为广播,但是广播方式不支持重试 consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe(JmsConfig.TOPIC, "*"); consumer.registerMessageListener( new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); int times = msg.getReconsumeTimes(); System.out.println("重试次数="+times); try { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); String topic = msg.getTopic(); String body = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); String keys = msg.getKeys(); if(keys.equalsIgnoreCase("6688")){ throw new Exception(); } System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { System.out.println("消费异常"); //如果重试2次不成功,则记录,人工介入 if(times >= 2){ System.out.println("重试次数大于2,记录数据库,发短信通知开发人员或者运营人员"); //TODO 记录数据库,发短信通知开发人员或者运营人员 //告诉broker,消息成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); System.out.println("consumer start ..."); } }
- 消费端去重
- 一条消息无论重试多少次,这些重试消息的 Message ID,key 不会改变。
- 消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息
本文作者为DBC,转载请注明。