一、简单的使用MQ
1.搭建就不详细介绍了,网上有很多,这里推荐一种最简单的docker傻瓜式搭建
2.引入依赖
<!--引入AMQP--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
3.添加配置文件
##----------rabbit配置-------------- spring.rabbitmq.host=8.142.19.202 spring.rabbitmq.port=5672 #需要手工创建虚拟主机 spring.rabbitmq.virtual-host=dev spring.rabbitmq.username=admin spring.rabbitmq.password=666 #消息确认方式,manual(手动ack) 和auto(自动ack); 消息消费重试到达指定次数进到异常交换机和异常队列,需要改为自动ack确认消息 spring.rabbitmq.listener.simple.acknowledge-mode=auto
4.添加MQ配置类
RabbitMQConfig
点击查看完整内容
package com.example.test05.demo.config; import lombok.Data; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author DBC * @version 1.0.0 * @date 2023年01月01日 15:18:18 * @website dbc655.top */ @Configuration @Data public class RabbitMQConfig { /** * 交换机 */ private String dbcTestEventExchange="dbc.event.exchange"; /** * 新增短链具体的routingKey,【发送消息使用】 */ private String dbcTestAddRoutingKey="dbc.add.test.mapping.routing.key"; //相关配置==================================== /** * 新增dbc测试 队列 */ private String dbcTestAddLinkQueue="dbc.add.test.queue"; /** * topic类型的binding key,用于绑定队列和交换机,是用于消费者 */ private String dbcTestAddLinkBindingKey="dbc.add.test.*.routing.key"; /** * 创建交换机 Topic类型 * 一般一个微服务一个交换机 * @return */ @Bean public Exchange shortLinkEventExchange(){ // 开启持久化 true 是否自动删除 false return new TopicExchange(dbcTestEventExchange,true,false); } /** * 新增dbc测试队列和交换机的绑定关系建立 */ @Bean public Binding shortLinkAddApiBinding(){ return new Binding(dbcTestAddLinkQueue,Binding.DestinationType.QUEUE, dbcTestEventExchange,dbcTestAddLinkBindingKey,null); } /** * 新增dbc测试 普通队列,用于被监听 */ @Bean public Queue shortLinkAddLinkQueue(){ return new Queue(dbcTestAddLinkQueue,true,false,false); } }
5.编写消费者
DbcTestAddLinkMQListener
点击查看完整内容
package com.example.test05.demo.listener; import com.example.test05.demo.enums.BizCodeEnum; import com.example.test05.demo.exception.BizException; import com.example.test05.demo.model.UserDO; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import sun.plugin2.message.EventMessage; import java.io.IOException; /** * @author DBC * @version 1.0.0 * @date 2023年01月01日 15:53:34 * @website dbc655.top */ @Component @Slf4j @RabbitListener(queues = "dbc.add.test.queue") public class DbcTestAddLinkMQListener { @RabbitHandler public void shortLinkHandler(UserDO eventMessage, Message message, Channel channel) throws IOException { log.info("监听到消息DbcTestAddLinkMQListener message消息内容:{}",message); long tag = message.getMessageProperties().getDeliveryTag(); try{ int i = 1/0; //TODO 处理业务逻辑 }catch (Exception e){ //处理业务异常,还有进行其他操作,比如记录失败原因 log.error("消费失败:{}",eventMessage); throw new BizException(BizCodeEnum.JV_SHUOMING); } log.info("消费成功:{}",eventMessage); //确认消息消费成功 // channel.basicAck(tag,false); } }
示例图
这里我们看到还有一个Error
,这我们后面会说到
6.编写生产者
@Override public void addMQ() { UserDO userDO = UserDO.builder() .name("刘罗锅") .time(new Date()) .build(); rabbitTemplate.convertAndSend(rabbitMQConfig.getDbcTestEventExchange(), rabbitMQConfig.getDbcTestAddRoutingKey(),userDO); }
二、配置异常交换机
为什么要配置异常交换机?
1.定义异常配置
#开启重试,消费者代码不能添加try catch捕获不往外抛异常 spring.rabbitmq.listener.simple.retry.enabled=true #最大重试次数 spring.rabbitmq.listener.simple.retry.max-attempts=4 # 重试消息的时间间隔,5秒 spring.rabbitmq.listener.simple.retry.initial-interval=5000
2.定义错误交换机、队列配置类
RabbitMQErrorConfig
点击查看完整内容
package com.example.test05.demo.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author DBC * @date 2023/1/3 10:09 * @network dbc655.top */ @Configuration @Slf4j public class RabbitMQErrorConfig { /** * 异常交换机 */ private String shortLinkErrorExchange = "dbc.error.exchange"; /** * 异常队列 */ private String shortLinkErrorQueue = "dbc.error.queue"; /** * 异常routing.key */ private String shortLinkErrorRoutingKey = "dbc.error.routing.key"; @Autowired private RabbitTemplate rabbitTemplate; /** * 创建异常交换机 * @return */ @Bean public TopicExchange errorTopicExchange(){ return new TopicExchange(shortLinkErrorExchange,true,false); } /** * 创建异常队列 * @return */ @Bean public Queue errorQueue(){ return new Queue(shortLinkErrorQueue,true); } /** * 建立绑定关系 * @return */ @Bean public Binding bindingErrorQueueAndExchange(){ return BindingBuilder.bind(errorQueue()).to(errorTopicExchange()).with(shortLinkErrorRoutingKey); } /** * 配置 RepublishMessageRecoverer * * 消费消息重试一定次数后,用特定的routingKey转发到指定的交换机中,方便后续排查和告警 * * @return */ @Bean public MessageRecoverer messageRecoverer(){ return new RepublishMessageRecoverer(rabbitTemplate,shortLinkErrorExchange,shortLinkErrorRoutingKey); } }
3.定义异常消费者
DbcTestErrorMQListener
package com.example.test05.demo.listener; import com.example.test05.demo.model.UserDO; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author DBC * @version 1.0.0 * @date 2023年01月01日 16:53:26 * @website dbc655.top */ @Component @Slf4j @RabbitListener(queuesToDeclare = { @Queue("dbc.error.queue") }) public class DbcTestErrorMQListener { @RabbitHandler public void shortLinkHandler(UserDO eventMessage, Message message, Channel channel) throws IOException { log.error("告警:监听到消息DbcTestErrorMQListener eventMessage消息内容:{}",eventMessage); log.error("告警:Message:{}",message); log.error("告警成功,发送通知短信"); } }
我们可以如下图中清晰的看到,当消息消费失败4次之后,这个错误消息我们也进行了消费了,好了,又完成一个小功能![aru_36]
点击查看完整内容
11:04:18.118 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.e.t.d.l.DbcTestAddLinkMQListener - 监听到消息DbcTestAddLinkMQListener message消息内容:(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dbc.event.exchange, receivedRoutingKey=dbc.add.test.mapping.routing.key, deliveryTag=2, consumerTag=amq.ctag-LQS0AMJkaIGqRmlj7WMp7g, consumerQueue=dbc.add.test.queue]) 11:04:18.119 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] ERROR c.e.t.d.l.DbcTestAddLinkMQListener - 消费失败:UserDO(id=null, name=刘罗锅, time=Tue Jan 03 11:04:18 CST 2023) 11:04:23.119 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.e.t.d.l.DbcTestAddLinkMQListener - 监听到消息DbcTestAddLinkMQListener message消息内容:(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dbc.event.exchange, receivedRoutingKey=dbc.add.test.mapping.routing.key, deliveryTag=2, consumerTag=amq.ctag-LQS0AMJkaIGqRmlj7WMp7g, consumerQueue=dbc.add.test.queue]) 11:04:23.119 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] ERROR c.e.t.d.l.DbcTestAddLinkMQListener - 消费失败:UserDO(id=null, name=刘罗锅, time=Tue Jan 03 11:04:18 CST 2023) 11:04:28.135 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.e.t.d.l.DbcTestAddLinkMQListener - 监听到消息DbcTestAddLinkMQListener message消息内容:(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dbc.event.exchange, receivedRoutingKey=dbc.add.test.mapping.routing.key, deliveryTag=2, consumerTag=amq.ctag-LQS0AMJkaIGqRmlj7WMp7g, consumerQueue=dbc.add.test.queue]) 11:04:28.137 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] ERROR c.e.t.d.l.DbcTestAddLinkMQListener - 消费失败:UserDO(id=null, name=刘罗锅, time=Tue Jan 03 11:04:18 CST 2023) 11:04:33.144 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.e.t.d.l.DbcTestAddLinkMQListener - 监听到消息DbcTestAddLinkMQListener message消息内容:(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dbc.event.exchange, receivedRoutingKey=dbc.add.test.mapping.routing.key, deliveryTag=2, consumerTag=amq.ctag-LQS0AMJkaIGqRmlj7WMp7g, consumerQueue=dbc.add.test.queue]) 11:04:33.144 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] ERROR c.e.t.d.l.DbcTestAddLinkMQListener - 消费失败:UserDO(id=null, name=刘罗锅, time=Tue Jan 03 11:04:18 CST 2023) 11:04:33.146 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] WARN o.s.a.r.r.RepublishMessageRecoverer - Republishing failed message to exchange 'dbc.error.exchange' with routing key dbc.error.routing.key 11:04:33.210 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] ERROR c.e.t.d.l.DbcTestErrorMQListener - 告警:监听到消息DbcTestErrorMQListener eventMessage消息内容:UserDO(id=null, name=刘罗锅, time=Tue Jan 03 11:04:18 CST 2023) 11:04:33.211 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] ERROR c.e.t.d.l.DbcTestErrorMQListener - 告警:Message:(Body:'[serialized object]' MessageProperties [headers={x-exception-message=举例说明, x-original-routingKey=dbc.add.test.mapping.routing.key, x-original-exchange=dbc.event.exchange, x-exception-stacktrace=org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.example.test05.demo.listener.DbcTestAddLinkMQListener.shortLinkHandler(com.example.test05.demo.model.UserDO,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.io.IOException' threw exception at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:252) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:194) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:137) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1656) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:93) at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:116) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) at org.springframework.amqp.rabbit.listener.$Proxy86.invokeListener(Unknown Source) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1554) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) at java.lang.Thread.run(Thread.java:748) Caused by: BizException(code=666666, msg=举例说明) at com.example.test05.demo.listener.DbcTestAddLinkMQListener.shortLinkHandler(DbcTestAddLinkMQListener.java:36) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:163) at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:81) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:244) ... 27 more }, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dbc.error.exchange, receivedRoutingKey=dbc.error.routing.key, deliveryTag=2, consumerTag=amq.ctag-Y9ZQTs1fx16GDYN_IgoOFA, consumerQueue=dbc.error.queue]) 11:04:33.211 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] ERROR c.e.t.d.l.DbcTestErrorMQListener - 告警成功,发送通知短信
三、进阶操作 —— 优雅封装RabbitMQ的配置(动态)
关键代码实现
1、关键MQ配置类 —— RabbitMqConfig
点击查看完整 RabbitMqConfig 内容
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.convert.Convert; import cn.hutool.core.date.StopWatch; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import com.woniu.connection.constants.RabbitEnum; import com.woniu.connection.constants.RabbitExchangeTypeEnum; import com.woniu.connection.consumer.ConsumerContainerFactory; import com.woniu.connection.product.AbsProducerService; import com.woniu.connection.property.ModuleProperties; import com.woniu.connection.property.RabbitModuleProperties; import com.woniu.connection.retry.CustomRetryListener; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; /** * 优雅封装RabbitMQ 实现动态队列、动态生产者,动态消费者绑定 * RabbitMQ 全局配置 */ @Slf4j @Configuration public class RabbitMqConfig implements SmartInitializingSingleton { /** * MQ链接工厂 */ private ConnectionFactory connectionFactory; /** * MQ操作管理器 */ private AmqpAdmin amqpAdmin; /** * YML配置 */ private RabbitModuleProperties rabbitModuleProperties; @Autowired public RabbitMqConfig(AmqpAdmin amqpAdmin, RabbitModuleProperties rabbitModuleProperties, ConnectionFactory connectionFactory) { this.amqpAdmin = amqpAdmin; this.rabbitModuleProperties = rabbitModuleProperties; this.connectionFactory = connectionFactory; } @Override public void afterSingletonsInstantiated() { StopWatch stopWatch = StopWatch.create("MQ"); stopWatch.start(); log.debug("初始化MQ配置"); List<ModuleProperties> modules = rabbitModuleProperties.getModules(); if (CollUtil.isEmpty(modules)) { log.warn("未配置MQ"); return; } for (ModuleProperties module : modules) { try { Queue queue = genQueue(module); Exchange exchange = genQueueExchange(module); queueBindExchange(queue, exchange, module); bindProducer(module); bindConsumer(queue, exchange, module); } catch (Exception e) { log.error("初始化失败", e); } } stopWatch.stop(); log.info("初始化MQ配置成功耗时: {}ms", stopWatch.getTotal(TimeUnit.MILLISECONDS)); } /** * 绑定生产者 * * @param module */ private void bindProducer(ModuleProperties module) { try { AbsProducerService producerService = SpringUtil.getBean(module.getProducer()); producerService.setExchange(module.getExchange().getName()); producerService.setRoutingKey(module.getRoutingKey()); log.debug("绑定生产者: {}", module.getProducer()); } catch (Exception e) { log.warn("无法在容器中找到该生产者[{}],若需要此生产者则需要做具体实现", module.getConsumer()); } } /** * 绑定消费者 * * @param queue * @param exchange * @param module */ private void bindConsumer(Queue queue, Exchange exchange, ModuleProperties module) { CustomRetryListener customRetryListener = null; try { customRetryListener = SpringUtil.getBean(module.getRetry()); } catch (Exception e) { log.debug("无法在容器中找到该重试类[{}],若需要重试则需要做具体实现", module.getRetry()); } try { ConsumerContainerFactory factory = ConsumerContainerFactory.builder() .connectionFactory(connectionFactory) .queue(queue) .exchange(exchange) .consumer(SpringUtil.getBean(module.getConsumer())) .retryListener(customRetryListener) .autoAck(module.getAutoAck()) .amqpAdmin(amqpAdmin).build(); SimpleMessageListenerContainer container = factory.getObject(); if (Objects.nonNull(container)) { container.start(); } log.debug("绑定消费者: {}", module.getConsumer()); } catch (Exception e) { log.warn("无法在容器中找到该消费者[{}],若需要此消费者则需要做具体实现", module.getConsumer()); } } /** * 队列绑定交换机 * * @param queue * @param exchange * @param module */ private void queueBindExchange(Queue queue, Exchange exchange, ModuleProperties module) { log.debug("初始化交换机: {}", module.getExchange().getName()); String queueName = module.getQueue().getName(); String exchangeName = module.getExchange().getName(); module.setRoutingKey(StrUtil.format(RabbitEnum.ROUTER_KEY.getValue(), module.getRoutingKey())); String routingKey = module.getRoutingKey(); Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null); amqpAdmin.declareQueue(queue); amqpAdmin.declareExchange(exchange); amqpAdmin.declareBinding(binding); log.debug("队列绑定交换机: 队列: {}, 交换机: {}", queueName, exchangeName); } /** * 创建交换机 * * @param module * @return */ private Exchange genQueueExchange(ModuleProperties module) { ModuleProperties.Exchange exchange = module.getExchange(); RabbitExchangeTypeEnum exchangeType = exchange.getType(); exchange.setName(StrUtil.format(RabbitEnum.EXCHANGE.getValue(), exchange.getName())); String exchangeName = exchange.getName(); Boolean isDurable = exchange.isDurable(); Boolean isAutoDelete = exchange.isAutoDelete(); Map<String, Object> arguments = exchange.getArguments(); return getExchangeByType(exchangeType, exchangeName, isDurable, isAutoDelete, arguments); } /** * 根据类型生成交换机 * * @param exchangeType * @param exchangeName * @param isDurable * @param isAutoDelete * @param arguments * @return */ private Exchange getExchangeByType(RabbitExchangeTypeEnum exchangeType, String exchangeName, Boolean isDurable, Boolean isAutoDelete, Map<String, Object> arguments) { AbstractExchange exchange = null; switch (exchangeType) { // 直连交换机 case DIRECT: exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments); break; // 主题交换机 case TOPIC: exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments); break; //扇形交换机 case FANOUT: exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments); break; // 头交换机 case HEADERS: exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments); break; default: log.warn("未匹配到交换机类型"); break; } return exchange; } /** * 创建队列 * * @param module * @return */ private Queue genQueue(ModuleProperties module) { ModuleProperties.Queue queue = module.getQueue(); queue.setName(StrUtil.format(RabbitEnum.QUEUE.getValue(), queue.getName())); log.debug("初始化队列: {}", queue.getName()); Map<String, Object> arguments = queue.getArguments(); if (MapUtil.isEmpty(arguments)) { arguments = new HashMap<>(); } // 转换ttl的类型为long if (arguments.containsKey("x-message-ttl")) { arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl"))); } // 绑定死信队列 String deadLetterExchange = queue.getDeadLetterExchange(); String deadLetterRoutingKey = queue.getDeadLetterRoutingKey(); if (StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) { deadLetterExchange = StrUtil.format(RabbitEnum.EXCHANGE.getValue(), deadLetterExchange); deadLetterRoutingKey = StrUtil.format(RabbitEnum.ROUTER_KEY.getValue(), deadLetterRoutingKey); arguments.put("x-dead-letter-exchange", deadLetterExchange); arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey); log.debug("绑定死信队列: 交换机: {}, 路由: {}", deadLetterExchange, deadLetterRoutingKey); } return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments); } }
2、对应必要枚举 —— RabbitEnum、RabbitExchangeTypeEnum
点击查看完整 RabbitEnum 内容
import lombok.Getter; /** * 队列,交换机。路由 常量枚举 */ public enum RabbitEnum { QUEUE("xxx.{}.queue", "队列名称"), EXCHANGE("xxx.{}.exchange", "交换机名称"), ROUTER_KEY("xxx.{}.key", "路由名称"), ; RabbitEnum(String value, String desc) { this.value = value; this.desc = desc; } @Getter private String value; @Getter private String desc; }
点击查看完整 RabbitExchangeTypeEnum 内容
/** * 交换机类型枚举 */ public enum RabbitExchangeTypeEnum { /** * 直连交换机 * <p> * 根据routing-key精准匹配队列(最常使用) */ DIRECT, /** * 主题交换机 * <p> * 根据routing-key模糊匹配队列,*匹配任意一个字符,#匹配0个或多个字符 */ TOPIC, /** * 扇形交换机 * <p> * 直接分发给所有绑定的队列,忽略routing-key,用于广播消息 */ FANOUT, /** * 头交换机 * <p> * 类似直连交换机,不同于直连交换机的路由规则建立在头属性上而不是routing-key(使用较少) */ HEADERS; }
3、消费者相关配置类 —— AbsConsumerService、ConsumerContainerFactory
点击查看完整 AbsConsumerService 内容
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import java.io.IOException; @Slf4j public abstract class AbsConsumerService<T> implements ConsumerService { @Override public void onMessage(Message message, Channel channel) throws Exception { String body = new String(message.getBody()); onConsumer(body, message, channel); } /** * 扩展消费方法,对消息进行封装 * * @param data * @throws IOException */ public void onConsumer(String data, Message message, Channel channel) throws IOException { log.error("未对此方法进行实现: {}", data); } /** * 确认消息 */ protected void ack(Message message, Channel channel) throws IOException { ack(Boolean.FALSE, message, channel); } /** * 拒绝消息 */ protected void nack(Message message, Channel channel) throws IOException { nack(Boolean.FALSE, Boolean.FALSE, message, channel); } /** * 拒绝消息 */ protected void basicReject(Message message, Channel channel) throws IOException { basicReject(Boolean.FALSE, message, channel); } /** * 拒绝消息 * * @param multiple 当前 DeliveryTag 的消息是否确认所有 true 是, false 否 */ protected void basicReject(Boolean multiple, Message message, Channel channel) throws IOException { channel.basicReject(message.getMessageProperties().getDeliveryTag(), multiple); } /** * 是否自动确认 * * @param multiple 当前 DeliveryTag 的消息是否确认所有 true 是, false 否 */ protected void ack(Boolean multiple, Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), multiple); } /** * 拒绝消息 * * @param multiple 当前 DeliveryTag 的消息是否确认所有 true 是, false 否 * @param requeue 当前 DeliveryTag 消息是否重回队列 true 是 false 否 */ protected void nack(Boolean multiple, Boolean requeue, Message message, Channel channel) throws IOException { channel.basicNack(message.getMessageProperties().getDeliveryTag(), multiple, requeue); } }
点击查看完整 ConsumerContainerFactory 内容
import com.woniu.connection.retry.CustomRetryListener; import lombok.Builder; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.aopalliance.aop.Advice; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; import org.springframework.beans.factory.FactoryBean; import org.springframework.retry.RetryCallback; import org.springframework.retry.RetryContext; import org.springframework.retry.RetryListener; import org.springframework.retry.backoff.BackOffPolicy; import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.support.RetryTemplate; import java.util.Objects; /** * MQ具体消息监听器工厂 */ @Data @Slf4j @Builder public class ConsumerContainerFactory implements FactoryBean<SimpleMessageListenerContainer> { /** * MQ连接工厂 */ private ConnectionFactory connectionFactory; /** * 操作MQ管理器 */ private AmqpAdmin amqpAdmin; /** * 队列 */ private Queue queue; /** * 交换机 */ private Exchange exchange; /** * 消费者 */ private ConsumerService consumer; /** * 重试回调 */ private CustomRetryListener retryListener; /** * 最大重试次数 */ private final Integer maxAttempts = 5; /** * 是否自动确认 */ private Boolean autoAck; @Override public SimpleMessageListenerContainer getObject() throws Exception { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setAmqpAdmin(amqpAdmin); container.setConnectionFactory(connectionFactory); container.setQueues(queue); container.setPrefetchCount(1); container.setConcurrentConsumers(20); container.setMaxConcurrentConsumers(100); container.setDefaultRequeueRejected(Boolean.FALSE); container.setAdviceChain(createRetry()); container.setAcknowledgeMode(autoAck ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL); if (Objects.nonNull(consumer)) { container.setMessageListener(consumer); } return container; } /** * 配置重试 * * @return */ private Advice createRetry() { RetryTemplate retryTemplate = new RetryTemplate(); retryTemplate.registerListener(new RetryListener() { @Override public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) { // 第一次重试调用 return true; } @Override public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) { } @Override public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) { if (Objects.nonNull(retryListener)) { retryListener.onRetry(context, callback, throwable); if (maxAttempts.equals(context.getRetryCount())) { retryListener.lastRetry(context, callback, throwable); } } } }); retryTemplate.setRetryPolicy(new SimpleRetryPolicy(maxAttempts)); retryTemplate.setBackOffPolicy(genExponentialBackOffPolicy()); return RetryInterceptorBuilder.stateless().retryOperations(retryTemplate).recoverer(new RejectAndDontRequeueRecoverer()).build(); } /** * 设置过期时间 * * @return */ private BackOffPolicy genExponentialBackOffPolicy() { ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); // 重试间隔基数(秒) backOffPolicy.setInitialInterval(5000); // 从重试的第一次至最后一次,最大时间间隔(秒) backOffPolicy.setMaxInterval(86400000); // 重试指数 backOffPolicy.setMultiplier(1); return backOffPolicy; } @Override public Class<?> getObjectType() { return SimpleMessageListenerContainer.class; } }
附送一个空的消费者接口 —— ConsumerService
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; /** * 消费者接口 */ public interface ConsumerService extends ChannelAwareMessageListener { }
4、重试处理器
import org.springframework.retry.RetryCallback; import org.springframework.retry.RetryContext; /** * 重试处理器 */ public interface CustomRetryListener { /** * 最后一次重试失败回调 * @param context * @param callback * @param throwable * @param <E> * @param <T> */ public <E extends Throwable, T> void lastRetry(RetryContext context, RetryCallback<T,E> callback, Throwable throwable); /** * 每次失败的回调 * @param context * @param callback * @param throwable * @param <E> * @param <T> */ public <E extends Throwable, T> void onRetry(RetryContext context, RetryCallback<T,E> callback, Throwable throwable); }
5、生产者相关 —— AbsProducerService、ProducerService
点击查看完整 AbsProducerService 内容
import cn.hutool.core.util.IdUtil; import cn.hutool.json.JSONUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.util.Date; /** * 生产者实现类 */ @Slf4j public class AbsProducerService implements ProducerService { @Resource private RabbitTemplate rabbitTemplate; /** * 交换机 */ private String exchange; /** * 路由 */ private String routingKey; @Override public void send(Object msg) { MessagePostProcessor messagePostProcessor = (message) -> { MessageProperties messageProperties = message.getMessageProperties(); messageProperties.setMessageId(IdUtil.randomUUID()); messageProperties.setTimestamp(new Date()); return message; }; MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentEncoding("UTF-8"); messageProperties.setContentType("text/plain"); String data = JSONUtil.toJsonStr(msg); Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties); rabbitTemplate.convertAndSend(this.exchange, this.routingKey, message, messagePostProcessor); } public void setExchange(String exchange) { this.exchange = exchange; } public void setRoutingKey(String routingKey) { this.routingKey = routingKey; } }
ProducerService
/** * 生产者接口 */ public interface ProducerService { /** * 发送消息 * @param message */ void send(Object message); }
6、配置类 —— ModuleProperties、RabbitModuleProperties
点击查看完整 ModuleProperties 内容
import com.woniu.connection.constants.RabbitExchangeTypeEnum; import lombok.Data; import java.util.Map; /** * YML配置类 * */ @Data public class ModuleProperties { /** * 路由Key */ private String routingKey; /** * 生产者 */ private String producer; /** * 消费者 */ private String consumer; /** * 自动确认 */ private Boolean autoAck = true; /** * 队列信息 */ private Queue queue; /** * 交换机信息 */ private Exchange exchange; /** * 重试器 */ private String retry; /** * 交换机信息类 */ @Data public static class Exchange { /** * 交换机类型 * 默认主题交换机 */ private RabbitExchangeTypeEnum type = RabbitExchangeTypeEnum.TOPIC; /** * 交换机名称 */ private String name; /** * 是否持久化 * 默认true持久化,重启消息不会丢失 */ private boolean durable = true; /** * 当所有队绑定列均不在使用时,是否自动删除交换机 * 默认false,不自动删除 */ private boolean autoDelete = false; /** * 交换机其他参数 */ private Map<String, Object> arguments; } /** * 队列信息类 */ @Data public static class Queue { /** * 队列名称 */ private String name; /** * 是否持久化 */ private boolean durable = true; // 默认true持久化,重启消息不会丢失 /** * 是否具有排他性 */ private boolean exclusive = false; // 默认false,可多个消费者消费同一个队列 /** * 当消费者均断开连接,是否自动删除队列 */ private boolean autoDelete = false; // 默认false,不自动删除,避免消费者断开队列丢弃消息 /** * 绑定死信队列的交换机名称 */ private String deadLetterExchange; /** * 绑定死信队列的路由key */ private String deadLetterRoutingKey; /** * 交换机其他参数 */ private Map<String, Object> arguments; } }
点击查看完整 RabbitModuleProperties 内容
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import java.util.List; /** * 绑定配置基础类 */ @Data @Configuration @ConfigurationProperties("spring.rabbitmq") public class RabbitModuleProperties { /** * 模块配置 */ List<ModuleProperties> modules; }
7、总体结构一览如下
简单配置后使用
1、yml 配置文件
spring: rabbitmq: # 动态创建和绑定队列、交换机的配置 modules: # 正常队列 - routing-key: test consumer: testConsumerService producer: testProducerService autoAck: true queue: name: test dead-letter-exchange: dead dead-letter-routing-key: dead arguments: # 1分钟(测试),单位毫秒 x-message-ttl: 1000 exchange: name: test # 死信队列 - routing-key: dead consumer: deadConsumerService producer: deadProducerService autoAck: true queue: name: dead exchange: name: dead host: 你的ip port: 端口 username: 账号 password: 你的密码 virtual-host: dev
2、简单的测试一个请求
请求入口
消费者开始消费
死信消费
3、具体的效果如下
本文作者为DBC,转载请注明。