优惠券微服务延迟消息消费-释放优惠券功能开发《上》

DBC 1.7K 0
流程梳理

优惠券解锁记录场景
1、超时未支付,比如30分钟则订单失效关闭
2、下单成功,创建订单业务失败,订单回滚

库存解锁防止继续支付:
1、30分支付超时则无法支付订单

2、订单31分延迟消息(比订单超时大几分钟)
->查询订单状态-向第三方支付查询订单状态,只有未支付状态,且本地订单状态是NEW,才修改本地订单状态为取消CANCEL,其他业务才可以解锁对应的库存库存

3、商品、优惠券库存32分延迟消息(比订单超时大几分钟)
->查询订单状态-订单不存在,解锁库存
->查询订单状态
1)订单状态为取消CANCEL的情况,才可以解锁库存,确认消息接收;
2)订单状态为未支付NEW的情况,则不解锁库存,不修改订单状态,重新投递消息或者拒收;
(避免网络延迟到 导致订单关单消息,比库存解锁还慢,没更新订单状态)
3)如果是订单已经支付则修改库存task工作单状态,确认消息接收;

注意:延迟队列一定要开启手动的ack机制,防止解锁失败,消息丢失,也要防止多次解锁
解锁库存的时候:修改状态和修改对应库存task工作单状态应该是同个事务,防止其中一个失败

优惠券微服务延迟消息消费-释放优惠券功能开发《上》插图

CouponMQListener

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import net.xdclass.model.CouponRecordMessage;
import net.xdclass.service.CouponRecordService;
import org.checkerframework.checker.units.qual.A;
import org.redisson.api.RedissonClient;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.locks.Lock;



@Slf4j
@Component
@RabbitListener(queues = "${mqconfig.coupon_release_queue}")
public class CouponMQListener {


    @Autowired
    private CouponRecordService couponRecordService;

    @Autowired
    private RedissonClient redissonClient;

    /**
     *
     * 重复消费-幂等性
     *
     * 消费失败,重新入队后最大重试次数:
     *  如果消费失败,不重新入队,可以记录日志,然后插到数据库人工排查
     *
     *  消费者这块还有啥问题,大家可以先想下,然后给出解决方案
     *
     * @param recordMessage
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitHandler
    public void releaseCouponRecord(CouponRecordMessage recordMessage, Message message, Channel channel) throws IOException {

        log.info("监听到消息:releaseCouponRecord消息内容:{}", recordMessage);
        long msgTag = message.getMessageProperties().getDeliveryTag();

        boolean flag = couponRecordService.releaseCouponRecord(recordMessage);

        //防止同个解锁任务并发进入;如果是串行消费不用加锁;加锁有利也有弊,看项目业务逻辑而定
        //Lock lock = redissonClient.getLock("lock:coupon_record_release:"+recordMessage.getTaskId());
        //lock.lock();
        try {
            if (flag) {
                //确认消息消费成功
                channel.basicAck(msgTag, false);
            }else {
                log.error("释放优惠券失败 flag=false,{}",recordMessage);
                channel.basicReject(msgTag,true);
            }

        } catch (IOException e) {
            log.error("释放优惠券记录异常:{},msg:{}",e,recordMessage);
            channel.basicReject(msgTag,true);
        }
//        finally {
//            lock.unlock();
//        }


    }


}

ProductOrderFeignSerivce、CouponRecordService、CouponRecordMapper
import net.xdclass.utils.JsonData;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;

@FeignClient(name = "xdclass-order-service")

public interface ProductOrderFeignSerivce {


    /**
     * 查询订单状态
     * @param outTradeNo
     * @return
     */
    @GetMapping("/api/order/v1/query_state")
    JsonData queryProductOrderState(@RequestParam("out_trade_no")String outTradeNo);


}
    /**
     * 释放优惠券记录
     * @param recordMessage
     * @return
     */
    boolean releaseCouponRecord(CouponRecordMessage recordMessage);
    /**
     * 更新优惠券使用记录
     * @param couponRecordId
     * @param state
     */
    void updateState(@Param("couponRecordId") Long couponRecordId, @Param("state") String state);
重中之重的实现类CouponRecordServiceImpl
 /**
     * 解锁优惠券记录
     * 1)查询task工作单是否存在
     * 2) 查询订单状态
     * @param recordMessage
     * @return
     */
    @Override
    @Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)
    public boolean releaseCouponRecord(CouponRecordMessage recordMessage) {

        //查询下task是否存
        CouponTaskDO taskDO = couponTaskMapper.selectOne(new QueryWrapper<CouponTaskDO>().eq("id",recordMessage.getTaskId()));

        if(taskDO==null){
            log.warn("工作单不存在,消息:{}",recordMessage);
            return true;
        }

        //lock状态才处理
        if(taskDO.getLockState().equalsIgnoreCase(StockTaskStateEnum.LOCK.name())){

            //查询订单状态
            JsonData jsonData = orderFeignSerivce.queryProductOrderState(recordMessage.getOutTradeNo());
            if(jsonData.getCode()==0){
                //正常响应,判断订单状态
                String state = jsonData.getData().toString();
                if(ProductOrderStateEnum.NEW.name().equalsIgnoreCase(state)){
                    //状态是NEW新建状态,则返回给消息队,列重新投递
                    log.warn("订单状态是NEW,返回给消息队列,重新投递:{}",recordMessage);
                    return false;
                }

                //如果是已经支付
                if(ProductOrderStateEnum.PAY.name().equalsIgnoreCase(state)){
                    //如果已经支付,修改task状态为finish
                    taskDO.setLockState(StockTaskStateEnum.FINISH.name());
                    couponTaskMapper.update(taskDO,new QueryWrapper<CouponTaskDO>().eq("id",recordMessage.getTaskId()));
                    log.info("订单已经支付,修改库存锁定工作单FINISH状态:{}",recordMessage);
                    return true;
                }
            }

            //订单不存在,或者订单被取消,确认消息,修改task状态为CANCEL,恢复优惠券使用记录为NEW
            log.warn("订单不存在,或者订单被取消,确认消息,修改task状态为CANCEL,恢复优惠券使用记录为NEW,message:{}",recordMessage);
            taskDO.setLockState(StockTaskStateEnum.CANCEL.name());

            couponTaskMapper.update(taskDO,new QueryWrapper<CouponTaskDO>().eq("id",recordMessage.getTaskId()));
            //恢复优惠券记录是NEW状态
            couponRecordMapper.updateState(taskDO.getCouponRecordId(),CouponStateEnum.NEW.name());

            return true;
        }else {
            log.warn("工作单状态不是LOCK,state={},消息体={}",taskDO.getLockState(),recordMessage);
            return true;
        }
    }
一个简单的自定义数据库操作
    <!--更新优惠券状态-->
    <update id="updateState">

        update coupon_record set use_state = #{useState} where id = #{couponRecordId}

    </update>

发表评论 取消回复
表情 图片 链接 代码

分享