基础介绍顺序消息和对应可以使用的场景,订单系统
- 什么是顺序消息:消息的生产和消费顺序一致
- 全局顺序:topic下面全部消息都要有序(少用)
- 性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景,并行度成为消息系统的瓶颈, 吞吐量不够.
- 在证券处理中,以人民币兑换美元为例子,在价格相同的情况下,先出价者优先处理,则可以通过全局顺序的方式按照 FIFO 的方式进行发布和消费
- 局部顺序:只要保证一组消息被顺序消费即可(RocketMQ使用)
- 性能要求高
- 电商的订单创建,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息、订单交易成功消息 都会按照先后顺序来发布和消费(阿里巴巴集团内部电商系统均使用局部顺序消息,既保证业务的顺序,同时又能保证业务的高性能)
- 全局顺序:topic下面全部消息都要有序(少用)
- 顺序发布:对于指定的一个 Topic,客户端将按照一定的先后顺序发送消息
- 顺序消费:对于指定的一个 Topic,按照一定的先后顺序接收消息,即先发送的消息一定会先被客户端接收到。
- 注意:
- 顺序消息暂不支持广播模式
- 顺序消息不支持异步发送方式,否则将无法严格保证顺序
核心知识之RocketMQ顺序消息讲解
- 生产端保证发送消息有序,且发送到同一个Topic的同个queue里面,RocketMQ的确是能保证FIFO的
- 例子:订单的顺序流程是:创建、付款、物流、完成,订单号相同的消息会被先后发送到同一个队列中,根据MessageQueueSelector里面自定义策略,根据同个业务id放置到同个queue里面,如订单号取模运算再放到selector中,同一个模的值都会投递到同一条queue
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //如果是订单号是字符串,则进行hash,得到一个hash值 Long id = (Long) arg; long index = id % mqs.size(); return mqs.get((int)index); }
- 例子:订单的顺序流程是:创建、付款、物流、完成,订单号相同的消息会被先后发送到同一个队列中,根据MessageQueueSelector里面自定义策略,根据同个业务id放置到同个queue里面,如订单号取模运算再放到selector中,同一个模的值都会投递到同一条queue
- 消费端要在保证消费同个topic里的同个队列,不应该用MessageListenerConcurrently,应该使用MessageListenerOrderly,自带单线程消费消息,不能再Consumer端再使用多线程去消费,消费端分配到的queue数量是固定的,集群消会锁住当前正在消费的队列集合的消息,所以会保证顺序消费。
- 官方例子 https://rocketmq.apache.org/docs/order-example/
发送消息小例子——搞一个实体类,搞一个新的topic,搞一个新的接口,接口应该很好理解,根据ID来确定唯一队列
ProductOrder
package net.xdclass.xdclassmq.domain; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProductOrder implements Serializable { //订单id private long orderId; //操作类型 private String type; public long getOrderId() { return orderId; } public void setOrderId(long orderId) { this.orderId = orderId; } public String getType() { return type; } public void setType(String type) { this.type = type; } public ProductOrder(){} public ProductOrder(long orderId ,String type){ this.orderId = orderId; this.type = type; } public static List<ProductOrder> getOrderList(){ List<ProductOrder> list = new ArrayList<>(); list.add(new ProductOrder(111L,"创建订单")); list.add(new ProductOrder(222L,"创建订单")); list.add(new ProductOrder(111L,"支付订单")); list.add(new ProductOrder(222L,"支付订单")); list.add(new ProductOrder(111L,"完成订单")); list.add(new ProductOrder(333L,"创建订单")); list.add(new ProductOrder(222L,"完成订单")); list.add(new ProductOrder(333L,"支付订单")); list.add(new ProductOrder(333L,"完成订单")); return list; } @Override public String toString() { return "ProductOrder{" + "orderId=" + orderId + ", type='" + type + '\'' + '}'; } }
public static final String ORDERLY_TOPIC = "xdclass_pay_order_topic_orderly";
PayController
package net.xdclass.xdclassmq.controller; import net.xdclass.xdclassmq.domain.ProductOrder; import net.xdclass.xdclassmq.jms.JmsConfig; import net.xdclass.xdclassmq.jms.PayProducer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.List; @RestController public class PayController { @Autowired private PayProducer payProducer; @RequestMapping("/api/v2/pay_cb") public Object callback() throws Exception { List<ProductOrder> list = ProductOrder.getOrderList(); for(int i=0; i< list.size(); i++){ ProductOrder order = list.get(i); Message message = new Message(JmsConfig.ORDERLY_TOPIC,"", order.getOrderId()+"",order.toString().getBytes()); SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long id = (Long) arg; long index = id % mqs.size(); return mqs.get((int)index); } },order.getOrderId()); System.out.printf("发送结果=%s, sendResult=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), sendResult.toString(),order.getOrderId(),order.getType()); } return new HashMap<>(); } }
案例实战之RocketMQ顺序消息消息者消费实战——消息的消费
- MessageListenerConcurrently
- MessageListenerOrderly
- Consumer会平均分配queue的数量
- 并不是简单禁止并发处理,而是为每个Consumer Quene加个锁,消费每个消息前,需要获得这个消息所在的Queue的锁,这样同个时间,同个Queue的消息不被并发消费,但是不同Queue的消息可以并发处理
扩展思维:为什么高并发情况下ConcurrentHashMap比HashTable和HashMap更高效且线程安全?
简单回答:因为它内部使用的是分段锁,所以更加安全
小例子:看到这里,应该有电迷糊了,我们来看看结构图
PayController
package net.xdclass.xdclassmq.controller; import net.xdclass.xdclassmq.domain.ProductOrder; import net.xdclass.xdclassmq.jms.JmsConfig; import net.xdclass.xdclassmq.jms.PayProducer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.List; @RestController public class PayController { @Autowired private PayProducer payProducer; @RequestMapping("/api/v2/pay_cb") public Object callback() throws Exception { List<ProductOrder> list = ProductOrder.getOrderList(); for(int i=0; i< list.size(); i++){ ProductOrder order = list.get(i); Message message = new Message(JmsConfig.ORDERLY_TOPIC,"", order.getOrderId()+"",order.toString().getBytes()); SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long id = (Long) arg; long index = id % mqs.size(); return mqs.get((int)index); } },order.getOrderId()); System.out.printf("发送结果=%s, sendResult=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), sendResult.toString(),order.getOrderId(),order.getType()); } return new HashMap<>(); } }
JmsConfig
public class JmsConfig { public static final String NAME_SERVER = "81.71.147.62:9876"; public static final String TOPIC = "xdclass_pay_test_topic"; public static final String ORDERLY_TOPIC = "xdclass_pay_order_topic_orderly"; }
最关键的是这个——PayOrderlyConsumer
package net.xdclass.xdclassmq.jms; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.stereotype.Component; import java.util.List; @Component public class PayOrderlyConsumer { private DefaultMQPushConsumer consumer; private String consumerGroup = "pay_orderly_consumer_group"; public PayOrderlyConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //默认是集群方式,可以更改为广播,但是广播方式不支持重试 consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe(JmsConfig.ORDERLY_TOPIC, "*"); consumer.registerMessageListener( new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { MessageExt msg = msgs.get(0); try { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); //做业务逻辑操作 TODO return ConsumeOrderlyStatus.SUCCESS; } catch (Exception e) { e.printStackTrace(); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } }); consumer.start(); System.out.println("consumer start ..."); } }
本文作者为DBC,转载请注明。