基于Redis实现特殊的消息队列

2022-11-29 13:43:02 浏览数 (2)

特殊场景的消息队列

消息队列使用比较多的产品kafka,在各个领域都发挥了很大的作用,但是在以下的几种场景是无法满足需求。

场景
  1. 消息重复概率比较高时,需要对重复消息进行合并处理避免浪费有限的资源,减少延迟
  2. 需要根据业务自定义的优先级进行消息处理,高优先级的消息比低优先级的消息先处理
  3. 消息需要定时消费场景,消息只有在设定的消费时间到了之后立马被消费

RMQ(Redis message queue,RMQ)

功能:

RMQ设计为一个第三方库,可以帮助用户基于Redis快速实现消息队列的功能,RMQ消息队列具有消息合并、区分优先级、支持定时消息等特性。RMQ消息队列可以用于异步解耦、削峰填谷,支持千万级别的数据堆积。

RMQ消息队列目前支持三种类型的消息,分别是:RangeMergeMessage(区间重复合并消息)、PriorityMessage(优先级消息)、FixedTimeMessage(任意定时消息)

区间重复合并消息

RangeMergeMessage支持区间重复消息合并,发送消息时需要设置时间区间,消息延迟该时间长度后被消费,在该时间区间内如果发送重复的消息,重复消息将会被合并。如果消息在Redis服务端发生堆积,重复到来的消息依然会被合并处理。该类型消息适用于消息重复率较高并且希望消息合并并处理的场景,对重复消息进行合并可以减少下游消费系统的压力,减少不必要的资源消耗,将有限的资源最大化的利用,提升消费效率。

优先级消息

PriorityMessage支持给消息设置任意等级的优先级,优先级高的消息会被优先消费,相同的优先级的消息被随机消费。如果消息在Redis服务端发生堆积,重复的消息将被合并处理,合并后的消息优先级等于最后存储的消息的优先级。该类型消息适用于希望重复消息合并处理并且需要设置优先级的场景,下游消费者资源有限时候,合并重复消息并且优先级高的消息将可以合理利用有限的资源。

任意定时消息

FixedTimeMessage支持给消息设置任意消费时间,只有消费时间到了之后消息才被消费,消费时间可以精确到秒。消息到期后没有及时被消费时,消费者将按照时间由远到近进行消费。如果消息在Redis服务端发送堆积,重复的消息将被合并处理,合并后消息的消费时间等于最后存储的消息的消费时间,该类型消息适用于希望重复消息合并处理且需要定时消费的场景,定时消息应用场景非常丰富,比如定时打标去标、活动结束后清理动作、定时超时关闭等。

并发消息控制

使用传统消息中间件进行集群消费的时候,为了避免并发处理同一元数据导致不一致的问题,通常需要对元数据加分布式锁,频繁的锁冲突会导致消费效率低下,加分布式锁的最终目的其实就是保障属于同一元数据的消息被串行消费。加分布式锁并不是最好的方案,最好的方案是从根本上解决并发问题,让属于同一元数据的消息串行消费。RMQ 消息队列具有并发消费控制能力,属于同一元数据的消息只会被分配给全局唯 一一个线程进行消费,因此属于同一元数据的消息将被串行消费。使用方如果需要 该能力,除了需要提供 Redis,还需要提供 ZooKeeper。

重试次数控制

RMQ 消息队列支持失败重试消费 16 次,业务返回消费失败后,消息会被回滚并等待重试消费,重试 16 次后消息进入死信队列,消息不再被消费,除非人工干预。

RMQ实现的原理

RMQ消息队列由三部分组成,分别是ZooKeeper、RMQ二方库、Redis

Zookeeper

Zookeeper负责维护集群worker的信息,将topic的所有slot分配给全局的woker;

Redis

Redis负责存储消息,采集Sorted Set结构存储,Store Queue是消息队列,Prepare Queue是采用二阶段消费的方式正在消费消息队列中的信息,Dead Queue是死的队列信息

RMQ
  1. RmqClient负责RMQ的启动工作,包括上报TopicDef、Worker给Zookeeper,分配给Slot的Worker扫描业务定义的MessageListerBean
  2. Producer负责根据不用消息类型将消息按照指定的方式存储到Redis中
  3. Consumer负责根据不用消息类型按照指定方式从Redis弹出的消息并调用业务的MessageListener。
消息的存储
Topic的设计原理

Topic的定义有三个部分组成,topic表示主题名称、slotAmount表示消息存储划分的槽的数量,topicType表示消息的类型

主题名称时一个Topic的唯一标示,相同主题名称的Topic的SlotAmount和topicType一定是一样的消息存储采用Redis的Sorted Set结构表示,为了支持大量的消息堆积的情况,需要把消息分散存储到很多个槽中,SlotAmount表示该Topic消息存储共同使用的槽数量,槽数量一定需要是2的n次方的幂。在消息存储的时候,采用制定数据或者消息体的哈希求余数得到槽的位置

SoreQueue的设计原理

上图中的topic划分了8个槽为,编号是0--7,如果发送指定了消息的slotBasis,则计算slotBasis的CRC32的值,CRC32值对槽数量进行取模得到对应的槽序号,SlotKey设计为#{topic}_#{index}也就是Redis的键,其中#{}表示占位符的处理。

发送方需要保证相同内容的消息的SlotBasis相同,如果没有制定SlotBasis则采用内容计算SlotKey,这样内容相同的消息就会落在同一个Sorted Set里面,所以内容相同的消息会进行合并的处理。

Redis的Sorted Set中的数据按照分数排序,实现不同类型的消息的关键就在于如何利用分数,如何增加消息到Sorted Set、如何从Sorted Set中获取数据消息。

  1. 优先级消息将优先级作为分数,消费时每次弹出分数最大的消息;
  2. 任意定时消息将时间戳作为分数,消费时每次弹出分数大于当前时间戳的一个 消息;
  3. 区间重复合并消息将时间戳作为分数,添加消息时将(当前时间戳 时间区间)作为分数,消费时每次弹出分数大于当前时间戳的一个消息

0 人点赞