参考极客时间《消息队列高手课》,作为学习笔记
1.为什么使用消息队列
消息队列作为消息中间件主要起缓存作用,本质是解决上下游生产速度不一致的问题
日常开发中的常用场景有:异步处理、流量控制、服务解耦
1)异步处理
异步处理也可以理解为并发处理,即多个任务同时进行,不需要等待,与之相对的是串行处理,需要一步一步等待执行
比如秒杀系统需要解决的核心问题是,如何利用有限的服务器资源,尽可能多地处理短时间内的海量请求,简短步骤如下:
如果没有任何优化的话,就会串行执行上述步骤,肯定无法处理海量请求
而实际上只有风控和库存是需要及时返回给用户的,后面三步都可以放入消息队列异步消费,这样就可以把省出来的服务器资源用于处理后面的步骤,好处是减少等待时间,提升整体性能
2)流量控制
流量控制用于防止过多的请求压垮系统,这就要求程序具有健壮性,它应该可以在海量的请求下,还能在自身能力范围内尽可能多地处理请求,拒绝处理不了的请求并且保证自身运行正常,而现实中很多程序并没有那么“健壮”,直接拒绝请求返回错误对于用户来说也是不好的
方法:使用消息队列隔离网关和后端服务,以达到流量控制和保护后端服务的目的
当大量请求来到网关之后,不会马上到服务器,而是先堆积在消息队列中,后端服务按照自己的最大处理能力,从消息队列中消费请求进行处理,具有削峰填谷的作用
代价:调用链变长导致总体的响应时延变长,系统复杂性增加
还有一种方式可以更好的控制流量:令牌桶
令牌桶控制流量的原理:单位时间内只发放固定数量的令牌到令牌桶中,规定服务在处理请求之前必须先从令牌桶中拿出一个令牌,如果令牌桶中没有令牌,则拒绝请求。这样就保证单位时间内,能处理的请求不超过发放令牌的数量,起到了流量控制的作用
3)服务解耦
服务解耦即降低各应用之间的依赖关系
比如订单系统,一张订单对应很多个下游系统,当业务增加时,下游系统也会增加,这样订单开发团队就要花费很多时间挨个接口的去对接,任何一个接口的改变都需要订单模块重新进行一次上线,费时费力
这时可以用消息队列的方式去处理,只要订单哪里变更了,就把变更信息发送到消息队列,所有下游系统都订阅这个变更消息,获得一份实时完整的订单数据,这样就可以实现解耦,节省开发人员精力
4)其他场景
除了上面三种常用的情况,还有许多其他场景:日志处理、消息通讯、数据同步、消息广播 ……
2.如何选择消息队列
1)RabbitMQ
Erlang 语言编写的,少数几个支持 AMQP 协议的消息队列之一,RabbitMQ 是一个相当轻量级的消息队列,非常容易部署和使用。RabbitMQ 一个比较有特色的功能是支持非常灵活的路由配置,和其他消息队列不同的是,它在生产者(Producer)和队列(Queue)之间增加了一个 Exchange 模块,可以理解为交换机。
存在的问题:
RabbitMQ 对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降,性能在这几个消息队列中偏差,每秒处理几万到十几万条数据,Erlang语言偏小众,学习难度大
2)RocketMQ
RocketMQ使用Java语言开发,阿里开源产品,是一个比较全面的消息队列, 性能比 RabbitMQ 要高一个数量级,每秒钟大概能处理几十万条消息,且消息具有可靠性、实时性、持久性
RocketMQ 对在线业务的响应时延做了很多的优化,大多数情况下可以做到毫秒级的响应,如果应用场景很在意响应时延,那应该选择使用 RocketMQ
有非常活跃的中文社区,大多数问题都可以找到中文答案
缺点是作为国产消息队列,和周边生态系统的集成和兼容程度要略逊一筹
3)KafKa
Kafka 最早是由 LinkedIn 开发,当下的Kafka 已经发展为一个非常成熟的消息队列产品,无论在数据可靠性、稳定性和功能特性等 方面都可以满足绝大多数场景的需求
Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域,几乎所有的相关开源软件系统都会优先支持 Kafka KafKa具有高吞吐量,和RocketMQ同一量级,但是它的异步收发消息的性能是最好的
这种异步批量的设计带来的问题是,它的同步收发消息的响应时延比较高,因为当客户端发送一条消息的时候,Kafka 并不会立即发送出去,而是要等一会儿攒一批再发送,当业务场景每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高,所以,Kafka不太适合在线业务场景
4)ActiveMQ、ZeroMQ、Pulsar
ActiveMQ和ZeroMQ用的不多
Pulsar 是一个新兴的开源消息队列产品,最早是由 Yahoo 开发,可以持续关注
5)选择总结
选择中间件的考量维度:可靠性,性能,功能,可运维行,可拓展性,是否开源及社区活跃度
- 如果对消息队列没有很高的要求,只需开箱即用,可以选择RabbitMQ
- 如果是处理在线业务,比如在交易系统中用消息队列传递订单,那 RocketMQ 的低延迟和金融级的稳定性是更好的选择
- 如果要处理海量的消息,像收集日志、监控信息、前端的埋点、应用场景大量使用了大数据、流计算相关的开源产品,那 Kafka 是最适合的选择
3.主题和队列的关系
1)队列模型
队列模型是早期的消息队列使用的数据结构,具有队列先进先出的特性
消息只能一个一个的消费,消费者之间是竞争关系,当有多个系统要同时消费消息时,只能等待,虽然可以创建多个相同队列供不同系统消费,但是这比较浪费资源,且不符合解耦的思想,于是演化出了发布-订阅模型
2)发布-订阅模型
最大特点就是主题可以被复用,被消费多次,不需要排队等候,每份订阅中,订阅者都可以接收到主题的所有消息
3)RabbitMQ 的消息模型
RabbitMQ的消息模型算是一个例外,依然使用队列模型
同一份消息如果需要被多个消费者来消费,Exchange将消息发送到多个队列,也算变相实现发布-订阅模型
4)RocketMQ 的消息模型
可见一个主题可以分布在多个broker上,每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费,需要注意的是,RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的
订阅者的概念是通过消费组(Consumer Group)来体现的,每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费
消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息,如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息
每个队列里还会维护一个消费位置(consumer offset),用来确定消息消费到哪里了
3)KafKa 的消息模型
KafKa 的消息模型和RocketMQ 的消息模型基本一致,不过RocketMQ中对应的队列(queue)在KafKa中对应的是分区(Partition),不过功能是一样的
4.消息队列中的分布式事务
消息队列中的“事务”,主要解决的是消息生产者和消息消费者的数据一致性问题,即要么都操作成功,要么都操作失败,不允许一个成功而另一个失败的情况出现
1)分布式事务
大部分传统的单体关系型数据库都完整的实现了 ACID,但是对于分布式系统来说,严格的实现 ACID 这四个特性几乎是不可能的,或者说实现的代价太大,因为分布式系统还要考虑到可用性和性能问题,所以目前所说的分布式事务,更多情况下,是在分布式系统中事务的不完整实现,在不同的应用场景中,有不同的实现,目的都是通过一些妥协来解决实际问题
实际应用中比较常见的分布式事务有:2PC(两阶段提交)、3PC(三阶段提交)、TCC(补偿事务)、消息事务、本地消息表、最大努力通知等,详细的内容不在本文讨论
事务消息适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景
2)消息队列如何实现分布式事务
事务消息需要消息队列提供相应的功能才能实现,Kafka 和 RocketMQ 都提供了事务相关功能
这里的半消息并不是说消息不完整,相反它是完整的消息,只是说在事务提交之前,这个消息是不可见的
半消息发送成功后,就可以开始执行本地事务了,然后根据本地事务的执行结果决定提交或者回滚事务消息
这里有一个问题,即如果在第四步提交事务消息时失败了怎么办,KafKa直接抛出异常,而RocketMQ采用事务反查的机制来解决事务消息提交失败的问题
3)RocketMQ 中的分布式事务实现
如果在提交或者回滚事务消息时发生网络异常,RocketMQ的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务
这个反查本地事务的实现,并不依赖消息的发送方,也就是订单服务的某个实例节点上的任何数据。这种情况下,即使是发送事务消息的那个订单服务节点宕机了,RocketMQ 依然可以通过其他订单服务的节点来执行反查,确保事务的完整性
5.如何确保消息不丢失
1)检测消息丢失的方法
① 在 IT 基础设施比较完善的情况下,可以使用分布式链路追踪系统,使用类似的追踪系统可以很方便地追踪每一条消息
② 利用消息队列的有序性来验证是否有消息丢失,原理是在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性,如果消息丢失了,可以通过缺失的序号来确定丢失的是哪条消息
需要注意的是Kafka 和 RocketMQ不保证在 Topic 上的严格顺序的,只能保证分区上的消息是有序的,所以在发消息的时候必须要指定分区,并且在每个分区单独检测消息序号的连续性
2)确保消息可靠传递
① 生产阶段:生产阶段通过请求确认机制实现消息的可靠传递,只要Producer收到了Broker的确认就说明消息到达了
② 存储阶段:存储阶段只要Broker出现了故障就会发生丢失消息的情况,可以将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘,在将消息写入磁盘后才返回确认给Producer
③ 消费阶段:消费阶段也是采用请求确认机制保证消息的可靠传递,这里需要注意,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认
6.如何处理重复消息
出现重复消息的场景:比如在传递消息的过程中,消息正常传递过去了,但没有收到对应的确认Ack,这时候就会发送一个重复的消息,导致消息重读
在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,由低到高分别是:
- At most once:至多一次。消息在传递时,最多会被送达一次,允许丢消息,一般都是一些对消息可靠性要求不太高的监控场景使用
- At least once:至少一次。消息在传递时,至少会被送达一次,不允许丢消息,但是允许有少量重复消息出现
- Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级
现在常用的绝大部分消息队列提供的服务质量都是 At least once,因为要在性能之间取舍,所以消息队列很难保证消息不重复
1)幂等性解决消息重复问题
幂等在数学上的概念是:如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性
在计算机领域的概念是:其任意多次执行所产生的影响均与一次执行的影响相同
① 利用数据库的唯一约束实现幂等
比如在数据库中建一张转账流水表,这个表有三个字段:转账单 ID、账户 ID 和变更金额,然后给转账单 ID 和账户 ID 这两个字段联合 起来创建一个唯一约束,这样对于相同的转账单 ID 和账户 ID,表里至多只能存在一条记录
不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等
② 为更新的数据设置前置条件
给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,比如:if 余额 == 100 元,则更新数据,这样就具有了幂等性。更加通用的方法是,给数据增加一个版本号属性,每次更新数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 1,一样可以实现幂等更新
③ 记录并检查操作
这种方式通用性最强,也称为“Token 机制或者 GUID(全局唯一 ID)机制“,即在执行数据更新操作之前,先检查一下是否执行过这个更新操作
具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。”检查消费状态、更新数据、设置消费状态”这三个操作必须作为一组操作保证原子性,才能真正实现幂等
7.消息积压如何处理
消息积压的直接原因,一般是系统中的某个部分出现了性能问题,来不及处理上游发送的消息,才会导致消息积压
对于绝大多数使用消息队列的业务来说,消息队列本身的处理能力要远大于业务系统的处理能力。主流消息队列的单个节点,消息收发的性能可以达到每秒钟处理几万至几十万条消息的水平,所以对于性能的优化,主要体现在生产者和消费者一收一发这两部分的业务逻辑中,对于消息队列本身的性能,使用者不需要太关注,因为一般业务逻辑系统单个结点每秒最多处理几千个请求,所以怎么在消息的收发两端让业务代码和消息队列配合是程序员更需要关注的点
1)发送端性能优化
发送端业务代码的处理性能,实际上和消息队列的关系不大,因为发送执行在消息队列之前
更需要关注的是如何优化代码性能,一般发送端要么是批量发送,要么是并发发送,目的都是提升发送性能
比如在线业务更加注重的是实时性,要求响应时延必须低,这种情况可以选择并发的方式来提升发送性能
如果是一个离线的系统,因为不会太注重时延,而是更关注吞吐量,所以适合批量发送消息
2)消费端性能优化
使用消息队列的大部分性能问题都出在消费端,因为如果消费速度跟不上生产速度,就会造成消息堆积
解决办法有:
- 优化消费业务逻辑
- 增加消费端的我并发数(水平扩容),注意,在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。原因是对于消费者来说,在每个分区上实际上只能支持单线程消费
有一个误区:
- 即在消费端接收消息时,为了避免消息积压,有人会先把消息放在内存,然后启动很多的业务线程去消费内存里的消息,想解决单个 Consumer 不能并行消费的问题,但其实这个方法是错误的,因为如果收消息的节点发生宕机,在内存队列中还没来及处理的这些消 息就会丢失
3)消息积压该如何处理
导致消息积压主要原因有两种:要么是发送变快了,要么是消费变慢了
常用的解决方法:
- 消费端扩容,扩大消费端实例数来提升总体的消费能力
- 系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量
8.参考链接
https://www.cnblogs.com/weifeng1463/p/12889300.html