为什么使用消息队列,怎么选择技术
- 异步
- 例子:
- 解耦:
- 例子:
- 削峰:
- 例子:
- 缺点:
- 系统可用性越低:外部依赖越多,依赖越多,出问题风险越大
- 系统复杂性提高:需要考虑多种场景,比如消息重复消费,消息丢失
- 需要更多的机器和人力: 消息队列一般集群部署,而且需要运维和监控,例如topic申请等
消息队列选择问题:Apache ActiveMQ、Kafka、RabbitMQ、RocketMQ
- ActiveMQ:http://activemq.apache.org/
- Apache出品,历史悠久,支持多种语言的客户端和协议,支持多种语言Java, .NET, C++ 等,基于JMS Provider的实现
缺点:吞吐量不高,多队列的时候性能下降,存在消息丢失的情况,比较少大规模使用
- Kafka:http://kafka.apache.org/
- 是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理大规模的网站中的所有动作流数据(网页浏览,搜索和其他用户的行动),副本集机制,实现数据冗余,保障数据尽量不丢失;支持多个生产者和消费者
缺点:不支持批量和广播消息,运维难度大,文档比较少, 需要掌握Scala,二次开发难度大
- RabbitMQ:http://www.rabbitmq.com/
- 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错
缺点:使用Erlang开发,阅读和修改源码难度大
- RocketMQ:http://rocketmq.apache.org/
- 阿里开源的一款的消息中间件, 纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点, 性能强劲(零拷贝技术),支持海量堆积, 支持指定次数和时间间隔的失败消息重发,支持consumer端tag过滤、延迟消息等,在阿里内部进行大规模使用,适合在电商,互联网金融等领域使用缺点:部分实现不是按照标准JMS规范,有些系统要迁移或者引入队列需要修改代码
消息队列怎么避免重复消费
RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重
- 接口幂等性保障 ,消费端处理业务消息要保持幂等性
- Redis
- setNX() , 做消息id去重 java版本目前不支持设置过期时间
-
//Redis中操作,判断是否已经操作过 TODO Long flag = jedis.setNX(key); if(flag){ //消费 }else{ //忽略,重复消费 }
- 拓展(如果再用expire则不是原子操作,可以用下面方式实现分布式锁)
-
加锁 String result = jedis.set(key, value, "NX", "PX", expireTime) 解锁(Lua脚本,先检查key,匹配再释放锁,lua可以保证原子性) String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); 备注:lockKey可以是商品id,requestId用于标示是同个客户端
- Incr 原子操作:key自增,大于0 返回值大于0则说明消费过
-
int num = jedis.incr(key); if(num == 1){ //消费 }else{ //忽略,重复消费 }
-
- 上述两个方式都可以,但是不能用于分布式锁,考虑原子问题,但是排重可以不考虑原子问题,数据量多需要设置过期时间
- 数据库去重表
- 某个字段使用Message的key做唯一索引
- 主键唯一
-
- Redis
RocketMQ如何保证消息的可靠性传输
- producer端
- 不采用oneway发送,使用同步或者异步方式发送,做好重试,但是重试的Message key必须唯一
- 投递的日志需要保存,关键字段,投递时间、投递状态、重试次数、请求体、响应体
- broker端
- 双主双从架构,NameServer需要多节点
- 同步双写、异步刷盘 (同步刷盘则可靠性更高,但是性能差点,根据业务选择)
- consumer端
- 消息消费务必保留日志,即消息的元数据和消息体
- 消费端务必做好幂等性处理
- 投递到broker端后
- 机器断电重启:异步刷盘,消息丢失;同步刷盘消息不丢失
- 硬件故障:可能存在丢失,看队列架构
消息发生大量堆积应该怎么处理
线上故障了,怎么处理
- 消息堆积了10小时,有几千万条消息待处理,现在怎么办?
- 修复consumer, 然后慢慢消费?也需要几小时才可以消费完成,新的消息怎么办?
正确的姿势
- 临时topic队列扩容,并提高消费者能力,但是如果增加Consumer数量,但是堆积的topic里面的message queue数量固定,过多的consumer不能分配到message queue
- 编写临时处理分发程序,从旧topic快速读取到临时新topic中,新topic的queue数量扩容多倍,然后再启动更多consumer进行在临时新的topic里消费
RocketMQ高性能的原因分析
- MQ架构配置
- 顺序写,随机读,零拷贝
- 同步刷盘SYNC_FLUSH和异步刷盘ASYNC_FLUSH, 通过flushDiskType配置
- 同步复制和异步复制,通过brokerRole配置,ASYNC_MASTER, SYNC_MASTER, SLAVE
- 推荐同步复制(双写),异步刷盘
- 发送端高可用
- 双主双从架构:创建Topic对应的时候,MessageQueue创建在多个Broker上即相同的Broker名称,不同的brokerid(即主从模式);当一个Master不可用时,组内其他的Master仍然可用。
但是机器资源不足的时候,需要手工把slave转成master,目前不支持自动转换,可用shell处理
- 双主双从架构:创建Topic对应的时候,MessageQueue创建在多个Broker上即相同的Broker名称,不同的brokerid(即主从模式);当一个Master不可用时,组内其他的Master仍然可用。
- 消费高可用
- 主从架构:Broker角色,Master提供读写,Slave只支持读
- Consumer不用配置,当Master不可用或者繁忙的时候,Consumer会自动切换到Slave节点进行能读取
- 提高消息的消费能力
- 并行消费
- 增加多个节点
- 增加单个Consumer的并行度,修改consumerThreadMin和consumerThreadMax
- 批量消费,设置Consumer的consumerMessageBatchMaxSize, 默认是1,如果为N,则消息多的时候,每次收到的消息为N条
- 择 Linux Ext4 文件系统,Ext4 文件系统删除 1G 大小的文件通常耗时小于 50ms,而 Ext3文件系统耗时需要 1s,删除文件时磁盘IO 压力极大,会导致 IO 操作超时
- 并行消费
本文作者为DBC,转载请注明。