CouponMQListener
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import net.xdclass.model.CouponRecordMessage; import net.xdclass.service.CouponRecordService; import org.checkerframework.checker.units.qual.A; import org.redisson.api.RedissonClient; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.concurrent.locks.Lock; @Slf4j @Component @RabbitListener(queues = "${mqconfig.coupon_release_queue}") public class CouponMQListener { @Autowired private CouponRecordService couponRecordService; @Autowired private RedissonClient redissonClient; /** * * 重复消费-幂等性 * * 消费失败,重新入队后最大重试次数: * 如果消费失败,不重新入队,可以记录日志,然后插到数据库人工排查 * * 消费者这块还有啥问题,大家可以先想下,然后给出解决方案 * * @param recordMessage * @param message * @param channel * @throws IOException */ @RabbitHandler public void releaseCouponRecord(CouponRecordMessage recordMessage, Message message, Channel channel) throws IOException { log.info("监听到消息:releaseCouponRecord消息内容:{}", recordMessage); long msgTag = message.getMessageProperties().getDeliveryTag(); boolean flag = couponRecordService.releaseCouponRecord(recordMessage); //防止同个解锁任务并发进入;如果是串行消费不用加锁;加锁有利也有弊,看项目业务逻辑而定 //Lock lock = redissonClient.getLock("lock:coupon_record_release:"+recordMessage.getTaskId()); //lock.lock(); try { if (flag) { //确认消息消费成功 channel.basicAck(msgTag, false); }else { log.error("释放优惠券失败 flag=false,{}",recordMessage); channel.basicReject(msgTag,true); } } catch (IOException e) { log.error("释放优惠券记录异常:{},msg:{}",e,recordMessage); channel.basicReject(msgTag,true); } // finally { // lock.unlock(); // } } }
ProductOrderFeignSerivce、CouponRecordService、CouponRecordMapper
import net.xdclass.utils.JsonData; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; @FeignClient(name = "xdclass-order-service") public interface ProductOrderFeignSerivce { /** * 查询订单状态 * @param outTradeNo * @return */ @GetMapping("/api/order/v1/query_state") JsonData queryProductOrderState(@RequestParam("out_trade_no")String outTradeNo); }
/** * 释放优惠券记录 * @param recordMessage * @return */ boolean releaseCouponRecord(CouponRecordMessage recordMessage);
/** * 更新优惠券使用记录 * @param couponRecordId * @param state */ void updateState(@Param("couponRecordId") Long couponRecordId, @Param("state") String state);
重中之重的实现类CouponRecordServiceImpl
/** * 解锁优惠券记录 * 1)查询task工作单是否存在 * 2) 查询订单状态 * @param recordMessage * @return */ @Override @Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED) public boolean releaseCouponRecord(CouponRecordMessage recordMessage) { //查询下task是否存 CouponTaskDO taskDO = couponTaskMapper.selectOne(new QueryWrapper<CouponTaskDO>().eq("id",recordMessage.getTaskId())); if(taskDO==null){ log.warn("工作单不存在,消息:{}",recordMessage); return true; } //lock状态才处理 if(taskDO.getLockState().equalsIgnoreCase(StockTaskStateEnum.LOCK.name())){ //查询订单状态 JsonData jsonData = orderFeignSerivce.queryProductOrderState(recordMessage.getOutTradeNo()); if(jsonData.getCode()==0){ //正常响应,判断订单状态 String state = jsonData.getData().toString(); if(ProductOrderStateEnum.NEW.name().equalsIgnoreCase(state)){ //状态是NEW新建状态,则返回给消息队,列重新投递 log.warn("订单状态是NEW,返回给消息队列,重新投递:{}",recordMessage); return false; } //如果是已经支付 if(ProductOrderStateEnum.PAY.name().equalsIgnoreCase(state)){ //如果已经支付,修改task状态为finish taskDO.setLockState(StockTaskStateEnum.FINISH.name()); couponTaskMapper.update(taskDO,new QueryWrapper<CouponTaskDO>().eq("id",recordMessage.getTaskId())); log.info("订单已经支付,修改库存锁定工作单FINISH状态:{}",recordMessage); return true; } } //订单不存在,或者订单被取消,确认消息,修改task状态为CANCEL,恢复优惠券使用记录为NEW log.warn("订单不存在,或者订单被取消,确认消息,修改task状态为CANCEL,恢复优惠券使用记录为NEW,message:{}",recordMessage); taskDO.setLockState(StockTaskStateEnum.CANCEL.name()); couponTaskMapper.update(taskDO,new QueryWrapper<CouponTaskDO>().eq("id",recordMessage.getTaskId())); //恢复优惠券记录是NEW状态 couponRecordMapper.updateState(taskDO.getCouponRecordId(),CouponStateEnum.NEW.name()); return true; }else { log.warn("工作单状态不是LOCK,state={},消息体={}",taskDO.getLockState(),recordMessage); return true; } }
一个简单的自定义数据库操作
<!--更新优惠券状态--> <update id="updateState"> update coupon_record set use_state = #{useState} where id = #{couponRecordId} </update>
本文作者为DBC,转载请注明。