
CouponMQListener
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import net.xdclass.model.CouponRecordMessage;
import net.xdclass.service.CouponRecordService;
import org.checkerframework.checker.units.qual.A;
import org.redisson.api.RedissonClient;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
@Slf4j
@Component
@RabbitListener(queues = "${mqconfig.coupon_release_queue}")
public class CouponMQListener {
@Autowired
private CouponRecordService couponRecordService;
@Autowired
private RedissonClient redissonClient;
/**
*
* 重复消费-幂等性
*
* 消费失败,重新入队后最大重试次数:
* 如果消费失败,不重新入队,可以记录日志,然后插到数据库人工排查
*
* 消费者这块还有啥问题,大家可以先想下,然后给出解决方案
*
* @param recordMessage
* @param message
* @param channel
* @throws IOException
*/
@RabbitHandler
public void releaseCouponRecord(CouponRecordMessage recordMessage, Message message, Channel channel) throws IOException {
log.info("监听到消息:releaseCouponRecord消息内容:{}", recordMessage);
long msgTag = message.getMessageProperties().getDeliveryTag();
boolean flag = couponRecordService.releaseCouponRecord(recordMessage);
//防止同个解锁任务并发进入;如果是串行消费不用加锁;加锁有利也有弊,看项目业务逻辑而定
//Lock lock = redissonClient.getLock("lock:coupon_record_release:"+recordMessage.getTaskId());
//lock.lock();
try {
if (flag) {
//确认消息消费成功
channel.basicAck(msgTag, false);
}else {
log.error("释放优惠券失败 flag=false,{}",recordMessage);
channel.basicReject(msgTag,true);
}
} catch (IOException e) {
log.error("释放优惠券记录异常:{},msg:{}",e,recordMessage);
channel.basicReject(msgTag,true);
}
// finally {
// lock.unlock();
// }
}
}
ProductOrderFeignSerivce、CouponRecordService、CouponRecordMapper
import net.xdclass.utils.JsonData;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(name = "xdclass-order-service")
public interface ProductOrderFeignSerivce {
/**
* 查询订单状态
* @param outTradeNo
* @return
*/
@GetMapping("/api/order/v1/query_state")
JsonData queryProductOrderState(@RequestParam("out_trade_no")String outTradeNo);
}
/**
* 释放优惠券记录
* @param recordMessage
* @return
*/
boolean releaseCouponRecord(CouponRecordMessage recordMessage); /**
* 更新优惠券使用记录
* @param couponRecordId
* @param state
*/
void updateState(@Param("couponRecordId") Long couponRecordId, @Param("state") String state); 重中之重的实现类CouponRecordServiceImpl
/**
* 解锁优惠券记录
* 1)查询task工作单是否存在
* 2) 查询订单状态
* @param recordMessage
* @return
*/
@Override
@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)
public boolean releaseCouponRecord(CouponRecordMessage recordMessage) {
//查询下task是否存
CouponTaskDO taskDO = couponTaskMapper.selectOne(new QueryWrapper<CouponTaskDO>().eq("id",recordMessage.getTaskId()));
if(taskDO==null){
log.warn("工作单不存在,消息:{}",recordMessage);
return true;
}
//lock状态才处理
if(taskDO.getLockState().equalsIgnoreCase(StockTaskStateEnum.LOCK.name())){
//查询订单状态
JsonData jsonData = orderFeignSerivce.queryProductOrderState(recordMessage.getOutTradeNo());
if(jsonData.getCode()==0){
//正常响应,判断订单状态
String state = jsonData.getData().toString();
if(ProductOrderStateEnum.NEW.name().equalsIgnoreCase(state)){
//状态是NEW新建状态,则返回给消息队,列重新投递
log.warn("订单状态是NEW,返回给消息队列,重新投递:{}",recordMessage);
return false;
}
//如果是已经支付
if(ProductOrderStateEnum.PAY.name().equalsIgnoreCase(state)){
//如果已经支付,修改task状态为finish
taskDO.setLockState(StockTaskStateEnum.FINISH.name());
couponTaskMapper.update(taskDO,new QueryWrapper<CouponTaskDO>().eq("id",recordMessage.getTaskId()));
log.info("订单已经支付,修改库存锁定工作单FINISH状态:{}",recordMessage);
return true;
}
}
//订单不存在,或者订单被取消,确认消息,修改task状态为CANCEL,恢复优惠券使用记录为NEW
log.warn("订单不存在,或者订单被取消,确认消息,修改task状态为CANCEL,恢复优惠券使用记录为NEW,message:{}",recordMessage);
taskDO.setLockState(StockTaskStateEnum.CANCEL.name());
couponTaskMapper.update(taskDO,new QueryWrapper<CouponTaskDO>().eq("id",recordMessage.getTaskId()));
//恢复优惠券记录是NEW状态
couponRecordMapper.updateState(taskDO.getCouponRecordId(),CouponStateEnum.NEW.name());
return true;
}else {
log.warn("工作单状态不是LOCK,state={},消息体={}",taskDO.getLockState(),recordMessage);
return true;
}
} 一个简单的自定义数据库操作
<!--更新优惠券状态-->
<update id="updateState">
update coupon_record set use_state = #{useState} where id = #{couponRecordId}
</update> 本文作者为DBC,转载请注明。