消息队列RocketMQ4.X消费者核心配置讲解
- consumeFromWhere配置(某些情况失效:参考https://blog.csdn.net/a417930422/article/details/83585397)
- CONSUME_FROM_FIRST_OFFSET: 初次从消息队列头部开始消费,即历史消息(还储存在broker的)全部消费一遍,后续再启动接着上次消费的进度开始消费
- CONSUME_FROM_LAST_OFFSET: 默认策略,初次从该队列最尾开始消费,即跳过历史消息,后续再启动接着上次消费的进度开始消费
- CONSUME_FROM_TIMESTAMP : 从某个时间点开始消费,默认是半个小时以前,后续再启动接着上次消费的进度开始消费
- allocateMessageQueueStrategy:
- 负载均衡策略算法,即消费者分配到queue的算法,默认值是AllocateMessageQueueAveragely即取模平均分配
- offsetStore:消息消费进度存储器 offsetStore 有两个策略:
- LocalFileOffsetStore 和 RemoteBrokerOffsetStor 广播模式默认使用LocalFileOffsetStore 集群模式默认使用RemoteBrokerOffsetStore
- consumeThreadMin 最小消费线程池数量
- consumeThreadMax 最大消费线程池数量
- pullBatchSize: 消费者去broker拉取消息时,一次拉取多少条。可选配置
- consumeMessageBatchMaxSize: 单次消费时一次性消费多少条消息,批量消费接口才有用,可选配置
- messageModel : 消费者消费模式, CLUSTERING——默认是集群模式CLUSTERING BROADCASTING——广播模式
集群和广播模式下RocketMQ消费端处理
- Topic下队列的奇偶数会影响Customer个数里面的消费数量
- 如果是4个队列,8个消息,4个节点则会各消费2条,如果不对等,则负载均衡会分配不均,
- 如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用,所以需要控制让queue的总数量大于等于consumer的数量
- 集群模式(默认):
- Consumer实例平均分摊消费生产者发送的消息
- 例子:订单消息,一般是只被消费一次
- 广播模式:
- 广播模式下消费消息:投递到Broker的消息会被每个Consumer进行消费,一条消息被多个Consumer消费,广播消费中ConsumerGroup暂时无用
- 例子:群公告,每个人都需要消费这个消息
- 怎么切换模式:通过setMessageModel()
讲解RocketMQ里面的Tag作用和消息过滤原理
- 一个Message只有一个Tag,tag是二级分类
- 订单:数码类订单、食品类订单
- 过滤分为Broker端和Consumer端过滤
- Broker端过滤,减少了无用的消息的进行网络传输,增加了broker的负担
- Consumer端过滤,完全可以根据业务需求进行实习,但是增加了很多无用的消息传输
- 一般是监听 * ,或者指定 tag,|| 运算 , SLQ92 , FilterServer等;
- tag性能高,逻辑简单
- SQL92 性能差点,支持复杂逻辑(只支持PushConsumer中使用) MessageSelector.bySql
- 语法:> , < = ,IS NULL, AND, OR, NOT 等,sql where后续的语法即可(大部分)
- 注意:消费者订阅关系要一致,不然会消费混乱,甚至消息丢失
- 订阅关系一致:订阅关系由 Topic和 Tag 组成,同一个 group name,订阅的 topic和tag 必须是一样的
- 在Broker 端进行MessageTag过滤,遍历message queue存储的 message tag和 订阅传递的tag 的hashcode不一样则跳过,符合的则传输给Consumer,在consumer queue存储的是对应的hashcode, 对比也是通过hashcode对比; Consumer收到过滤消息后也会进行匹配操作,但是是对比真实的message tag而不是hashcode
- consume queue存储使用hashcode定长,节约空间
- 过滤中不访问commit log,可以高效过滤
- 如果存在hash冲突,Consumer端可以进行再次确认
- 如果想使用多个Tag,可以使用sql表达式,但是不建议,单一职责,多个队列
可以看到,控制台这个位置是false,我们需要进行修改才能使用SLQ92
注意,改的是properties,根据你需要的进行改,这里是主从的示例!
修改成功之后,可以看下面的小例子
package net.xdclass.xdclassmq.jms; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.stereotype.Component; import java.util.List; @Component public class PayOrderlyConsumer { private DefaultMQPushConsumer consumer; private String consumerGroup = "pay_orderly_consumer_group"; public PayOrderlyConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //默认是集群方式,可以更改为广播,但是广播方式不支持重试 consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe(JmsConfig.ORDERLY_TOPIC, "*"); consumer.registerMessageListener( new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { MessageExt msg = msgs.get(0); try { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); //做业务逻辑操作 TODO return ConsumeOrderlyStatus.SUCCESS; } catch (Exception e) { e.printStackTrace(); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } }); consumer.start(); System.out.println("consumer start ..."); } }
PushConsumer、PullConsumer消费模式分析
- Push和Pull优缺点分析
- Push
- 实时性高;但增加服务端负载,消费端能力不同,如果Push推送过快,消费端会出现很多问题
- Pull
- 消费者从Server端拉取消息,主动权在消费者端,可控性好;但 间隔时间不好设置,间隔太短,则空请求,浪费资源;间隔时间太长,则消息不能及时处理
- 长轮询: Client请求Server端也就是Broker的时候, Broker会保持当前连接一段时间 默认是15s,如果这段时间内有消息到达,则立刻返回给Consumer.没消息的话 超过15s,则返回空,再进行重新请求;主动权在Consumer中,Broker即使有大量的消息 也不会主动提送Consumer, 缺点:服务端需要保持Consumer的请求,会占用资源,需要客户端连接数可控 否则会一堆连接
- Push
- PushConsumer本质是长轮训
- 系统收到消息后自动处理消息和offset,如果有新的Consumer加入会自动做负载均衡,
- 在broker端可以通过longPollingEnable=true来开启长轮询
- 消费端代码:DefaultMQPushConsumerImpl->pullMessage->PullCallback
- 服务端代码:broker.longpolling
- 虽然是push,但是代码里面大量使用了pull,是因为使用长轮训方式达到Push效果,既有pull有的,又有Push的实时性
- 优雅关闭:主要是释放资源和保存Offset, 调用shutdown()即可 ,参考 @PostConstruct、@PreDestroy
- PullConsumer需要自己维护Offset(参考官方例子)
- 官方例子路径:org.apache.rocketmq.example.simple.PullConsumer
- 获取MessageQueue遍历
- 客户维护Offset,需用用户本地存储Offset,存储内存、磁盘、数据库等
- 处理不同状态的消息 FOUND、NO_NEW_MSG、OFFSET_ILLRGL、NO_MATCHED_MSG、4种状态
- 灵活性高可控性强,但是编码复杂度会高
- 优雅关闭:主要是释放资源和保存Offset,需用程序自己保存好Offset,特别是异常处理的时候
本文作者为DBC,转载请注明。