基础介绍顺序消息和对应可以使用的场景,订单系统
- 什么是顺序消息:消息的生产和消费顺序一致
- 全局顺序:topic下面全部消息都要有序(少用)
- 性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景,并行度成为消息系统的瓶颈, 吞吐量不够.
- 在证券处理中,以人民币兑换美元为例子,在价格相同的情况下,先出价者优先处理,则可以通过全局顺序的方式按照 FIFO 的方式进行发布和消费
- 局部顺序:只要保证一组消息被顺序消费即可(RocketMQ使用)
- 性能要求高
- 电商的订单创建,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息、订单交易成功消息 都会按照先后顺序来发布和消费(阿里巴巴集团内部电商系统均使用局部顺序消息,既保证业务的顺序,同时又能保证业务的高性能)
- 全局顺序:topic下面全部消息都要有序(少用)
- 顺序发布:对于指定的一个 Topic,客户端将按照一定的先后顺序发送消息
- 顺序消费:对于指定的一个 Topic,按照一定的先后顺序接收消息,即先发送的消息一定会先被客户端接收到。
- 注意:
- 顺序消息暂不支持广播模式
- 顺序消息不支持异步发送方式,否则将无法严格保证顺序
核心知识之RocketMQ顺序消息讲解
- 生产端保证发送消息有序,且发送到同一个Topic的同个queue里面,RocketMQ的确是能保证FIFO的
- 例子:订单的顺序流程是:创建、付款、物流、完成,订单号相同的消息会被先后发送到同一个队列中,根据MessageQueueSelector里面自定义策略,根据同个业务id放置到同个queue里面,如订单号取模运算再放到selector中,同一个模的值都会投递到同一条queue
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //如果是订单号是字符串,则进行hash,得到一个hash值 Long id = (Long) arg; long index = id % mqs.size(); return mqs.get((int)index); }
- 例子:订单的顺序流程是:创建、付款、物流、完成,订单号相同的消息会被先后发送到同一个队列中,根据MessageQueueSelector里面自定义策略,根据同个业务id放置到同个queue里面,如订单号取模运算再放到selector中,同一个模的值都会投递到同一条queue
- 消费端要在保证消费同个topic里的同个队列,不应该用MessageListenerConcurrently,应该使用MessageListenerOrderly,自带单线程消费消息,不能再Consumer端再使用多线程去消费,消费端分配到的queue数量是固定的,集群消会锁住当前正在消费的队列集合的消息,所以会保证顺序消费。
- 官方例子 https://rocketmq.apache.org/docs/order-example/
发送消息小例子——搞一个实体类,搞一个新的topic,搞一个新的接口,接口应该很好理解,根据ID来确定唯一队列
ProductOrder
package net.xdclass.xdclassmq.domain;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class ProductOrder implements Serializable {
//订单id
private long orderId;
//操作类型
private String type;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public ProductOrder(){}
public ProductOrder(long orderId ,String type){
this.orderId = orderId;
this.type = type;
}
public static List<ProductOrder> getOrderList(){
List<ProductOrder> list = new ArrayList<>();
list.add(new ProductOrder(111L,"创建订单"));
list.add(new ProductOrder(222L,"创建订单"));
list.add(new ProductOrder(111L,"支付订单"));
list.add(new ProductOrder(222L,"支付订单"));
list.add(new ProductOrder(111L,"完成订单"));
list.add(new ProductOrder(333L,"创建订单"));
list.add(new ProductOrder(222L,"完成订单"));
list.add(new ProductOrder(333L,"支付订单"));
list.add(new ProductOrder(333L,"完成订单"));
return list;
}
@Override
public String toString() {
return "ProductOrder{" +
"orderId=" + orderId +
", type='" + type + '\'' +
'}';
}
}
public static final String ORDERLY_TOPIC = "xdclass_pay_order_topic_orderly";
PayController
package net.xdclass.xdclassmq.controller;
import net.xdclass.xdclassmq.domain.ProductOrder;
import net.xdclass.xdclassmq.jms.JmsConfig;
import net.xdclass.xdclassmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.List;
@RestController
public class PayController {
@Autowired
private PayProducer payProducer;
@RequestMapping("/api/v2/pay_cb")
public Object callback() throws Exception {
List<ProductOrder> list = ProductOrder.getOrderList();
for(int i=0; i< list.size(); i++){
ProductOrder order = list.get(i);
Message message = new Message(JmsConfig.ORDERLY_TOPIC,"",
order.getOrderId()+"",order.toString().getBytes());
SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg;
long index = id % mqs.size();
return mqs.get((int)index);
}
},order.getOrderId());
System.out.printf("发送结果=%s, sendResult=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), sendResult.toString(),order.getOrderId(),order.getType());
}
return new HashMap<>();
}
}
案例实战之RocketMQ顺序消息消息者消费实战——消息的消费
- MessageListenerConcurrently
- MessageListenerOrderly
- Consumer会平均分配queue的数量
- 并不是简单禁止并发处理,而是为每个Consumer Quene加个锁,消费每个消息前,需要获得这个消息所在的Queue的锁,这样同个时间,同个Queue的消息不被并发消费,但是不同Queue的消息可以并发处理
扩展思维:为什么高并发情况下ConcurrentHashMap比HashTable和HashMap更高效且线程安全?
简单回答:因为它内部使用的是分段锁,所以更加安全
小例子:看到这里,应该有电迷糊了,我们来看看结构图
PayController
package net.xdclass.xdclassmq.controller;
import net.xdclass.xdclassmq.domain.ProductOrder;
import net.xdclass.xdclassmq.jms.JmsConfig;
import net.xdclass.xdclassmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.List;
@RestController
public class PayController {
@Autowired
private PayProducer payProducer;
@RequestMapping("/api/v2/pay_cb")
public Object callback() throws Exception {
List<ProductOrder> list = ProductOrder.getOrderList();
for(int i=0; i< list.size(); i++){
ProductOrder order = list.get(i);
Message message = new Message(JmsConfig.ORDERLY_TOPIC,"",
order.getOrderId()+"",order.toString().getBytes());
SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg;
long index = id % mqs.size();
return mqs.get((int)index);
}
},order.getOrderId());
System.out.printf("发送结果=%s, sendResult=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), sendResult.toString(),order.getOrderId(),order.getType());
}
return new HashMap<>();
}
}
JmsConfig
public class JmsConfig {
public static final String NAME_SERVER = "81.71.147.62:9876";
public static final String TOPIC = "xdclass_pay_test_topic";
public static final String ORDERLY_TOPIC = "xdclass_pay_order_topic_orderly";
}
最关键的是这个——PayOrderlyConsumer
package net.xdclass.xdclassmq.jms;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class PayOrderlyConsumer {
private DefaultMQPushConsumer consumer;
private String consumerGroup = "pay_orderly_consumer_group";
public PayOrderlyConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//默认是集群方式,可以更改为广播,但是广播方式不支持重试
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe(JmsConfig.ORDERLY_TOPIC, "*");
consumer.registerMessageListener( new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
MessageExt msg = msgs.get(0);
try {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
//做业务逻辑操作 TODO
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
e.printStackTrace();
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});
consumer.start();
System.out.println("consumer start ...");
}
} 本文作者为DBC,转载请注明。
