一、简单的使用MQ
1.RabbitMQ 的搭建这里就不详细介绍了,网上的教程很多,这里推荐一种最简单的方式:使用 Docker 傻瓜式一键搭建。
2.引入依赖
<!-- Spring Boot starter that brings in AMQP (RabbitMQ) support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.添加配置文件
## ---------- RabbitMQ configuration ----------
# NOTE(review): host and credentials are committed in plain text here;
# consider externalizing them (environment variables / secret store).
spring.rabbitmq.host=8.142.19.202
spring.rabbitmq.port=5672
# The virtual host must be created manually on the broker beforehand.
spring.rabbitmq.virtual-host=dev
spring.rabbitmq.username=admin
spring.rabbitmq.password=666
# Acknowledge mode: manual (manual ack) or auto (automatic ack).
# For messages to be moved to the error exchange / error queue once the
# configured number of consume attempts is reached, this must be auto.
spring.rabbitmq.listener.simple.acknowledge-mode=auto
4.添加MQ配置类
RabbitMQConfig
点击查看完整内容
package com.example.test05.demo.config; import lombok.Data; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author DBC * @version 1.0.0 * @date 2023年01月01日 15:18:18 * @website dbc655.top */ @Configuration @Data public class RabbitMQConfig { /** * 交换机 */ private String dbcTestEventExchange="dbc.event.exchange"; /** * 新增短链具体的routingKey,【发送消息使用】 */ private String dbcTestAddRoutingKey="dbc.add.test.mapping.routing.key"; //相关配置==================================== /** * 新增dbc测试 队列 */ private String dbcTestAddLinkQueue="dbc.add.test.queue"; /** * topic类型的binding key,用于绑定队列和交换机,是用于消费者 */ private String dbcTestAddLinkBindingKey="dbc.add.test.*.routing.key"; /** * 创建交换机 Topic类型 * 一般一个微服务一个交换机 * @return */ @Bean public Exchange shortLinkEventExchange(){ // 开启持久化 true 是否自动删除 false return new TopicExchange(dbcTestEventExchange,true,false); } /** * 新增dbc测试队列和交换机的绑定关系建立 */ @Bean public Binding shortLinkAddApiBinding(){ return new Binding(dbcTestAddLinkQueue,Binding.DestinationType.QUEUE, dbcTestEventExchange,dbcTestAddLinkBindingKey,null); } /** * 新增dbc测试 普通队列,用于被监听 */ @Bean public Queue shortLinkAddLinkQueue(){ return new Queue(dbcTestAddLinkQueue,true,false,false); } }
5.编写消费者
DbcTestAddLinkMQListener
点击查看完整内容
package com.example.test05.demo.listener;

import com.example.test05.demo.enums.BizCodeEnum;
import com.example.test05.demo.exception.BizException;
import com.example.test05.demo.model.UserDO;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// NOTE(review): unused in this class and JDK-internal (sun.*); candidate for
// removal - confirm nothing else relies on it before deleting.
import sun.plugin2.message.EventMessage;

import java.io.IOException;

/**
 * Consumer for the {@code dbc.add.test.queue} queue.
 *
 * <p>The handler currently fails on purpose (division by zero) to demonstrate
 * what happens when consumption throws: the failure is logged and rethrown as
 * a {@link BizException} so it propagates out of the listener.
 *
 * @author DBC
 * @version 1.0.0
 * @date 2023-01-01 15:53:34
 * @website dbc655.top
 */
@Component
@Slf4j
@RabbitListener(queues = "dbc.add.test.queue")
public class DbcTestAddLinkMQListener {

    /**
     * Handles one message whose payload was converted to a {@link UserDO}.
     *
     * @param eventMessage the converted message payload
     * @param message      the raw AMQP message (logged for diagnostics)
     * @param channel      the channel the message was delivered on (unused here)
     * @throws IOException declared for channel operations; not thrown by the current body
     * @throws BizException whenever processing fails, so the exception is not swallowed
     */
    @RabbitHandler
    public void shortLinkHandler(UserDO eventMessage, Message message, Channel channel) throws IOException {
        log.info("监听到消息DbcTestAddLinkMQListener message消息内容:{}", message);

        try {
            // Deliberate failure for the demo: always throws ArithmeticException.
            int i = 1 / 0;
            // TODO: handle the business logic
        } catch (Exception e) {
            // Handle the business failure here (e.g. record the failure reason).
            // Pass the exception as the last argument so the root cause and its
            // stack trace are logged instead of being silently dropped.
            log.error("消费失败:{}", eventMessage, e);
            throw new BizException(BizCodeEnum.JV_SHUOMING);
        }

        log.info("消费成功:{}", eventMessage);
        // No manual channel.basicAck(...) here: this tutorial configures
        // acknowledge-mode=auto, so acknowledgement is left to the listener container.
    }
}
示例图
这里我们看到还有一个 Error,这个我们后面会说到。
6.编写生产者
/**
 * Builds a sample {@code UserDO} and publishes it to the event exchange using
 * the "add" routing key, both read from {@code rabbitMQConfig}.
 */
@Override
public void addMQ() {
    // Demo payload: fixed name, current timestamp.
    UserDO payload = UserDO.builder()
            .name("刘罗锅")
            .time(new Date())
            .build();

    String exchange = rabbitMQConfig.getDbcTestEventExchange();
    String routingKey = rabbitMQConfig.getDbcTestAddRoutingKey();

    rabbitTemplate.convertAndSend(exchange, routingKey, payload);
}
二、配置异常交换机
为什么要配置异常交换机?因为消息消费重试达到指定次数后仍然失败时,可以用特定的 routingKey 把它转发到专门的异常交换机和异常队列中,方便后续排查和告警。
1.定义异常配置
# Enable consumer retry. The consumer code must not catch exceptions without
# rethrowing them, otherwise the retry is never triggered.
spring.rabbitmq.listener.simple.retry.enabled=true
# Maximum number of delivery attempts.
spring.rabbitmq.listener.simple.retry.max-attempts=4
# Interval between retry attempts, in milliseconds (5 seconds).
spring.rabbitmq.listener.simple.retry.initial-interval=5000
2.定义错误交换机、队列配置类
RabbitMQErrorConfig
点击查看完整内容
package com.example.test05.demo.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author DBC * @date 2023/1/3 10:09 * @network dbc655.top */ @Configuration @Slf4j public class RabbitMQErrorConfig { /** * 异常交换机 */ private String shortLinkErrorExchange = "dbc.error.exchange"; /** * 异常队列 */ private String shortLinkErrorQueue = "dbc.error.queue"; /** * 异常routing.key */ private String shortLinkErrorRoutingKey = "dbc.error.routing.key"; @Autowired private RabbitTemplate rabbitTemplate; /** * 创建异常交换机 * @return */ @Bean public TopicExchange errorTopicExchange(){ return new TopicExchange(shortLinkErrorExchange,true,false); } /** * 创建异常队列 * @return */ @Bean public Queue errorQueue(){ return new Queue(shortLinkErrorQueue,true); } /** * 建立绑定关系 * @return */ @Bean public Binding bindingErrorQueueAndExchange(){ return BindingBuilder.bind(errorQueue()).to(errorTopicExchange()).with(shortLinkErrorRoutingKey); } /** * 配置 RepublishMessageRecoverer * * 消费消息重试一定次数后,用特定的routingKey转发到指定的交换机中,方便后续排查和告警 * * @return */ @Bean public MessageRecoverer messageRecoverer(){ return new RepublishMessageRecoverer(rabbitTemplate,shortLinkErrorExchange,shortLinkErrorRoutingKey); } }
3.定义异常消费者
DbcTestErrorMQListener
package com.example.test05.demo.listener;

import com.example.test05.demo.model.UserDO;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Consumer for the {@code dbc.error.queue} queue: logs each message at ERROR
 * level as an alert.
 *
 * @author DBC
 * @version 1.0.0
 * @date 2023-01-01 16:53:26
 * @website dbc655.top
 */
@Component
@Slf4j
@RabbitListener(queuesToDeclare = @Queue("dbc.error.queue"))
public class DbcTestErrorMQListener {

    /**
     * Logs the converted payload and the raw message, then logs that the
     * alert notification was sent.
     *
     * @param eventMessage the converted message payload
     * @param message      the raw AMQP message, including its headers
     * @param channel      the channel the message was delivered on (unused here)
     * @throws IOException declared by the signature; not thrown by the current body
     */
    @RabbitHandler
    public void shortLinkHandler(UserDO eventMessage, Message message, Channel channel)
            throws IOException {
        // Alert: payload first, then the full message with its headers.
        log.error("告警:监听到消息DbcTestErrorMQListener eventMessage消息内容:{}", eventMessage);
        log.error("告警:Message:{}", message);

        // Placeholder for the real notification (e.g. SMS); currently only logged.
        log.error("告警成功,发送通知短信");
    }
}
我们可以从下面的输出中清晰地看到:当消息消费失败 4 次之后,这条错误消息被转发到了异常队列,并且被异常消费者成功消费了。好了,又完成一个小功能![aru_36]
点击查看完整内容
11:04:18.118 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.e.t.d.l.DbcTestAddLinkMQListener - 监听到消息DbcTestAddLinkMQListener message消息内容:(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dbc.event.exchange, receivedRoutingKey=dbc.add.test.mapping.routing.key, deliveryTag=2, consumerTag=amq.ctag-LQS0AMJkaIGqRmlj7WMp7g, consumerQueue=dbc.add.test.queue]) 11:04:18.119 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] ERROR c.e.t.d.l.DbcTestAddLinkMQListener - 消费失败:UserDO(id=null, name=刘罗锅, time=Tue Jan 03 11:04:18 CST 2023) 11:04:23.119 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.e.t.d.l.DbcTestAddLinkMQListener - 监听到消息DbcTestAddLinkMQListener message消息内容:(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dbc.event.exchange, receivedRoutingKey=dbc.add.test.mapping.routing.key, deliveryTag=2, consumerTag=amq.ctag-LQS0AMJkaIGqRmlj7WMp7g, consumerQueue=dbc.add.test.queue]) 11:04:23.119 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] ERROR c.e.t.d.l.DbcTestAddLinkMQListener - 消费失败:UserDO(id=null, name=刘罗锅, time=Tue Jan 03 11:04:18 CST 2023) 11:04:28.135 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.e.t.d.l.DbcTestAddLinkMQListener - 监听到消息DbcTestAddLinkMQListener message消息内容:(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dbc.event.exchange, receivedRoutingKey=dbc.add.test.mapping.routing.key, deliveryTag=2, 
consumerTag=amq.ctag-LQS0AMJkaIGqRmlj7WMp7g, consumerQueue=dbc.add.test.queue]) 11:04:28.137 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] ERROR c.e.t.d.l.DbcTestAddLinkMQListener - 消费失败:UserDO(id=null, name=刘罗锅, time=Tue Jan 03 11:04:18 CST 2023) 11:04:33.144 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.e.t.d.l.DbcTestAddLinkMQListener - 监听到消息DbcTestAddLinkMQListener message消息内容:(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dbc.event.exchange, receivedRoutingKey=dbc.add.test.mapping.routing.key, deliveryTag=2, consumerTag=amq.ctag-LQS0AMJkaIGqRmlj7WMp7g, consumerQueue=dbc.add.test.queue]) 11:04:33.144 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] ERROR c.e.t.d.l.DbcTestAddLinkMQListener - 消费失败:UserDO(id=null, name=刘罗锅, time=Tue Jan 03 11:04:18 CST 2023) 11:04:33.146 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] WARN o.s.a.r.r.RepublishMessageRecoverer - Republishing failed message to exchange 'dbc.error.exchange' with routing key dbc.error.routing.key 11:04:33.210 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] ERROR c.e.t.d.l.DbcTestErrorMQListener - 告警:监听到消息DbcTestErrorMQListener eventMessage消息内容:UserDO(id=null, name=刘罗锅, time=Tue Jan 03 11:04:18 CST 2023) 11:04:33.211 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] ERROR c.e.t.d.l.DbcTestErrorMQListener - 告警:Message:(Body:'[serialized object]' MessageProperties [headers={x-exception-message=举例说明, x-original-routingKey=dbc.add.test.mapping.routing.key, x-original-exchange=dbc.event.exchange, x-exception-stacktrace=org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void 
com.example.test05.demo.listener.DbcTestAddLinkMQListener.shortLinkHandler(com.example.test05.demo.model.UserDO,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.io.IOException' threw exception at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:252) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:194) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:137) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1656) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:93) at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:116) at 
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) at org.springframework.amqp.rabbit.listener.$Proxy86.invokeListener(Unknown Source) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1554) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) at java.lang.Thread.run(Thread.java:748) Caused by: BizException(code=666666, msg=举例说明) at com.example.test05.demo.listener.DbcTestAddLinkMQListener.shortLinkHandler(DbcTestAddLinkMQListener.java:36) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at 
org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:163) at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:81) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:244) ... 27 more }, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dbc.error.exchange, receivedRoutingKey=dbc.error.routing.key, deliveryTag=2, consumerTag=amq.ctag-Y9ZQTs1fx16GDYN_IgoOFA, consumerQueue=dbc.error.queue]) 11:04:33.211 logback [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] ERROR c.e.t.d.l.DbcTestErrorMQListener - 告警成功,发送通知短信




三、进阶操作 —— ???
未完待续。。。
最后更新于:2023年1月3日11:05:52
本文作者为DBC,转载请注明。