讲解顺序消息在电商和证券系统中应用场景

DBC 1.6K 0

基础介绍顺序消息和对应可以使用的场景,订单系统

  • 什么是顺序消息:消息的生产和消费顺序一致
    • 全局顺序:topic下面全部消息都要有序(少用)
      • 性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景,并行度成为消息系统的瓶颈, 吞吐量不够.
      • 在证券处理中,以人民币兑换美元为例子,在价格相同的情况下,先出价者优先处理,则可以通过全局顺序的方式按照 FIFO 的方式进行发布和消费
    • 局部顺序:只要保证一组消息被顺序消费即可(RocketMQ使用)
      • 性能要求高
      • 电商的订单创建,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息、订单交易成功消息 都会按照先后顺序来发布和消费(阿里巴巴集团内部电商系统均使用局部顺序消息,既保证业务的顺序,同时又能保证业务的高性能)
  • 顺序发布:对于指定的一个 Topic,客户端将按照一定的先后顺序发送消息
  • 顺序消费:对于指定的一个 Topic,按照一定的先后顺序接收消息,即先发送的消息一定会先被客户端接收到。
  • 注意:
    • 顺序消息暂不支持广播模式
    • 顺序消息不支持异步发送方式,否则将无法严格保证顺序

核心知识之RocketMQ顺序消息讲解

  • 生产端保证发送消息有序,且发送到同一个Topic的同个queue里面,RocketMQ的确是能保证FIFO的
    • 例子:订单的顺序流程是:创建、付款、物流、完成,订单号相同的消息会被先后发送到同一个队列中,根据MessageQueueSelector里面自定义策略,根据同个业务id放置到同个queue里面,如订单号取模运算再放到selector中,同一个模的值都会投递到同一条queue
      public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
      		//如果是订单号是字符串,则进行hash,得到一个hash值
      	  Long id = (Long) arg;
            long index = id % mqs.size();
            return mqs.get((int)index);
          }
      
  • 消费端要在保证消费同个topic里的同个队列,不应该用MessageListenerConcurrently,应该使用MessageListenerOrderly,自带单线程消费消息,不能再Consumer端再使用多线程去消费,消费端分配到的queue数量是固定的,集群消会锁住当前正在消费的队列集合的消息,所以会保证顺序消费。
  • 官方例子 https://rocketmq.apache.org/docs/order-example/
发送消息小例子——搞一个实体类,搞一个新的topic,搞一个新的接口,接口应该很好理解,根据ID来确定唯一队列

ProductOrder

package net.xdclass.xdclassmq.domain;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ProductOrder implements Serializable {

    //订单id
    private long orderId;

    //操作类型
    private String type;


    public long getOrderId() {
        return orderId;
    }

    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public  ProductOrder(){}

    public  ProductOrder(long orderId ,String type){
        this.orderId = orderId;
        this.type = type;

    }


    public static List<ProductOrder> getOrderList(){

        List<ProductOrder> list = new ArrayList<>();
        list.add(new ProductOrder(111L,"创建订单"));
        list.add(new ProductOrder(222L,"创建订单"));
        list.add(new ProductOrder(111L,"支付订单"));
        list.add(new ProductOrder(222L,"支付订单"));
        list.add(new ProductOrder(111L,"完成订单"));
        list.add(new ProductOrder(333L,"创建订单"));
        list.add(new ProductOrder(222L,"完成订单"));
        list.add(new ProductOrder(333L,"支付订单"));
        list.add(new ProductOrder(333L,"完成订单"));

        return list;

    }

    @Override
    public String toString() {
        return "ProductOrder{" +
                "orderId=" + orderId +
                ", type='" + type + '\'' +
                '}';
    }
}
public static final String ORDERLY_TOPIC = "xdclass_pay_order_topic_orderly";

PayController

package net.xdclass.xdclassmq.controller;

import net.xdclass.xdclassmq.domain.ProductOrder;
import net.xdclass.xdclassmq.jms.JmsConfig;
import net.xdclass.xdclassmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.List;

@RestController
public class PayController {

    @Autowired
    private PayProducer payProducer;


    @RequestMapping("/api/v2/pay_cb")
    public Object callback() throws Exception {

        List<ProductOrder> list  = ProductOrder.getOrderList();

        for(int i=0; i< list.size(); i++){
            ProductOrder order = list.get(i);
            Message message = new Message(JmsConfig.ORDERLY_TOPIC,"",
                    order.getOrderId()+"",order.toString().getBytes());

            SendResult sendResult =  payProducer.getProducer().send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = (Long) arg;
                    long index = id % mqs.size();
                    return mqs.get((int)index);
                }
            },order.getOrderId());


            System.out.printf("发送结果=%s, sendResult=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), sendResult.toString(),order.getOrderId(),order.getType());

        }



        return new HashMap<>();
    }


}
温馨提示

到这里,我们的消息就发送成功了

案例实战之RocketMQ顺序消息消息者消费实战——消息的消费

  • MessageListenerConcurrently
  • MessageListenerOrderly
    • Consumer会平均分配queue的数量
    • 并不是简单禁止并发处理,而是为每个Consumer Quene加个锁,消费每个消息前,需要获得这个消息所在的Queue的锁,这样同个时间,同个Queue的消息不被并发消费,但是不同Queue的消息可以并发处理 

      扩展思维:为什么高并发情况下ConcurrentHashMap比HashTable和HashMap更高效且线程安全?

      简单回答:因为它内部使用的是分段锁,所以更加安全

小例子:看到这里,应该有电迷糊了,我们来看看结构图讲解顺序消息在电商和证券系统中应用场景插图

PayController

package net.xdclass.xdclassmq.controller;

import net.xdclass.xdclassmq.domain.ProductOrder;
import net.xdclass.xdclassmq.jms.JmsConfig;
import net.xdclass.xdclassmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.List;

@RestController
public class PayController {

    @Autowired
    private PayProducer payProducer;


    @RequestMapping("/api/v2/pay_cb")
    public Object callback() throws Exception {

        List<ProductOrder> list  = ProductOrder.getOrderList();

        for(int i=0; i< list.size(); i++){
            ProductOrder order = list.get(i);
            Message message = new Message(JmsConfig.ORDERLY_TOPIC,"",
                    order.getOrderId()+"",order.toString().getBytes());

            SendResult sendResult =  payProducer.getProducer().send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = (Long) arg;
                    long index = id % mqs.size();
                    return mqs.get((int)index);
                }
            },order.getOrderId());


            System.out.printf("发送结果=%s, sendResult=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), sendResult.toString(),order.getOrderId(),order.getType());

        }



        return new HashMap<>();
    }


}

JmsConfig

public class JmsConfig {


    public static final String NAME_SERVER = "81.71.147.62:9876";

    public static final String TOPIC = "xdclass_pay_test_topic";

    public static final String ORDERLY_TOPIC = "xdclass_pay_order_topic_orderly";

}
最关键的是这个——PayOrderlyConsumer
PayOrderlyConsumer
package net.xdclass.xdclassmq.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class PayOrderlyConsumer {

private DefaultMQPushConsumer consumer;

private String consumerGroup = "pay_orderly_consumer_group";

public PayOrderlyConsumer() throws MQClientException {

consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

//默认是集群方式,可以更改为广播,但是广播方式不支持重试
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe(JmsConfig.ORDERLY_TOPIC, "*");

consumer.registerMessageListener( new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
MessageExt msg = msgs.get(0);
try {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));

//做业务逻辑操作 TODO

return ConsumeOrderlyStatus.SUCCESS;

} catch (Exception e) {

e.printStackTrace();
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}

});

consumer.start();
System.out.println("consumer start ...");
}

}

发表评论 取消回复
表情 图片 链接 代码

分享