
配置文件
#自定义消息队列配置,发送锁定库存消息-》延迟exchange-》lock.queue-》死信exchange-》release.queue mqconfig: #延迟队列,不能被监听消费 coupon_release_delay_queue: coupon.release.delay.queue #延迟队列的消息过期后转发的队列 coupon_release_queue: coupon.release.queue #交换机 coupon_event_exchange: coupon.event.exchange #进入延迟队列的路由key coupon_release_delay_routing_key: coupon.release.delay.routing.key #消息过期,进入释放死信队列的key coupon_release_routing_key: coupon.release.routing.key #消息过期时间,毫秒,测试改为15秒 ttl: 15000
配置类
import lombok.Data;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Configuration
@Data
public class RabbitMQConfig {
/**
* 交换机
*/
@Value("${mqconfig.coupon_event_exchange}")
private String eventExchange;
/**
* 第一个队列 延迟队列,
*/
@Value("${mqconfig.coupon_release_delay_queue}")
private String couponReleaseDelayQueue;
/**
* 第一个队列的路由key
* 进入队列的路由key
*/
@Value("${mqconfig.coupon_release_delay_routing_key}")
private String couponReleaseDelayRoutingKey;
/**
* 第二个队列,被监听恢复库存的队列
*/
@Value("${mqconfig.coupon_release_queue}")
private String couponReleaseQueue;
/**
* 第二个队列的路由key
*
* 即进入死信队列的路由key
*/
@Value("${mqconfig.coupon_release_routing_key}")
private String couponReleaseRoutingKey;
/**
* 过期时间
*/
@Value("${mqconfig.ttl}")
private Integer ttl;
/**
* 消息转换器
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 创建交换机 Topic类型,也可以用dirct路由
* 一般一个微服务一个交换机
* @return
*/
@Bean
public Exchange couponEventExchange(){
return new TopicExchange(eventExchange,true,false);
}
/**
* 延迟队列
*/
@Bean
public Queue couponReleaseDelayQueue(){
Map<String,Object> args = new HashMap<>(3);
args.put("x-message-ttl",ttl);
args.put("x-dead-letter-routing-key",couponReleaseRoutingKey);
args.put("x-dead-letter-exchange",eventExchange);
return new Queue(couponReleaseDelayQueue,true,false,false,args);
}
/**
* 死信队列,普通队列,用于被监听
*/
@Bean
public Queue couponReleaseQueue(){
return new Queue(couponReleaseQueue,true,false,false);
}
/**
* 第一个队列,即延迟队列的绑定关系建立
* @return
*/
@Bean
public Binding couponReleaseDelayBinding(){
return new Binding(couponReleaseDelayQueue,Binding.DestinationType.QUEUE,eventExchange,couponReleaseDelayRoutingKey,null);
}
/**
* 死信队列绑定关系建立
* @return
*/
@Bean
public Binding couponReleaseBinding(){
return new Binding(couponReleaseQueue,Binding.DestinationType.QUEUE,eventExchange,couponReleaseRoutingKey,null);
}
}
本文作者为DBC,转载请注明。