Redis生产者与消费者

2022-12-01 13:32:41 浏览数 (3)

生产者

生产者的任务就是将消息添加到Redis的Sorted Set中。首先,需要计算出消息添加到Redis的SlotKey,如果发送方指定了消息的slotBasis,则计算slotBasis的CRC32值,CRC32值对槽数量进行取模得到槽序号,SlotKey设计为#{topic}_#{index},其中#{}表示占位符。然后,不同类型的消息有不同的添加方式,因此分布式讲述的三种消息类型的添加过程

区间重复合并消息

发送该消息时需要设置timeRange,timeRange必须大于0,单位为毫秒,表示消息将延迟timeRange毫秒后被消费,期间到来的重复消息将被合并,合并后的消息依然维持原来的消费时间,因此在存储该类型消息的时候,采用(当前时间戳 timeRange)作为分数,添加消息采用Lua脚本执行,保证操作的原子性,Lua脚本首先采用zscore命令检查消息是否已经存在,如果已经存在则直接返回,如果不存在则执行zadd命令添加

优先级消息

发送该消息时需要设置priority,必须大于16,表示消息的优先级,数值越大表示优先级越高。因此在存储类型消息的时候,采用priority作为分数,采用zadd命令直接添加。

任意定时消息

发送该消息时需要设置fixedTime,fixedTime必须大于当前时间,表示消费时间戳,当前时间大于消费时间戳的时候,消息才会被消费,因此在存储该类型消息的时候,采用fixedTime作为分数,采用命令zadd直接添加。

消费者

二阶段消费方式
三种消费方式

一般消息队列存在三种消费模式,分别是:最多消费一次、至少消费一次、只能消费一次。

  1. 最多消费一次模式消息可能丢失,一般不怎么使用
  2. 至少消费一次模式消息不会丢失,但是可能存在重复消费,比较常用
  3. 只能消费一次模式消息被精确只能消费一次,实现比较困难,一般需要业务记录幂等ID来实现

RMQ实现了至少消费一次的模式,实现的原理如下:

至少消费一次模式实现的难点

从简单的消费模式,最多消费一次说起,消费者端只需要从消息队列服务中取出消息就行,即执行Redis的Zpopmax命令,不论消费者是否能够收到消息并成功消息,消息队列服务都认为消息消费成功

最多一次消费模式导致消息丢失的因素可能有

  1. 网络丢包导致消费者没有接收到消息
  2. 消费者接收到消息但在消费的时候宕机了
  3. 消费者接收到消息但是消费失败了

针对消费失败导致消息丢失的情况比较容易解决,只是需要把消费失败的消息重新放入消息队列服务就行,但是网络丢包和消费系统异常的消息丢失问题不好解决。性能差的解决方案是:不把元素从有序集合种pop出来,先查询优先级最高的元素,进行消费,在删除消费成功的元素,这样的缺点是消息服务队列变成了同步阻塞队列,性能会很差。

至少消费一次的模式实现

至少消费一次的问题比较类似与银行转账的问题,A向B账户转账100元,如何保障A账户扣减了100元同时B账户增加了100元,可以通过二阶段提交的处理思想。

  1. 第一个准备阶段,A、B分别进行资源冻结并持久化undo和redo日志,A、B分别告诉协调者已经准备好了
  2. 第二个提交阶段,协调者告诉A、B进行提交,A、B分别提交事务。

RMQ基于二阶段提交的思想来实现至少消费一次的模式。RMQ存储设计种PrepareQueue的作用就是用来冻结资源并记录事务日志,消费端即使参与者也是协调者。

  1. 第一个准备阶段,消费者通过执行脚本从StoreQueue种Pop消息存储到PrepareQueue,同时消息传输到消费者端,消费者端消费消息。
  2. 第二个提交阶段,消费者端根据消息结果是否成功协调消息队列服务是提交还是回滚,如果消费成功则提交事务,该消息从PrepareQueue中删除,如果消费失败则回滚事务,消费者将消息从PrepareQueue中移动到StoreQueue,如果因为各种异常导致PrepareQueue中消息超时,超时后将自动执行回滚操作。
实现方案的异常情况分析

网络丢包导致消费者没有接收到消息,这时消息已经记录到PrepareQueue,如果到了超时时间,消息被回滚到StoreQueue,等待下次被消费,消息不丢失。

消费者接收到了消息,但是消费者还没来得及消费完成系统就宕机了,消息消费超时到了后,消息会被重新放入 StoreQueue,等待下次被消费,消息不丢失。基于 Redis 实现特殊的消息队列

消费者接收到了消息并消费成功,消费者端在协调事务提交的时候宕机了,消 息消费超时到了后,消息会被重新放入 StoreQueue,等待下次被消费,消息被 重复消费。

消费者接收到了消息但消费失败,消费者端在协调事务提交的时候宕机了,消 息消费超时到了后,消息会被重新放入 StoreQueue,等待下次被消费,消息不 丢失消费者接收到了消息并消费成功,但是由于 fullgc 等原因使消费时间太长, PrepareQueue 中的消息由于超时已经回滚到 StoreQueue,等待下次被消费,消息被重复消费.

重试次数控制的实现

采用二阶段消费方式,需要将消息在 StoreQueue 和 PrepareQueue 之间移动,如 何实现重试次数控制呢,其关键在 StoreQueue 和 PrepareQueue 的分数设计。

PrepareQueue 的分数需要与时间相关,正常情况下,消费者不管消费失败还是消 费成功,都会从 PrepareQueue 删除消息,当消费者系统发生异常或者宕机的时候, 消息就无法从 PrepareQueue 中删除,我们也不知道消费者是否消费成功,为保障 消息至少被消费一次,我们需要做到超时回滚,因此分数需要与消费时间相关。

当 PrepareQueue 中的消息发生超时的时候,将消息从 PrepareQueue 移动到 StoreQueue。因此PrepareQueue 的分数设计为:秒级时间戳*1000 重试次数。不 同类型的消息首次存储到 StoreQueue 中的分数表示的含义不尽相同,区间重复合 并消息和任意定时消息存储时的分数表示消费时间戳,优先级消息存储时的分数表 示优先级。

如果消息消费失败,消息从 PrepareQueue 回滚到 StoreQueue,所有类型的消息 存储时的分数都表示剩余重试次数,剩余重试次数从 16 次不断降低最后为 0,消息 进入死信队列。消息在 StoreQueue 和 PrepareQueue 之间移动流程如下:

0 人点赞