博主精品——RabbitMQ(含优雅封装RabbitMQ 实现动态队列、动态生产者,动态消费者绑定)

DBC 1.4K 0
来源:百度百科

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

一、简单的使用MQ

1.搭建就不详细介绍了,网上有很多,这里推荐一种最简单的docker傻瓜式搭建

云服务器基础设施安装之RabbitMQ安装——(难度:无,等级:0)

2年前 (2022-08-29) 0
博主精品——RabbitMQ(含优雅封装RabbitMQ 实现动态队列、动态生产者,动态消费者绑定)插图

2.引入依赖

		<!--引入AMQP-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

3.添加配置文件

##----------rabbit配置--------------
spring.rabbitmq.host=8.142.19.202
spring.rabbitmq.port=5672
#需要手工创建虚拟主机
spring.rabbitmq.virtual-host=dev
spring.rabbitmq.username=admin
spring.rabbitmq.password=666
#消息确认方式,manual(手动ack) 和auto(自动ack); 消息消费重试到达指定次数进到异常交换机和异常队列,需要改为自动ack确认消息
spring.rabbitmq.listener.simple.acknowledge-mode=auto
这里输入自己的账号密码即可,并没有什么需要注意的,注释非常清楚

4.添加MQ配置类

RabbitMQConfig

点击查看完整内容

5.编写消费者

DbcTestAddLinkMQListener

点击查看完整内容
温馨提示

当代码编写到了这里,我们其实已经基本完成了,当我们运行项目之后,就可以在RabbitMQ后台看到相应的交换机和对应的队列了,如下图所示

示例图

博主精品——RabbitMQ(含优雅封装RabbitMQ 实现动态队列、动态生产者,动态消费者绑定)插图2
博主精品——RabbitMQ(含优雅封装RabbitMQ 实现动态队列、动态生产者,动态消费者绑定)插图4

这里我们看到还有一个Error,这我们后面会说到

6.编写生产者

    @Override
    public void addMQ() {
        UserDO userDO = UserDO.builder()
                .name("刘罗锅")
                .time(new Date())
                .build();

        rabbitTemplate.convertAndSend(rabbitMQConfig.getDbcTestEventExchange(), rabbitMQConfig.getDbcTestAddRoutingKey(),userDO);
    }
温馨提示

大功告成!就是那么简单,这就简单的完成了MQ的第一步,简单的MQ使用

二、配置异常交换机

为什么要配置异常交换机?

在我们的服务在进行高并发操作的时候,有时候不可避免的会出现消费失败的情况,这时候我们可以让MQ进行自动的重新消费,我们定义好重新消费的次数,当重试到该次数之后还是没有能够进行正常消费,那么我们就将这个消息投递到异常交换机中,来进行其他操作,比如说人工处理等。。。

1.定义异常配置

#开启重试,消费者代码不能添加try catch捕获不往外抛异常
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=4
# 重试消息的时间间隔,5秒
spring.rabbitmq.listener.simple.retry.initial-interval=5000

2.定义错误交换机、队列配置类

RabbitMQErrorConfig

点击查看完整内容

3.定义异常消费者

DbcTestErrorMQListener

package com.example.test05.demo.listener;

import com.example.test05.demo.model.UserDO;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author DBC
 * @version 1.0.0
 * @date 2023年01月01日 16:53:26
 * @website dbc655.top
 */
@Component
@Slf4j
@RabbitListener(queuesToDeclare = { @Queue("dbc.error.queue") })
public class DbcTestErrorMQListener {

    @RabbitHandler
    public void shortLinkHandler(UserDO eventMessage, Message message, Channel channel) throws IOException {
        log.error("告警:监听到消息DbcTestErrorMQListener eventMessage消息内容:{}",eventMessage);
        log.error("告警:Message:{}",message);
        log.error("告警成功,发送通知短信");

    }



}

我们可以如下图中清晰的看到,当消息消费失败4次之后,这个错误消息我们也进行了消费了,好了,又完成一个小功能![aru_36]

点击查看完整内容

博主精品——RabbitMQ(含优雅封装RabbitMQ 实现动态队列、动态生产者,动态消费者绑定)插图6

三、进阶操作 —— 优雅封装RabbitMQ的配置(动态)

功能有如下:
  1. 动态队列
  2. 动态消费者
  3. 动态生产者

以上需求仅仅需要通过修改配置文件即可实现

关键代码实现

1、关键MQ配置类 —— RabbitMqConfig

点击查看完整 RabbitMqConfig 内容

2、对应必要枚举 —— RabbitEnum、RabbitExchangeTypeEnum

点击查看完整 RabbitEnum 内容

点击查看完整 RabbitExchangeTypeEnum 内容

温馨提示

RabbitEnum 里面的 xxx 修改为公司对应的前缀即可

3、消费者相关配置类 —— AbsConsumerService、ConsumerContainerFactory

点击查看完整 AbsConsumerService 内容

点击查看完整 ConsumerContainerFactory 内容

以下是对于getObject方法中各set语句意义的详细解释
  1. container.setAmqpAdmin(amqpAdmin)
    1. 设置SimpleMessageListenerContainer使用的AmqpAdmin实例。AmqpAdminRabbitMQ操作相关的管理接口,提供创建、删除等队列、交换器、绑定等资源的方法。在实际应用中,可能需要动态创建或管理RabbitMQ相关资源时使用。
  2. container.setConnectionFactory(connectionFactory)
    1. 设置连接到RabbitMQ服务器所需的ConnectionFactoryConnectionFactory定义了与RabbitMQ建立连接的基本信息(如主机名、端口、用户名、密码等),以及连接和频道创建的相关配置。通过设置ConnectionFactorySimpleMessageListenerContainer能够建立与RabbitMQ服务器的有效连接。
  3. container.setQueues(queue)
    1. 指定SimpleMessageListenerContainer要监听的队列。这里传入的queue参数通常是一个Queue对象或者包含多个Queue对象的集合。这样容器就知道从哪些队列中接收消息进行消费。
  4. container.setPrefetchCount(1)
    1. 设置消费者预取值(prefetch count)。这是RabbitMQ消费者的一种流量控制机制,表示一次最多从服务器拉取多少条未确认的消息。此处设置为1,意味着消费者每次只拉取一条消息进行处理,待该消息被确认后才会请求下一条消息。这种设置有助于减少内存占用,防止因消费者处理速度慢导致大量消息堆积在客户端。
  5. container.setConcurrentConsumers(20)
    1. 设置初始并发消费者数量。并发消费者是指在同一时间启动多个线程来消费队列中的消息。设置为20表示容器初始化时会启动20个线程同时消费消息,以提高消息处理的并发度和吞吐量。
  6. container.setMaxConcurrentConsumers(100)
    1. 设置最大并发消费者数量。当消息负载增加时,容器可以自动增加并发消费者数量,但不会超过此设定值。设置为100意味着在高负载情况下,最多可动态增加至100个并发消费者。
  7. container.setDefaultRequeueRejected(Boolean.FALSE)
    1. 设置当消息消费失败且未手动处理(如未调用basicRejectbasicNack)时,是否默认将消息重新入队。设置为Boolean.FALSE表示消费失败的消息不会被重新入队,通常用于处理无法恢复或不需要重试的情况。如果不希望消息因消费失败而无限循环,这个设置非常重要。
  8. container.setAdviceChain(createRetry())
    1. 设置消息消费的建议链(Advice Chain),即为容器添加一组拦截器(Interceptor)。这些拦截器可以在消息消费的不同阶段执行特定逻辑,如重试、日志记录等。createRetry()方法应返回一个实现了Advice接口的对象,用于实现重试策略。当消息消费失败时,根据配置的重试策略决定是否重新投递消息。
  9. container.setAcknowledgeMode(autoAck ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL)
    1. 设置消息确认模式。autoAck为布尔值,决定是否自动确认消息。如果autoAcktrue,设置确认模式为AcknowledgeMode.AUTO,表示消费者在接收到消息后立即自动确认,无需手动确认;若autoAckfalse,设置确认模式为AcknowledgeMode.MANUAL,消费者需在消息处理完成后手动确认消息。手动确认有助于确保只有成功处理的消息才会被确认,避免消息丢失或重复消费。
  10. if (Objects.nonNull(consumer)) { container.setMessageListener(consumer); }
    1. 如果consumer对象非空,将其设置为SimpleMessageListenerContainer的消息监听器。consumer通常是实现了MessageListener接口的类实例,负责处理从队列接收到的实际业务消息。设置消息监听器后,每当有消息到达,容器就会调用监听器的onMessage方法来处理消息。

附送一个空的消费者接口 —— ConsumerService

import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

/**
 * 消费者接口
 */
public interface ConsumerService extends ChannelAwareMessageListener {

}

4、重试处理器

import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;

/**
 * 重试处理器
 */
public interface CustomRetryListener {

 /**
  * 最后一次重试失败回调
  * @param context
  * @param callback
  * @param throwable
  * @param <E>
  * @param <T>
  */
 public <E extends Throwable, T> void lastRetry(RetryContext context, RetryCallback<T,E> callback, Throwable throwable);

 /**
  * 每次失败的回调
  * @param context
  * @param callback
  * @param throwable
  * @param <E>
  * @param <T>
  */
 public <E extends Throwable, T> void onRetry(RetryContext context, RetryCallback<T,E> callback, Throwable throwable);
}

5、生产者相关 —— AbsProducerService、ProducerService

点击查看完整 AbsProducerService 内容

ProducerService

/**
 * 生产者接口
 */
public interface ProducerService {

 /**
  * 发送消息
  * @param message
  */
 void send(Object message);

}

6、配置类 —— ModuleProperties、RabbitModuleProperties

点击查看完整 ModuleProperties 内容

点击查看完整 RabbitModuleProperties 内容

7、总体结构一览如下

博主精品——RabbitMQ(含优雅封装RabbitMQ 实现动态队列、动态生产者,动态消费者绑定)插图8

简单配置后使用

1、yml 配置文件

spring:
  rabbitmq:
    # 动态创建和绑定队列、交换机的配置
    modules:
      # 正常队列
      - routing-key: test
        consumer: testConsumerService
        producer: testProducerService
        autoAck: true
        queue:
          name: test
          dead-letter-exchange: dead
          dead-letter-routing-key: dead
          arguments:
            # 1分钟(测试),单位毫秒
            x-message-ttl: 1000
        exchange:
          name: test

      # 死信队列
      - routing-key: dead
        consumer: deadConsumerService
        producer: deadProducerService
        autoAck: true
        queue:
          name: dead
        exchange:
          name: dead

    host: 你的ip
    port: 端口
    username: 账号
    password: 你的密码
    virtual-host: dev

2、简单的测试一个请求

温馨提示

我们简单模拟一下即可,我们模拟的操作如下

  1. 收到请求后发送普通的mq消息(生产者开始生产[aru_44])
  2. mq 收到消息,由消费者开始消费[aru_120]
  3. 消费者无法消费这条消息,开始重试,达到最大重试次数(前面我们的代码设置为5),将这个消息投递到死信队列
  4. 由死信队列完成最终的消费

由上可知,我们测试了生产者、消费者、重试机制、失败后投递到死信队列、死信的消费

请求入口

博主精品——RabbitMQ(含优雅封装RabbitMQ 实现动态队列、动态生产者,动态消费者绑定)插图10

消费者开始消费

博主精品——RabbitMQ(含优雅封装RabbitMQ 实现动态队列、动态生产者,动态消费者绑定)插图12

死信消费

博主精品——RabbitMQ(含优雅封装RabbitMQ 实现动态队列、动态生产者,动态消费者绑定)插图14

3、具体的效果如下

博主精品——RabbitMQ(含优雅封装RabbitMQ 实现动态队列、动态生产者,动态消费者绑定)插图16
博主精品——RabbitMQ(含优雅封装RabbitMQ 实现动态队列、动态生产者,动态消费者绑定)插图18

温馨提示

到这里我们已经成功测试了动态MQ配置,非常的优雅,并且可以非常的灵活加入各种个性化的需求![aru_50]

而我们仅仅只需要在配置文件中添加配置即可,如果使用动态的配置文件,如Nacos,即可实现线上动态配置MQ的消费者、生产者。。。

博主精品——RabbitMQ(含优雅封装RabbitMQ 实现动态队列、动态生产者,动态消费者绑定)插图20

发表评论 取消回复
表情 图片 链接 代码

分享