一、简单的使用MQ
1.搭建就不详细介绍了,网上有很多,这里推荐一种最简单的docker傻瓜式搭建
2.引入依赖
<!--引入AMQP-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.添加配置文件
##----------rabbit配置--------------
spring.rabbitmq.host=8.142.19.202
spring.rabbitmq.port=5672
#需要手工创建虚拟主机
spring.rabbitmq.virtual-host=dev
spring.rabbitmq.username=admin
spring.rabbitmq.password=666
#消息确认方式:manual(手动ack)和 auto(自动ack);如果希望消息重试达到指定次数后进入异常交换机和异常队列,需要使用自动ack
spring.rabbitmq.listener.simple.acknowledge-mode=auto
4.添加MQ配置类
RabbitMQConfig
点击查看完整内容
package com.example.test05.demo.config;
import lombok.Data;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the RabbitMQ topology used by the demo: one topic exchange, one queue,
 * and the binding that connects them.
 *
 * <p>The names are kept as fields so that Lombok's {@code @Data} exposes them through
 * getters; the producer reads the exchange name and routing key from this bean.
 *
 * @author DBC
 * @version 1.0.0
 * @date 2023-01-01 15:18:18
 * @website dbc655.top
 */
@Configuration
@Data
public class RabbitMQConfig {

    /** Name of the topic exchange the demo publishes to. */
    private String dbcTestEventExchange="dbc.event.exchange";

    /** Concrete routing key used by the producer when sending the "add" message. */
    private String dbcTestAddRoutingKey="dbc.add.test.mapping.routing.key";

    // ---- consumer-side settings -------------------------------------------

    /** Name of the queue that receives the "add" messages. */
    private String dbcTestAddLinkQueue="dbc.add.test.queue";

    /**
     * Topic binding key used to bind the queue to the exchange (consumer side).
     * The {@code *} segment is a wildcard, so the producer's routing key above matches it.
     */
    private String dbcTestAddLinkBindingKey="dbc.add.test.*.routing.key";

    /**
     * Creates the topic exchange. As a rule of thumb, one exchange per microservice.
     *
     * @return a durable, non-auto-delete topic exchange
     */
    @Bean
    public Exchange shortLinkEventExchange(){
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(dbcTestEventExchange, durable, autoDelete);
    }

    /**
     * Binds the "add" queue to the exchange using the wildcard binding key.
     *
     * @return the queue-to-exchange binding (no extra binding arguments)
     */
    @Bean
    public Binding shortLinkAddApiBinding(){
        final String destinationQueue = dbcTestAddLinkQueue;
        final String sourceExchange = dbcTestEventExchange;
        final String bindingKey = dbcTestAddLinkBindingKey;
        return new Binding(destinationQueue, Binding.DestinationType.QUEUE, sourceExchange, bindingKey, null);
    }

    /**
     * Creates the plain queue that the listener consumes from.
     *
     * @return a durable, non-exclusive, non-auto-delete queue
     */
    @Bean
    public Queue shortLinkAddLinkQueue(){
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(dbcTestAddLinkQueue, durable, exclusive, autoDelete);
    }
}
5.编写消费者
DbcTestAddLinkMQListener
点击查看完整内容
package com.example.test05.demo.listener;
import com.example.test05.demo.enums.BizCodeEnum;
import com.example.test05.demo.exception.BizException;
import com.example.test05.demo.model.UserDO;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import sun.plugin2.message.EventMessage;
import java.io.IOException;
/**
* @author DBC
* @version 1.0.0
* @date 2023年01月01日 15:53:34
* @website dbc655.top
*/
@Component
@Slf4j
@RabbitListener(queues = "dbc.add.test.queue")
public class DbcTestAddLinkMQListener {
@RabbitHandler
public void shortLinkHandler(UserDO eventMessage, Message message, Channel channel) throws IOException {
log.info("监听到消息DbcTestAddLinkMQListener message消息内容:{}",message);
long tag = message.getMessageProperties().getDeliveryTag();
try{
int i = 1/0;
//TODO 处理业务逻辑
}catch (Exception e){
//处理业务异常,还有进行其他操作,比如记录失败原因
log.error("消费失败:{}",eventMessage);
throw new BizException(BizCodeEnum.JV_SHUOMING);
}
log.info("消费成功:{}",eventMessage);
//确认消息消费成功
// channel.basicAck(tag,false);
}
}
示例图
这里我们看到还有一个Error,这我们后面会说到
6.编写生产者
/**
 * Builds a sample {@code UserDO} and publishes it to the demo exchange.
 */
@Override
public void addMQ() {
    // Exchange and routing key are read from the shared MQ configuration bean.
    final String targetExchange = rabbitMQConfig.getDbcTestEventExchange();
    final String routingKey = rabbitMQConfig.getDbcTestAddRoutingKey();

    final UserDO payload = UserDO.builder()
            .name("刘罗锅")
            .time(new Date())
            .build();

    rabbitTemplate.convertAndSend(targetExchange, routingKey, payload);
}
二、配置异常交换机
为什么要配置异常交换机?消息重试达到最大次数后仍然失败时,如果直接丢弃会造成消息丢失,如果一直重回队列又会反复失败并阻塞后面的消息;把它转发到专门的异常交换机和异常队列,就可以单独消费这些失败消息,方便后续排查和告警。
1.定义异常配置
#开启重试;消费者代码不能用 try-catch 把异常吞掉,必须把异常往外抛,否则不会触发重试
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=4
#重试消息的时间间隔,5秒(单位毫秒)
spring.rabbitmq.listener.simple.retry.initial-interval=5000
2.定义错误交换机、队列配置类
RabbitMQErrorConfig
点击查看完整内容
package com.example.test05.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the error exchange, error queue and their binding, and registers the
 * recoverer that republishes messages to them once listener retries are exhausted.
 *
 * @author DBC
 * @date 2023/1/3 10:09
 * @network dbc655.top
 */
@Configuration
@Slf4j
public class RabbitMQErrorConfig {

    /** Name of the exchange that failed messages are republished to. */
    private String shortLinkErrorExchange = "dbc.error.exchange";

    /** Name of the queue that stores failed messages. */
    private String shortLinkErrorQueue = "dbc.error.queue";

    /** Routing key used both for the binding and when republishing failed messages. */
    private String shortLinkErrorRoutingKey = "dbc.error.routing.key";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Creates the error exchange.
     *
     * @return a durable, non-auto-delete topic exchange
     */
    @Bean
    public TopicExchange errorTopicExchange(){
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(shortLinkErrorExchange, durable, autoDelete);
    }

    /**
     * Creates the error queue.
     *
     * @return a durable queue
     */
    @Bean
    public Queue errorQueue(){
        final boolean durable = true;
        return new Queue(shortLinkErrorQueue, durable);
    }

    /**
     * Binds the error queue to the error exchange.
     *
     * @return the binding keyed by the error routing key
     */
    @Bean
    public Binding bindingErrorQueueAndExchange(){
        final Queue failedMessages = errorQueue();
        final TopicExchange failedExchange = errorTopicExchange();
        return BindingBuilder
                .bind(failedMessages)
                .to(failedExchange)
                .with(shortLinkErrorRoutingKey);
    }

    /**
     * Registers a {@link RepublishMessageRecoverer}.
     *
     * <p>After a message has been retried the configured number of times, it is forwarded
     * to the error exchange with the error routing key, which makes later troubleshooting
     * and alerting easier.
     *
     * @return the recoverer targeting the error exchange
     */
    @Bean
    public MessageRecoverer messageRecoverer(){
        final String targetExchange = shortLinkErrorExchange;
        final String targetRoutingKey = shortLinkErrorRoutingKey;
        return new RepublishMessageRecoverer(rabbitTemplate, targetExchange, targetRoutingKey);
    }
}
3.定义异常消费者
DbcTestErrorMQListener
package com.example.test05.demo.listener;
import com.example.test05.demo.model.UserDO;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * Consumer for the {@code dbc.error.queue} queue: receives the messages that were
 * republished after the normal consumer gave up, and raises an alert for each one.
 *
 * @author DBC
 * @version 1.0.0
 * @date 2023-01-01 16:53:26
 * @website dbc655.top
 */
@Component
@Slf4j
@RabbitListener(queuesToDeclare = { @Queue("dbc.error.queue") })
public class DbcTestErrorMQListener {

    /**
     * Logs the failed payload and the raw message, then logs that the alert was sent.
     *
     * @param eventMessage the converted payload of the failed message
     * @param message      the raw AMQP message
     * @param channel      the channel the message arrived on (unused here)
     * @throws IOException declared for channel operations; not thrown by the current body
     */
    @RabbitHandler
    public void shortLinkHandler(UserDO eventMessage, Message message, Channel channel) throws IOException {
        // 1) what failed
        log.error("告警:监听到消息DbcTestErrorMQListener eventMessage消息内容:{}", eventMessage);
        // 2) full message with headers
        log.error("告警:Message:{}", message);
        // 3) alert placeholder (e.g. SMS notification)
        log.error("告警成功,发送通知短信");
    }
}
我们可以如下图中清晰的看到,当消息消费失败4次之后,这个错误消息我们也进行了消费了,好了,又完成一个小功能![aru_36]
点击查看完整内容
11:04:18.118 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.e.t.d.l.DbcTestAddLinkMQListener - 监听到消息DbcTestAddLinkMQListener message消息内容:(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dbc.event.exchange, receivedRoutingKey=dbc.add.test.mapping.routing.key, deliveryTag=2, consumerTag=amq.ctag-LQS0AMJkaIGqRmlj7WMp7g, consumerQueue=dbc.add.test.queue])
11:04:18.119 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] ERROR c.e.t.d.l.DbcTestAddLinkMQListener - 消费失败:UserDO(id=null, name=刘罗锅, time=Tue Jan 03 11:04:18 CST 2023)
11:04:23.119 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.e.t.d.l.DbcTestAddLinkMQListener - 监听到消息DbcTestAddLinkMQListener message消息内容:(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dbc.event.exchange, receivedRoutingKey=dbc.add.test.mapping.routing.key, deliveryTag=2, consumerTag=amq.ctag-LQS0AMJkaIGqRmlj7WMp7g, consumerQueue=dbc.add.test.queue])
11:04:23.119 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] ERROR c.e.t.d.l.DbcTestAddLinkMQListener - 消费失败:UserDO(id=null, name=刘罗锅, time=Tue Jan 03 11:04:18 CST 2023)
11:04:28.135 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.e.t.d.l.DbcTestAddLinkMQListener - 监听到消息DbcTestAddLinkMQListener message消息内容:(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dbc.event.exchange, receivedRoutingKey=dbc.add.test.mapping.routing.key, deliveryTag=2, consumerTag=amq.ctag-LQS0AMJkaIGqRmlj7WMp7g, consumerQueue=dbc.add.test.queue])
11:04:28.137 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] ERROR c.e.t.d.l.DbcTestAddLinkMQListener - 消费失败:UserDO(id=null, name=刘罗锅, time=Tue Jan 03 11:04:18 CST 2023)
11:04:33.144 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.e.t.d.l.DbcTestAddLinkMQListener - 监听到消息DbcTestAddLinkMQListener message消息内容:(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dbc.event.exchange, receivedRoutingKey=dbc.add.test.mapping.routing.key, deliveryTag=2, consumerTag=amq.ctag-LQS0AMJkaIGqRmlj7WMp7g, consumerQueue=dbc.add.test.queue])
11:04:33.144 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] ERROR c.e.t.d.l.DbcTestAddLinkMQListener - 消费失败:UserDO(id=null, name=刘罗锅, time=Tue Jan 03 11:04:18 CST 2023)
11:04:33.146 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] WARN o.s.a.r.r.RepublishMessageRecoverer - Republishing failed message to exchange 'dbc.error.exchange' with routing key dbc.error.routing.key
11:04:33.210 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] ERROR c.e.t.d.l.DbcTestErrorMQListener - 告警:监听到消息DbcTestErrorMQListener eventMessage消息内容:UserDO(id=null, name=刘罗锅, time=Tue Jan 03 11:04:18 CST 2023)
11:04:33.211 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] ERROR c.e.t.d.l.DbcTestErrorMQListener - 告警:Message:(Body:'[serialized object]' MessageProperties [headers={x-exception-message=举例说明, x-original-routingKey=dbc.add.test.mapping.routing.key, x-original-exchange=dbc.event.exchange, x-exception-stacktrace=org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.example.test05.demo.listener.DbcTestAddLinkMQListener.shortLinkHandler(com.example.test05.demo.model.UserDO,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.io.IOException' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:252)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:194)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:137)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1656)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:93)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:116)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at org.springframework.amqp.rabbit.listener.$Proxy86.invokeListener(Unknown Source)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1554)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
at java.lang.Thread.run(Thread.java:748)
Caused by: BizException(code=666666, msg=举例说明)
at com.example.test05.demo.listener.DbcTestAddLinkMQListener.shortLinkHandler(DbcTestAddLinkMQListener.java:36)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119)
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:163)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:81)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:244)
... 27 more
}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dbc.error.exchange, receivedRoutingKey=dbc.error.routing.key, deliveryTag=2, consumerTag=amq.ctag-Y9ZQTs1fx16GDYN_IgoOFA, consumerQueue=dbc.error.queue])
11:04:33.211 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] ERROR c.e.t.d.l.DbcTestErrorMQListener - 告警成功,发送通知短信




三、进阶操作 —— 优雅封装RabbitMQ的配置(动态)
关键代码实现
1、关键MQ配置类 —— RabbitMqConfig
点击查看完整 RabbitMqConfig 内容
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.StopWatch;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.woniu.connection.constants.RabbitEnum;
import com.woniu.connection.constants.RabbitExchangeTypeEnum;
import com.woniu.connection.consumer.ConsumerContainerFactory;
import com.woniu.connection.product.AbsProducerService;
import com.woniu.connection.property.ModuleProperties;
import com.woniu.connection.property.RabbitModuleProperties;
import com.woniu.connection.retry.CustomRetryListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
 * Global RabbitMQ bootstrap: declares queues and exchanges and wires producers and
 * consumers dynamically from the configured modules.
 *
 * <p>The work happens in {@link #afterSingletonsInstantiated()}. The helper methods
 * rewrite the queue, exchange and routing-key names inside each {@link ModuleProperties}
 * in place (applying the {@link RabbitEnum} templates), and later steps read those
 * rewritten values, so the per-module call order must be kept.
 */
@Slf4j
@Configuration
public class RabbitMqConfig implements SmartInitializingSingleton {

    /** Connection factory handed to every listener container. */
    private final ConnectionFactory connectionFactory;

    /** Admin used to declare queues, exchanges and bindings on the broker. */
    private final AmqpAdmin amqpAdmin;

    /** Module configuration bound from the application configuration. */
    private final RabbitModuleProperties rabbitModuleProperties;

    @Autowired
    public RabbitMqConfig(AmqpAdmin amqpAdmin, RabbitModuleProperties rabbitModuleProperties, ConnectionFactory connectionFactory) {
        this.amqpAdmin = amqpAdmin;
        this.rabbitModuleProperties = rabbitModuleProperties;
        this.connectionFactory = connectionFactory;
    }

    /**
     * Initialises every configured module. A failure in one module is logged and does
     * not prevent the remaining modules from being initialised.
     */
    @Override
    public void afterSingletonsInstantiated() {
        StopWatch stopWatch = StopWatch.create("MQ");
        stopWatch.start();
        log.debug("初始化MQ配置");
        List<ModuleProperties> modules = rabbitModuleProperties.getModules();
        if (CollUtil.isEmpty(modules)) {
            log.warn("未配置MQ");
            return;
        }
        for (ModuleProperties module : modules) {
            try {
                // Order matters: each step reads names rewritten by the previous ones.
                Queue queue = genQueue(module);
                Exchange exchange = genQueueExchange(module);
                queueBindExchange(queue, exchange, module);
                bindProducer(module);
                bindConsumer(queue, exchange, module);
            } catch (Exception e) {
                log.error("初始化失败", e);
            }
        }
        stopWatch.stop();
        log.info("初始化MQ配置成功耗时: {}ms", stopWatch.getTotal(TimeUnit.MILLISECONDS));
    }

    /**
     * Looks up the module's producer bean and gives it the module's exchange name and
     * routing key. A missing producer is only logged: a module may have no producer.
     *
     * @param module module whose producer should be wired
     */
    private void bindProducer(ModuleProperties module) {
        try {
            AbsProducerService producerService = SpringUtil.getBean(module.getProducer());
            producerService.setExchange(module.getExchange().getName());
            producerService.setRoutingKey(module.getRoutingKey());
            log.debug("绑定生产者: {}", module.getProducer());
        } catch (Exception e) {
            // Fixed: report the producer name (the original logged the consumer name here).
            log.warn("无法在容器中找到该生产者[{}],若需要此生产者则需要做具体实现", module.getProducer());
        }
    }

    /**
     * Builds and starts a listener container for the module's consumer bean, attaching
     * the optional retry listener when one is configured.
     *
     * @param queue    queue the consumer listens on
     * @param exchange exchange the queue is bound to
     * @param module   module describing consumer, retry listener and ack mode
     */
    private void bindConsumer(Queue queue, Exchange exchange, ModuleProperties module) {
        // The retry listener is optional; its absence is not an error.
        CustomRetryListener customRetryListener = null;
        try {
            customRetryListener = SpringUtil.getBean(module.getRetry());
        } catch (Exception e) {
            log.debug("无法在容器中找到该重试类[{}],若需要重试则需要做具体实现", module.getRetry());
        }
        try {
            ConsumerContainerFactory factory = ConsumerContainerFactory.builder()
                    .connectionFactory(connectionFactory)
                    .queue(queue)
                    .exchange(exchange)
                    .consumer(SpringUtil.getBean(module.getConsumer()))
                    .retryListener(customRetryListener)
                    .autoAck(module.getAutoAck())
                    .amqpAdmin(amqpAdmin).build();
            SimpleMessageListenerContainer container = factory.getObject();
            if (Objects.nonNull(container)) {
                container.start();
            }
            log.debug("绑定消费者: {}", module.getConsumer());
        } catch (Exception e) {
            // This catch also covers container build/start failures, so keep the cause in the log.
            log.warn("无法在容器中找到该消费者[{}],若需要此消费者则需要做具体实现", module.getConsumer(), e);
        }
    }

    /**
     * Rewrites the module's routing key with the {@link RabbitEnum#ROUTER_KEY} template,
     * then declares the queue, the exchange and the binding between them.
     *
     * @param queue    queue to declare
     * @param exchange exchange to declare
     * @param module   module supplying the (already rewritten) queue and exchange names
     */
    private void queueBindExchange(Queue queue, Exchange exchange, ModuleProperties module) {
        log.debug("初始化交换机: {}", module.getExchange().getName());
        String queueName = module.getQueue().getName();
        String exchangeName = module.getExchange().getName();
        module.setRoutingKey(StrUtil.format(RabbitEnum.ROUTER_KEY.getValue(), module.getRoutingKey()));
        String routingKey = module.getRoutingKey();
        Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
        amqpAdmin.declareQueue(queue);
        amqpAdmin.declareExchange(exchange);
        amqpAdmin.declareBinding(binding);
        log.debug("队列绑定交换机: 队列: {}, 交换机: {}", queueName, exchangeName);
    }

    /**
     * Rewrites the module's exchange name with the {@link RabbitEnum#EXCHANGE} template
     * and builds the exchange definition.
     *
     * @param module module describing the exchange
     * @return the exchange definition matching the configured type
     */
    private Exchange genQueueExchange(ModuleProperties module) {
        ModuleProperties.Exchange exchange = module.getExchange();
        RabbitExchangeTypeEnum exchangeType = exchange.getType();
        exchange.setName(StrUtil.format(RabbitEnum.EXCHANGE.getValue(), exchange.getName()));
        return getExchangeByType(exchangeType, exchange.getName(), exchange.isDurable(),
                exchange.isAutoDelete(), exchange.getArguments());
    }

    /**
     * Creates the exchange definition for the given type.
     *
     * @param exchangeType exchange type
     * @param exchangeName exchange name (already rewritten)
     * @param isDurable    whether the exchange is durable
     * @param isAutoDelete whether the exchange is auto-deleted
     * @param arguments    extra exchange arguments, may be {@code null}
     * @return the exchange definition
     * @throws IllegalArgumentException if the type is not one of the supported kinds
     */
    private Exchange getExchangeByType(RabbitExchangeTypeEnum exchangeType, String exchangeName, boolean isDurable, boolean isAutoDelete, Map<String, Object> arguments) {
        switch (exchangeType) {
            // direct exchange
            case DIRECT:
                return new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);
            // topic exchange
            case TOPIC:
                return new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);
            // fanout exchange
            case FANOUT:
                return new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);
            // headers exchange
            case HEADERS:
                return new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);
            default:
                // Fail here instead of returning null and failing later when the exchange is declared.
                throw new IllegalArgumentException("未匹配到交换机类型: " + exchangeType);
        }
    }

    /**
     * Rewrites the module's queue name with the {@link RabbitEnum#QUEUE} template and
     * builds the queue definition, including TTL normalisation and optional dead-letter
     * arguments.
     *
     * @param module module describing the queue
     * @return the queue definition
     */
    private Queue genQueue(ModuleProperties module) {
        ModuleProperties.Queue queue = module.getQueue();
        queue.setName(StrUtil.format(RabbitEnum.QUEUE.getValue(), queue.getName()));
        log.debug("初始化队列: {}", queue.getName());

        // Work on a copy so the bound configuration object is not modified.
        Map<String, Object> arguments = new HashMap<>();
        Map<String, Object> configured = queue.getArguments();
        if (!MapUtil.isEmpty(configured)) {
            arguments.putAll(configured);
        }

        // Normalise the TTL value to a long.
        if (arguments.containsKey("x-message-ttl")) {
            arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));
        }

        // Attach the dead-letter exchange/routing key only when both are configured.
        String deadLetterExchange = queue.getDeadLetterExchange();
        String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
        if (StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) {
            deadLetterExchange = StrUtil.format(RabbitEnum.EXCHANGE.getValue(), deadLetterExchange);
            deadLetterRoutingKey = StrUtil.format(RabbitEnum.ROUTER_KEY.getValue(), deadLetterRoutingKey);
            arguments.put("x-dead-letter-exchange", deadLetterExchange);
            arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
            log.debug("绑定死信队列: 交换机: {}, 路由: {}", deadLetterExchange, deadLetterRoutingKey);
        }
        return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
    }
}
2、对应必要枚举 —— RabbitEnum、RabbitExchangeTypeEnum
点击查看完整 RabbitEnum 内容
import lombok.Getter;
/**
 * Name templates for queues, exchanges and routing keys.
 *
 * <p>Each {@code value} contains a {@code {}} placeholder that callers fill in with the
 * configured short name.
 */
public enum RabbitEnum {
    QUEUE("xxx.{}.queue", "队列名称"),
    EXCHANGE("xxx.{}.exchange", "交换机名称"),
    ROUTER_KEY("xxx.{}.key", "路由名称"),
    ;

    /** Name template containing a {@code {}} placeholder. */
    @Getter
    private final String value;

    /** Human-readable description of what the template names. */
    @Getter
    private final String desc;

    RabbitEnum(String value, String desc) {
        this.value = value;
        this.desc = desc;
    }
}
点击查看完整 RabbitExchangeTypeEnum 内容
/**
 * Supported exchange types.
 */
public enum RabbitExchangeTypeEnum {

    /**
     * Direct exchange: routes to queues whose binding key equals the routing key
     * exactly (the most commonly used type).
     */
    DIRECT,

    /**
     * Topic exchange: routes by pattern-matching the routing key against the binding key.
     * {@code *} matches exactly one word and {@code #} matches zero or more words
     * (words are separated by dots).
     */
    TOPIC,

    /**
     * Fanout exchange: delivers to every bound queue and ignores the routing key;
     * used for broadcasting.
     */
    FANOUT,

    /**
     * Headers exchange: similar to a direct exchange, but routing is based on message
     * header attributes instead of the routing key (rarely used).
     */
    HEADERS
}
3、消费者相关配置类 —— AbsConsumerService、ConsumerContainerFactory
点击查看完整 AbsConsumerService 内容
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import java.io.IOException;
/**
 * Base class for consumers: decodes the message body to text and hands it to
 * {@link #onConsumer(String, Message, Channel)}, and offers helpers for acknowledging
 * or rejecting the current delivery.
 *
 * @param <T> type parameter reserved for subclasses (not used by this base class)
 */
@Slf4j
public abstract class AbsConsumerService<T> implements ConsumerService {

    /**
     * Decodes the body as UTF-8 text and delegates to
     * {@link #onConsumer(String, Message, Channel)}.
     *
     * @param message the received message
     * @param channel the channel the message arrived on
     * @throws Exception whatever the consumer implementation throws
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // Decode explicitly as UTF-8 (the charset the producer side encodes with) instead of
        // relying on the platform default charset.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        onConsumer(body, message, channel);
    }

    /**
     * Extension point for subclasses: receives the message body as text.
     * The default implementation only logs that no handler was provided.
     *
     * @param data    the message body decoded as text
     * @param message the raw message
     * @param channel the channel the message arrived on
     * @throws IOException if a channel operation performed by the implementation fails
     */
    public void onConsumer(String data, Message message, Channel channel) throws IOException {
        log.error("未对此方法进行实现: {}", data);
    }

    /**
     * Acknowledges this delivery only ({@code multiple = false}).
     */
    protected void ack(Message message, Channel channel) throws IOException {
        ack(Boolean.FALSE, message, channel);
    }

    /**
     * Negatively acknowledges this delivery only, without requeueing it.
     */
    protected void nack(Message message, Channel channel) throws IOException {
        nack(Boolean.FALSE, Boolean.FALSE, message, channel);
    }

    /**
     * Rejects this delivery without requeueing it.
     */
    protected void basicReject(Message message, Channel channel) throws IOException {
        basicReject(Boolean.FALSE, message, channel);
    }

    /**
     * Rejects this delivery.
     *
     * @param requeue whether the rejected message should be put back on the queue;
     *                this value is passed as the second argument of {@code Channel.basicReject}
     */
    protected void basicReject(Boolean requeue, Message message, Channel channel) throws IOException {
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), requeue);
    }

    /**
     * Acknowledges the delivery.
     *
     * @param multiple {@code true} to acknowledge all deliveries up to and including this
     *                 delivery tag, {@code false} to acknowledge only this one
     */
    protected void ack(Boolean multiple, Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), multiple);
    }

    /**
     * Negatively acknowledges the delivery.
     *
     * @param multiple {@code true} to reject all deliveries up to and including this
     *                 delivery tag, {@code false} to reject only this one
     * @param requeue  whether the rejected message(s) should be put back on the queue
     */
    protected void nack(Boolean multiple, Boolean requeue, Message message, Channel channel) throws IOException {
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), multiple, requeue);
    }
}
点击查看完整 ConsumerContainerFactory 内容
import com.woniu.connection.retry.CustomRetryListener;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.aopalliance.aop.Advice;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import java.util.Objects;
/**
 * Factory that builds a {@link SimpleMessageListenerContainer} for one queue and one
 * consumer, with stateless retry attached as advice.
 */
@Data
@Slf4j
@Builder
public class ConsumerContainerFactory implements FactoryBean<SimpleMessageListenerContainer> {

    /** Connection factory used by the container. */
    private ConnectionFactory connectionFactory;

    /** Admin object passed to the container. */
    private AmqpAdmin amqpAdmin;

    /** Queue the container listens on. */
    private Queue queue;

    /** Exchange the queue is bound to (stored only; not used when building the container). */
    private Exchange exchange;

    /** Message listener; when {@code null} the container is built without a listener. */
    private ConsumerService consumer;

    /** Optional callback notified on every failed attempt and on the last one. */
    private CustomRetryListener retryListener;

    /** Maximum number of delivery attempts handed to the retry policy. */
    private final Integer maxAttempts = 5;

    /** Whether to acknowledge automatically; {@code null} is treated as automatic. */
    private Boolean autoAck;

    /**
     * Builds a new, not yet started container on every call.
     *
     * @return the configured container
     */
    @Override
    public SimpleMessageListenerContainer getObject() throws Exception {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setAmqpAdmin(amqpAdmin);
        container.setConnectionFactory(connectionFactory);
        container.setQueues(queue);
        container.setPrefetchCount(1);
        container.setConcurrentConsumers(20);
        container.setMaxConcurrentConsumers(100);
        // Rejected messages are not requeued.
        container.setDefaultRequeueRejected(Boolean.FALSE);
        container.setAdviceChain(createRetry());
        // Null-safe: only an explicit FALSE selects manual acknowledgement, so an unset
        // builder value no longer fails with a NullPointerException on unboxing.
        container.setAcknowledgeMode(Boolean.FALSE.equals(autoAck) ? AcknowledgeMode.MANUAL : AcknowledgeMode.AUTO);
        if (Objects.nonNull(consumer)) {
            container.setMessageListener(consumer);
        }
        return container;
    }

    /**
     * Builds the stateless retry advice: at most {@link #maxAttempts} attempts with the
     * back-off from {@link #genExponentialBackOffPolicy()}; once retries are exhausted
     * the message is rejected without requeue.
     *
     * @return the retry advice for the container's advice chain
     */
    private Advice createRetry() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.registerListener(new RetryListener() {
            @Override
            public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                // Returning true lets the retry operation proceed.
                return true;
            }

            @Override
            public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
                // Nothing to clean up.
            }

            @Override
            public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
                if (Objects.nonNull(retryListener)) {
                    retryListener.onRetry(context, callback, throwable);
                    // Additionally notify once the retry count has reached the maximum.
                    if (maxAttempts.equals(context.getRetryCount())) {
                        retryListener.lastRetry(context, callback, throwable);
                    }
                }
            }
        });
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(maxAttempts));
        retryTemplate.setBackOffPolicy(genExponentialBackOffPolicy());
        return RetryInterceptorBuilder.stateless().retryOperations(retryTemplate).recoverer(new RejectAndDontRequeueRecoverer()).build();
    }

    /**
     * Builds the back-off policy applied between retry attempts.
     *
     * @return the back-off policy
     */
    private BackOffPolicy genExponentialBackOffPolicy() {
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        // Initial interval between attempts, in milliseconds (5 seconds).
        backOffPolicy.setInitialInterval(5000);
        // Upper bound for the interval, in milliseconds (24 hours).
        backOffPolicy.setMaxInterval(86400000);
        // Multiplier applied to the interval after each attempt; 1 keeps it constant.
        backOffPolicy.setMultiplier(1);
        return backOffPolicy;
    }

    @Override
    public Class<?> getObjectType() {
        return SimpleMessageListenerContainer.class;
    }
}
附送一个空的消费者接口 —— ConsumerService
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
/**
* Consumer contract for the dynamically registered listeners.
* <p>
* Declares no members of its own; it only extends
* {@link ChannelAwareMessageListener}.
*/
public interface ConsumerService extends ChannelAwareMessageListener {
} 4、重试处理器
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
/**
 * Callback interface for reacting to failed consumption attempts.
 */
public interface CustomRetryListener {

    /**
     * Called when the last retry attempt has failed.
     *
     * @param context   the current retry context
     * @param callback  the callback that was being retried
     * @param throwable the error raised by the failed attempt
     * @param <E>       error type the callback may throw
     * @param <T>       result type of the callback
     */
    <E extends Throwable, T> void lastRetry(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);

    /**
     * Called after every failed attempt.
     *
     * @param context   the current retry context
     * @param callback  the callback that was being retried
     * @param throwable the error raised by the failed attempt
     * @param <E>       error type the callback may throw
     * @param <T>       result type of the callback
     */
    <E extends Throwable, T> void onRetry(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);
}
5、生产者相关 —— AbsProducerService、ProducerService
点击查看完整 AbsProducerService 内容
import cn.hutool.core.util.IdUtil;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
/**
 * Default producer implementation: serialises the payload to JSON and publishes it to
 * the exchange and routing key that were assigned through the setters.
 */
@Slf4j
public class AbsProducerService implements ProducerService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /** Target exchange name. */
    private String exchange;

    /** Routing key used when publishing. */
    private String routingKey;

    /**
     * Converts {@code msg} to a JSON string, wraps its UTF-8 bytes in a message and sends
     * it; a random UUID message id and the current timestamp are stamped on the message
     * by a post-processor.
     *
     * @param msg the payload to publish
     */
    @Override
    public void send(Object msg) {
        // Envelope properties declared for the outgoing body.
        final MessageProperties envelope = new MessageProperties();
        envelope.setContentEncoding("UTF-8");
        envelope.setContentType("text/plain");

        final byte[] payload = JSONUtil.toJsonStr(msg).getBytes(StandardCharsets.UTF_8);
        final Message outbound = new Message(payload, envelope);

        // Stamps id and timestamp on the message just before it is sent.
        final MessagePostProcessor stamper = toStamp -> {
            MessageProperties props = toStamp.getMessageProperties();
            props.setMessageId(IdUtil.randomUUID());
            props.setTimestamp(new Date());
            return toStamp;
        };

        rabbitTemplate.convertAndSend(this.exchange, this.routingKey, outbound, stamper);
    }

    public void setExchange(String exchange) {
        this.exchange = exchange;
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }
}
ProducerService
/**
 * Contract for publishing a message.
 */
public interface ProducerService {

    /**
     * Publishes the given payload.
     *
     * @param message the payload to send
     */
    void send(Object message);
}
6、配置类 —— ModuleProperties、RabbitModuleProperties
点击查看完整 ModuleProperties 内容
import com.woniu.connection.constants.RabbitExchangeTypeEnum;
import lombok.Data;
import java.util.Map;
/**
* Configuration of one messaging module (one entry of the {@code modules} list in the YML).
*/
@Data
public class ModuleProperties {
/**
* Routing key (short name as configured).
*/
private String routingKey;
/**
* Producer identifier (a bean name in the sample YML).
*/
private String producer;
/**
* Consumer identifier (a bean name in the sample YML).
*/
private String consumer;
/**
* Whether messages are acknowledged automatically; defaults to true.
*/
private Boolean autoAck = true;
/**
* Queue settings.
*/
private Queue queue;
/**
* Exchange settings.
*/
private Exchange exchange;
/**
* Identifier of the retry listener; optional.
*/
private String retry;
/**
* Exchange settings.
*/
@Data
public static class Exchange {
/**
* Exchange type.
* Defaults to a topic exchange.
*/
private RabbitExchangeTypeEnum type = RabbitExchangeTypeEnum.TOPIC;
/**
* Exchange name.
*/
private String name;
/**
* Whether the exchange is durable.
* Defaults to true, so the exchange survives a broker restart.
*/
private boolean durable = true;
/**
* Whether the exchange is deleted automatically once none of its bound queues is in use.
* Defaults to false (not deleted automatically).
*/
private boolean autoDelete = false;
/**
* Additional exchange arguments.
*/
private Map<String, Object> arguments;
}
/**
* Queue settings.
*/
@Data
public static class Queue {
/**
* Queue name.
*/
private String name;
/**
* Whether the queue is durable.
*/
private boolean durable = true; // defaults to true, so the queue survives a broker restart
/**
* Whether the queue is exclusive.
*/
private boolean exclusive = false; // defaults to false, so several consumers can consume the same queue
/**
* Whether the queue is deleted automatically once all consumers have disconnected.
*/
private boolean autoDelete = false; // defaults to false, so messages are not discarded when consumers disconnect
/**
* Name of the dead-letter exchange for this queue.
*/
private String deadLetterExchange;
/**
* Routing key used when dead-lettering messages from this queue.
*/
private String deadLetterRoutingKey;
/**
* Additional queue arguments.
*/
private Map<String, Object> arguments;
}
}
点击查看完整 RabbitModuleProperties 内容
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
 * Binds the {@code spring.rabbitmq.modules} list from the application configuration.
 */
@Data
@Configuration
@ConfigurationProperties("spring.rabbitmq")
public class RabbitModuleProperties {

    /**
     * Configured messaging modules. Access goes through the Lombok-generated
     * getter and setter; the field itself is private.
     */
    private List<ModuleProperties> modules;
}
7、总体结构一览如下
简单配置后使用
1、yml 配置文件
spring:
  rabbitmq:
    # 动态创建和绑定队列、交换机的配置
    modules:
      # 正常队列
      - routing-key: test
        consumer: testConsumerService
        producer: testProducerService
        autoAck: true
        queue:
          name: test
          dead-letter-exchange: dead
          dead-letter-routing-key: dead
          arguments:
            # 1秒(测试),单位毫秒
            x-message-ttl: 1000
        exchange:
          name: test
      # 死信队列
      - routing-key: dead
        consumer: deadConsumerService
        producer: deadProducerService
        autoAck: true
        queue:
          name: dead
        exchange:
          name: dead
    host: 你的ip
    port: 端口
    username: 账号
    password: 你的密码
    virtual-host: dev
2、简单的测试一个请求
请求入口
消费者开始消费
死信消费
3、具体的效果如下
本文作者为DBC,转载请注明。







