消息队列面试专题

DBC 1.7K 0

为什么使用消息队列,怎么选择技术

  • 异步
    • 例子:
  • 解耦:
    • 例子:
  • 削峰:
    • 例子:
  • 缺点:
    • 系统可用性越低:外部依赖越多,依赖越多,出问题风险越大
    • 系统复杂性提高:需要考虑多种场景,比如消息重复消费,消息丢失
    • 需要更多的机器和人力: 消息队列一般集群部署,而且需要运维和监控,例如topic申请等

消息队列选择问题:Apache ActiveMQ、Kafka、RabbitMQ、RocketMQ

  • ActiveMQ:http://activemq.apache.org/
    • Apache出品,历史悠久,支持多种语言的客户端和协议,支持多种语言Java, .NET, C++ 等,基于JMS Provider的实现

    缺点:吞吐量不高,多队列的时候性能下降,存在消息丢失的情况,比较少大规模使用

  • Kafka:http://kafka.apache.org/
    • 是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理大规模的网站中的所有动作流数据(网页浏览,搜索和其他用户的行动),副本集机制,实现数据冗余,保障数据尽量不丢失;支持多个生产者和消费者

    缺点:不支持批量和广播消息,运维难度大,文档比较少, 需要掌握Scala,二次开发难度大

  • RabbitMQ:http://www.rabbitmq.com/
    • 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错

    缺点:使用Erlang开发,阅读和修改源码难度大

  • RocketMQ:http://rocketmq.apache.org/
    • 阿里开源的一款的消息中间件, 纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点, 性能强劲(零拷贝技术),支持海量堆积, 支持指定次数和时间间隔的失败消息重发,支持consumer端tag过滤、延迟消息等,在阿里内部进行大规模使用,适合在电商,互联网金融等领域使用缺点:部分实现不是按照标准JMS规范,有些系统要迁移或者引入队列需要修改代码

消息队列怎么避免重复消费

消息队列面试专题插图

RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重

  • 接口幂等性保障 ,消费端处理业务消息要保持幂等性
    • Redis
      • setNX() , 做消息id去重 java版本目前不支持设置过期时间
      • 				//Redis中操作,判断是否已经操作过 TODO
                          Long flag =  jedis.setNX(key);
                          if(flag){
                                //消费
                          }else{
                               //忽略,重复消费
                          }
        
      • 拓展(如果再用expire则不是原子操作,可以用下面方式实现分布式锁)
      • 加锁
        String result = jedis.set(key, value, "NX", "PX", expireTime)
        
        解锁(Lua脚本,先检查key,匹配再释放锁,lua可以保证原子性)
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
        
        备注:lockKey可以是商品id,requestId用于标示是同个客户端
      • Incr 原子操作:key自增,大于0 返回值大于0则说明消费过
      • 				int num =  jedis.incr(key);
                          if(num == 1){
                                //消费
                          }else{
                               //忽略,重复消费
                          }
        • 上述两个方式都可以,但是不能用于分布式锁,考虑原子问题,但是排重可以不考虑原子问题,数据量多需要设置过期时间
      • 数据库去重表
        • 某个字段使用Message的key做唯一索引
        • 主键唯一
      • 消息队列面试专题插图2

RocketMQ如何保证消息的可靠性传输

  • producer端
    • 不采用oneway发送,使用同步或者异步方式发送,做好重试,但是重试的Message key必须唯一
    • 投递的日志需要保存,关键字段,投递时间、投递状态、重试次数、请求体、响应体
  • broker端
    • 双主双从架构,NameServer需要多节点
    • 同步双写、异步刷盘 (同步刷盘则可靠性更高,但是性能差点,根据业务选择)
  • consumer端
    • 消息消费务必保留日志,即消息的元数据和消息体
    • 消费端务必做好幂等性处理
  • 投递到broker端后
    • 机器断电重启:异步刷盘,消息丢失;同步刷盘消息不丢失
    • 硬件故障:可能存在丢失,看队列架构

消息发生大量堆积应该怎么处理

线上故障了,怎么处理

  • 消息堆积了10小时,有几千万条消息待处理,现在怎么办?
  • 修复consumer, 然后慢慢消费?也需要几小时才可以消费完成,新的消息怎么办?

 

正确的姿势

  • 临时topic队列扩容,并提高消费者能力,但是如果增加Consumer数量,但是堆积的topic里面的message queue数量固定,过多的consumer不能分配到message queue
  • 编写临时处理分发程序,从旧topic快速读取到临时新topic中,新topic的queue数量扩容多倍,然后再启动更多consumer进行在临时新的topic里消费

 

RocketMQ高性能的原因分析

  • MQ架构配置
    • 顺序写,随机读,零拷贝
    • 同步刷盘SYNC_FLUSH和异步刷盘ASYNC_FLUSH, 通过flushDiskType配置
    • 同步复制和异步复制,通过brokerRole配置,ASYNC_MASTER, SYNC_MASTER, SLAVE
    • 推荐同步复制(双写),异步刷盘
  • 发送端高可用
    • 双主双从架构:创建Topic对应的时候,MessageQueue创建在多个Broker上即相同的Broker名称,不同的brokerid(即主从模式);当一个Master不可用时,组内其他的Master仍然可用。

      但是机器资源不足的时候,需要手工把slave转成master,目前不支持自动转换,可用shell处理

  • 消费高可用
    • 主从架构:Broker角色,Master提供读写,Slave只支持读
    • Consumer不用配置,当Master不可用或者繁忙的时候,Consumer会自动切换到Slave节点进行能读取
  • 提高消息的消费能力
    • 并行消费
      • 增加多个节点
      • 增加单个Consumer的并行度,修改consumerThreadMin和consumerThreadMax
      • 批量消费,设置Consumer的consumerMessageBatchMaxSize, 默认是1,如果为N,则消息多的时候,每次收到的消息为N条
    • 择 Linux Ext4 文件系统,Ext4 文件系统删除 1G 大小的文件通常耗时小于 50ms,而 Ext3文件系统耗时需要 1s,删除文件时磁盘IO 压力极大,会导致 IO 操作超时

发表评论 取消回复
表情 图片 链接 代码

分享