分布式消息队列中间件是是大型分布式系统不可缺少的中间件,通过消息队列,应用程序可以在不知道彼此位置的情况下独立处理消息,或者在处理消息前不需要等待接收此消息。所以消息队列主要解决应用耦合、异步消息、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构。消息队列已经逐渐成为企业应用系统内部通信的核心手段,当前使用较多的消息队列有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等,而部分数据库如 Redis、MySQL 以及 PhxSQL 也可实现消息队列的功能。
在日常学习与开发过程中,消息队列作为系统不可缺少的中间件,显得十分的重要。在现代云架构中,应用程序被分解为多个规模较小且更易于开发、部署和维护的独立构建块。消息队列可为这些分布式应用程序提供通信和协调。而本人也在工作的过程中,前前后后后接触到了 Kafka、RabbitMQ 两款消息队列。所以,本系列文章也主要以 RabbitMQ 和 Kafka 两款典型的消息中间件来做分析。本文是该系列的开篇,主要讲解消息队列的概述、特点等,然后对消息队列使用场景进行分析,最后对市面上比较常见的消息队列产品进行技术对比。
消息队列概述
消息队列(Message Queue,简称 MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等等功能,其作为分布式系统架构中的一个重要组件,有着举足轻重的地位。消息队列是构建分布式互联网应用的基础设施,通过 MQ 实现的松耦合架构设计可以提高系统可用性以及可扩展性,是适用于现代应用的最佳设计方案。
消息队列特点
为什么要用消息队列?
通过异步处理提高系统性能
讲解该特点之前,我们先了解一下同步架构和异步架构的区别:
- 同步调用:是指从请求的发起一直到最终的处理完成期间,请求的调用方一直在同步阻塞等待调用的处理完成。
- 异步调用:是指在请求发起的处理过程中,客户端的代码已经返回了,它可以继续进行自己的后续操作,而不需要等待调用处理完成,这就叫做异步调用。
如上图,在不使用消息队列服务器的时候,用户的请求数据直接写入数据库,在高并发的情况下数据库压力剧增,使得响应速度变慢。但是在使用消息队列之后,用户的请求数据发送给消息队列之后立即 返回,再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列服务器处理速度快于数据库(消息队列也比数据库有更好的伸缩性),因此响应速度得到大幅改善。
通过以上分析我们可以得出消息队列具有很好的削峰作用的功能——即通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。举例:在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。如下图所示:
因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票。
降低系统耦合性
我们知道如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些。
我们最常见的事件驱动架构类似生产者消费者模式,在大型网站中通常用利用消息队列实现事件驱动结构。如下图所示:
消息队列使利用发布 - 订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。从上图可以看到消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。
消息接受者对消息进行过滤、处理、包装后,构造成一个新的消息类型,将消息继续发送出去,等待其他消息接受者订阅该消息。因此基于事件(消息对象)驱动的业务架构可以是一系列流程。
另外为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息。
使用消息队列带来的一些问题?
- 系统可用性降低:系统可用性在某种程度上降低,为什么这样说呢?在加入 MQ 之前,你不用考虑消息丢失或者说 MQ 挂掉等情况,但是,引入 MQ 之后你就需要如何保证消息队列的高可用。
- 系统复杂性提高:加入 MQ 之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题,系统复发性提高。
- 一致性问题:消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况。
JMS VS AMQP
JMS
Java 消息服务(Java Message Service,JMS)应用程序接口是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。JMS 的客户端之间可以通过 JMS 服务进行异步的消息传输。JMS PI 是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。点对点与发布订阅最初是由 JMS 定义的。这两种模式主要区别或解决的问题就是发送到队列的消息能否重复消费。
JMS 规范目前支持两种消息模型:点对点(point to point,queue)和发布 / 订阅(publish/subscribe,topic)。
点对点(P2P)模型
消息生产者向消息队列中发送了一个消息之后,只能被一个消费者消费一次。点对点(P2P)使用队列(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。
Queue 实现了负载均衡,一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有 一个可用的消费者,一个 queue 可以有很多消费者,他们之间实现了负载均衡, 所以 Queue 实现了一个可靠的负载均衡。
特点:
- 每个消息只用一个消费者;
- 发送者和接受者没有时间依赖;
- 接受者确认消息接受和处理成功。
发布 / 订阅(Pub/Sub)模型
消息生产者向频道发送一个消息之后,多个消费者可以从该频道订阅到这条消息并消费。发布订阅模型(Pub/Sub) 使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。
Topic 实现了发布和订阅,当你发布一个消息,所有订阅这个 Topic 的服务都能得到这个消息,所以从 1 到 N 个订阅者都能得到一个消息的拷贝, 只有在消息代理收到消息时有一个有效订阅时的订阅者才能得到这个消息的拷贝。
特点:
- 每个消息可以有多个订阅者;
- 客户端只有订阅后才能接收到消息;
- 持久订阅和非持久订阅。
注意:
- 发布者和订阅者有时间依赖:接受者和发布者只有建立订阅关系才能收到消息;
- 持久订阅:订阅关系建立后,消息就不会消失,不管订阅者是否都在线;
- 非持久订阅:订阅者为了接受消息,必须一直在线。当只有一个订阅者时约等于点对点模型。
点对点(P2P)模型与发布 / 订阅(Pub/Sub)模型应用
- 点对点模型:主要用于一些耗时较长的、逻辑相对独立的业务。
比如说发送邮件这样一个操作。因为发送邮件比较耗时,而且应用程序其实也并不太关心邮件发送是否成功,发送邮件的逻辑也相对比较独立,所以它只需要把邮件消息丢到消息队列中就可以返回了,而消费者也不需要关心是哪个生产者去发送的邮件,它只需要把邮件消息内容取出来以后进行消费,通过远程服务器将邮件发送出去就可以了。而且每个邮件只需要被发送一次。所以消息只被一个消费者消费就可以了。
- 发布订阅模型:如新用户注册这样一个消息,需要使用按主题发布的方式。
比如新用户注册,一个新用户注册成功以后,需要给用户发送一封激活邮件,发送一条欢迎短信,还需要将用户注册数据写入数据库,甚至需要将新用户信息发送给关联企业的系统,比如淘宝新用户信息发送给支付宝,这样允许用户可以一次注册就能登录使用多个关联产品。一个新用户注册,会把注册消息发送给一个主题,多种消费者可以订阅这个主题。比如发送邮件的消费者、发送短信的消费者、将注册信息写入数据库的消费者,跨系统同步消息的消费者等。
AMQP
AMQP(advanced message queuing protocol)在 2003 年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。顾名思义,AMQP 是一种协议,更准确的说是一种 binary wire-level protocol(链接协议)。这是其和 JMS 的本质差别,AMQP 不从 API 层进行限定,而是直接定义网络交换的数据格式。这使得实现了 AMQP 的 provider 天然性就是跨平台的。意味着我们可以使用 Java 的 AMQP provider,同时使用一个 python 的 producer 加一个 rubby 的 consumer。
在 AMQP 中,消息路由(message routing)和 JMS 存在一些差别,在 AMQP 中增加了 Exchange 和 binding 的角色。producer 将消息发送给 Exchange,binding 决定 Exchange 的消息应该发送到那个 queue,而 consumer 直接从 queue 中消费消息。
AMQP 提供五种消息模型:①Direct Exchange;②Fanout Exchange;③Topic Exchange;④Headers Exchange;⑤System Exchange。本质来讲,后四种和 JMS 的 pub/sub 模型没有太大差别,仅是在路由机制上做了更详细的划分。
JMS 与 AMQP 对比
总结:
- AMQP 为消息定义了线路层(wire-level protocol)的协议,而 JMS 所定义的是 API 规范。在 Java 体系中,多个 client 均可以通过 JMS 进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而 AMQP 天然具有跨平台、跨语言特性。
- JMS 支持 TextMessage、MapMessage 等复杂的消息类型;而 AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送)。
- 由于 Exchange 提供的路由算法,AMQP 可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队列 和 主题 / 订阅 方式两种。
消息队列推拉模型
Push 推消息模型:消息生产者将消息发送给消息队列,消息队列又将消息推给消息消费者。
Pull 拉消息模型:消费者请求消息队列接受消息,消息生产者从消息队列中拉该消息。
RabbitMQ
RabbitMQ 实现了 AQMP 协议,AQMP 协议定义了消息路由规则和方式。生产端通过路由规则发送消息到不同 queue,消费端根据 queue 名称消费消息。此外 RabbitMQ 是向消费端推送消息,订阅关系和消费状态保存在服务端。
Kafka
Kafka 只支持消息持久化,消费端为拉模型,消费状态和订阅关系由客户端端负责维护,消息消费完后不会立即删除,会保留历史消息。因此支持多订阅时,消息只会存储一份就可以了。同一个订阅组会消费 topic 所有消息,每条消息只会被同一个订阅组的一个消费节点消费,同一个订阅组内不同消费节点会消费不同消息。
消息队列使用场景
异步处理:实现异步处理,提升处理性能
对一些比较耗时的操作,可以把处理过程通过消息队列进行异步处理。这样做可以推迟耗时操作的处理,使耗时操作异步化,而不必阻塞客户端的程序,客户端的程序在得到处理结果之前就可以继续执行,从而提高客户端程序的处理性能。非核心流程异步化,减少系统响应时间,提高吞吐量。
例如:短信通知、终端状态推送、App 推送、用户注册等。
解耦:可以使生产者和消费者的代码实现解耦合
可以多个生产者发布消息,多个消费者处理消息,共同完成完整的业务处理逻辑,但是它们的不需要直接的交互调用,没有代码的依赖耦合。在传统的同步调用中,调用者代码必须要依赖被调用者的代码,也就是生产者代码必须要依赖消费者的处理逻辑代码,代码需要直接的耦合,而使用消息队列,这两部分的代码不需要进行任何的耦合。因为耦合程度越低的代码越容易维护,也越容易进行扩展。
比如新用户注册,如果用传统同步调用的方式,那么发邮件、发短信、写数据库、通知关联系统这些代码会和用户注册代码直接耦合起来,整个代码看起来就是完成用户注册逻辑后,后面必然跟着发邮件、发短信这些代码。如果要新增一个功能,比如将监控用户注册情况,将注册信息发送到业务监控系统,就必须要修改前面的代码,至少增加一行代码,发送注册信息到监控系统,我们知道,任何代码的修改都可能会引起 bug。
而使用分布式消息队列实现生产者和消费者解耦合以后,用户注册以后,不需要调用任何后续处理代码,只需要将注册消息发送到分布式消息队列就可以了。如果要增加新功能,只需要写个新功能的消费者程序,在分布式消息队列中,订阅用户注册主题就可以了,不需要修改原来任何一行代码。
流量削峰和流控:可以平衡流量峰值,削峰填谷
当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的 “漏斗”,进行限流控制。在下游有能力处理的时候,再进行分发。
使用消息队列,即便是访问流量持续的增长,系统依然可以持续地接收请求。这种情况下,虽然生产者发布消息的速度比消费者消费消息的速度快,但是可以持续的将消息纳入到消息队列中,用消息队列作为消息的缓冲,因此短时间内,发布者不会受到消费处理能力的影响。
在访问高峰,用户的并发访问数可能超过了系统的处理能力,所以在高峰期就可能会导致系统负载过大,响应速度变慢,更严重的可能会导致系统崩溃。这种情况下,通过消息队列将用户请求的消息纳入到消息队列中,通过消息队列缓冲消费者处理消息的速度。
消息的生产者它有高峰有低谷,但是到了消费者这里,只会按照自己的最佳处理能力去消费消息。高峰期它会把消息缓冲在消息队列中,而在低谷期它也还是使用自己最大的处理能力去获取消息,将前面缓冲起来、来不及及时处理的消息处理掉。那么,通过这种手段可以实现系统负载消峰填谷,也就是说将访问的高峰消掉,而将访问的低谷填平,使系统处在一个最佳的处理状态之下,不会对系统的负载产生太大的冲击。
举个例子:用户在支付系统成功结账后,订单系统会通过短信系统向用户推送扣费通知。短信系统可能由于短板效应,速度卡在网关上(每秒几百次请求),跟前端的并发量不是一个数量级。于是,就造成支付系统和短信系统的处理能力出现差异化。
然而用户晚上个半分钟左右收到短信,一般是不会有太大问题的。如果没有消息队列,两个系统之间通过协商、滑动窗口等复杂的方案也不是说不能实现。但系统复杂性指数级增长,势必在上游或者下游做存储,并且要处理定时、拥塞等一系列问题。而且每当有处理能力有差距的时候,都需要单独开发一套逻辑来维护这套逻辑。所以,利用中间系统转储两个系统的通信内容,并在下游系统有能力处理这些消息的时候,再处理这些消息,是一套相对较通用的方式。
易伸缩:可以让系统获得更好的伸缩性
耗时的任务可以通过分布式消息队列,向多台消费者服务器并行发送消息,然后在很多台消费者服务器上并行处理消息,也就是说可以在多台物理服务器上运行消费者。那么当负载上升的时候,可以很容易地添加更多的机器成为消费者。
例如:用户上传文件后,通过发布消息的方式,通知后端的消费者获取数据、读取文件,进行异步的文件处理操作。那么当前端发布更多文件的时候,或者处理逻辑比较复杂的时候,就可以通过添加后端的消费者服务器,提供更强大的处理能力。
隔离失效机器以及自我修复:失败隔离和自我修复
因为发布者不直接依赖消费者,所以分布式消息队列可以将消费者系统产生的错误异常与生产者系统隔离开来,生产者不受消费者失败的影响。当在消息消费过程中出现处理逻辑失败的时候,这个错误只会影响到消费者自身,而不会传递给消息的生产者,也就是应用程序可以按照原来的处理逻辑继续执行。
所以,这也就意味着在任何时候都可以对后端的服务器执行维护和发布操作。可以重启、添加或删除服务器,而不影响生产者的可用性,这样简化了部署和服务器管理的难度。
日志处理
日志处理是指将消息队列用在日志处理中,比如 Kafka 的应用,解决大量日志传输和缓冲的问题。日志采集客户端,负责日志数据采集,定时写受写入 Kafka 队列;Kafka 消息队列,负责日志数据的接收,存储和转发;日志处理应用,订阅并消费 kafka 队列中的日志数据。
消息队列技术对比
- ActiveMQ 是 Apache 出品的、采用 Java 语言编写的完全基于 JMS1.1 规范的面向消息的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。不过由于历史原因包袱太重,目前市场份额没有后面三种消息中间件多,其最新架构被命名为 Apollo,号称下一代 ActiveMQ,有兴趣的同学可行了解。
- RabbitMQ 是采用 Erlang 语言实现的 AMQP 协议的消息中间件,最初起源于金融系统,用于在分布式系统中存储转发消息。RabbitMQ 发展到今天,被越来越多的人认可,这和它在可靠性、可用性、扩展性、功能丰富等方面的卓越表现是分不开的。主要特点是性能好,社区活跃,但是 RabbitMQ 用 Erlang 开发,我们的应用很少用 Erlang,所以不便于二次开发和维护。
- Kafka 是由 LinkedIn 公司采用 Scala 语言开发的一个分布式、多分区、多副本且基于 zookeeper 协调的分布式消息系统,现已捐献给 Apache 基金会。它是一种高吞吐量的分布式发布订阅消息系统,以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如 Cloudera、Apache Storm、Spark、Flink 等都支持与 Kafka 集成。
- RocketMQ 是阿里开源的消息中间件,目前在 Apache 孵化,使用纯 Java 开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ 思路起源于 Kafka,但并不是简单的复制,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog 分发等场景,支撑了阿里多次双十一活动。
- ZeroMQ 是基于 C 语言开发,号称史上最快的消息队列。ZeroMQ 是一个消息处理队列库,可在多线程、多内核和主机之间弹性伸缩,虽然大多数时候我们习惯将其归入消息队列家族之中,但是其和前面的几款有着本质的区别,ZeroMQ 本身就不是一个消息队列服务器,更像是一组底层网络通讯库,对原有的 Socket API 上加上一层封装而已。
总结:
- ActiveMQ 的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用。
- RabbitMQ 在吞吐量方面虽然稍逊于 Kafka 和 RocketMQ ,但是由于它基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 erlang 开发,所以国内很少有公司有实力做 erlang 源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这四种消息队列中,RabbitMQ 一定是你的首选。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
- RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的 MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。RocketMQ 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准 JMS 规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用 RocketMQ 挺好的
- kafka 最初设计时就是针对互联网的分布式、高可用应用场景而设计,所以其特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。
参考博文
[1]. 浅谈消息队列及常见的消息中间件 [2]. 消息中间件选型分析 [3]. 新手也能看懂,消息队列其实很简单 [4]. 10 分钟搞懂:95% 的程序员都拎不清的分布式消息队列中间件
代码语言:javascript复制source:https://morning-pro.github.io/archives/1c55560e.html