在使用消息队列的过程中,你会遇到很多问题,比如选择哪款消息队列更适合你的业务系统?如何保证系统的高可靠、高可用和高性能?如何保证消息不重复、不丢失?如何做到水平扩展?诸如此类的问题,每一个问题想要解决好,都不太容易。
消息队列底层技术:高并发、序列化反序列化、一致性协议、高性能通信、分布式事务、内存管理、海量数据存储、数据压缩、文件和高性能IO、高可用分布式系统、异步编程模型、功能简洁、结构清晰
消息队列相关的协议和标准有:JMS、AMQP、MQTT 和 OpenMessaging
消息模型:队列模式和发布—订阅模式。两者最大区别是一份消息数据能不能被消费多次的问题。现代的消息队列产品使用的消息模型大多是发布-订阅模型
消息队列选型
必须是开源产品,有bug可以修改源码;近几年比较流行,社区活跃度高,流行的产品与周边生态系统会有一个比较好的集成和兼容,比如,Kafka 和 Flink 就有比较好的兼容性,Flink 内置了 Kafka 的 Data Source,使用 Kafka 就很容易作为 Flink 的数据源开发流计算应用,如果你用一个比较小众的消息队列产品,在进行流计算的时候,你就不得不自己开发一个 Flink 的 Data Source。
一个合格消息队列产品特性:
• 消息的可靠传递:确保不丢消息;• Cluster:支持集群,确保不会因为某个节点宕机导致服务不可用,当然也不能丢消息;• 性能:具备足够好的性能,能满足绝大多数场景的性能要求。
产品比较:
• RabbitMQ:Erlang 语言编写的,采用队列模式
,最早是为电信行业系统之间的可靠通信设计的,也是少数几个支持 AMQP 协议的消息队列之一。开箱即用的消息队列,非常容易部署和使用。RabbitMQ 一个比较有特色的功能是支持非常灵活的路由配置,和其他消息队列不同的是,它在生产者(Producer)和队列(Queue)之间增加了一个 Exchange 模块,你可以理解为交换机,根据配置的路由规则将生产者发出的消息分发到不同的队列中。路由的规则也非常灵活,甚至你可以自己来实现路由规则。RabbitMQ 的客户端支持的编程语言大概是所有消息队列中最多的,如果你的系统是用某种冷门语言开发的,那你多半可以找到对应的 RabbitMQ 客户端。
缺点:性能比较差,它大概每秒钟可以处理几万到十几万条消息。对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。
• RocketMQ:阿里出品,Java语言编写的,历经多次双十一大促考验,它的性能、稳定性和可靠性都是值得信赖,每秒钟大概能处理几十万条消息,采用发布订阅模式
。源代码相对也比较容易读懂,也容易对 RocketMQ 进行扩展或者二次开发。RocketMQ 对在线业务的响应时延做了很多的优化,大多数情况下可以做到毫秒级的响应。比如交易系统和金融系统,很在意响应时延,那应该选择使用 RocketMQ
缺点:国际上还没有那么流行,与周边生态系统的集成和兼容程度要略逊一筹。
• Kafka:LinkedIn 出品,Scala 和 Java 语言编写的,采用发布订阅模式
,目前也是 Apache 的顶级项目。Kafka 最初的设计目的是用于处理海量的日志。在数据可靠性、稳定性和功能性等方面都可以满足绝大多数场景的需求。Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域,几乎所有的相关开源软件系统都会优先支持 Kafka。设计上大量使用了批量和异步的思想,这种设计使得 Kafka 能做到超高的性能。Kafka 的性能,尤其是异步收发的性能,是三者中最好的,但与 RocketMQ 并没有量级上的差异,大约每秒钟可以处理几十万条消息。
缺点:Kafka 这种异步批量的设计带来的问题是,它的同步收发消息的响应时延比较高,因为当客户端发送一条消息的时候,Kafka 并不会立即发送出去,而是要等一会儿攒一批再发送,在它的 Broker 中,很多地方都会使用这种“先攒一波再一起处理”的设计。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。
• ActiveMQ:最老牌的开源消息队列,是十年前唯一可供选择的开源消息队列,目前已进入老年期,社区不活跃。无论是功能还是性能方面,ActiveMQ 都与现代的消息队列存在明显的差距。支持队列模式(单消费)和发布—订阅模式,两者最大区别是一份消息数据能不能被消费多次的问题。现代的消息队列产品使用的消息模型大多是发布-订阅模型
• ZeroMQ:不能称之为一个消息队列,而是一个基于消息队列的多线程网络库,如果你的需求是将消息队列的功能集成到你的系统进程中,可以考虑使用 ZeroMQ。
重要问题
1、如何确保消息不会丢失?
• 生产阶段
在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。另一种方式,如果MQ Server端无响应,生产端需要支持重试,当然Broker需要支持幂等。
• 存储阶段
• 配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应 • 如果是 Broker 是由多个节点组成的集群,配置 Broker 参数,至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。
• 消费阶段
客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。
2、消费过程中如何处理重复消息?
幂等性。定义:如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。
At least once 幂等消费 = Exactly once。
• 数据库的唯一约束实现幂等
• 为更新的数据设置前置条件。比如:
• 对一个订单支付时,订单状态要求是待付款
; • 增加一个版本号属性
3、如何预防消息积压
• 发送端性能优化
• 增加每次发送消息的批量大小,比如之前一次一条,可以考虑一次100条,象离线分析系统。 • 增加并发数
•消费端性能优化
• 一般是下游系统来不及处理上游发送的消息,才会导致消息堆积。 • 扩容 Consumer 的实例数量的同时,必须同步扩容topic的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。因为对于消费者来说,在每个分区上实际上只能支持单线程消费。
4、系统发生了消息积压,该如何处理?
快速定位消息积压的原因。要么是发送变快了,要么是消费变慢了。通过监控数据,很容易确定是哪种原因。如果是单位时间发送的消息增多,比如说是赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例数来提升总体的消费能力。
如果短时间内没有足够的服务器进行扩容,考虑将系统降级,关闭一些不重要的业务,减少发送数据量。
5、如何保证消息的严格顺序?
topic 层面是无法保证严格顺序的,只有在队列上才能保证消息的严格顺序。
如果说,你的业务必须要求全局严格顺序,就只能把消息队列数配置成 1,生产者和消费者也只能是一个实例,这样才能保证全局严格顺序。