概述
Redis 5 新特性中,Streams 数据结构的引入,可以说它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 作为消息队列使用时,得到更完善,更强大的原生支持,其中尤为明显的是持久化消息队列。
同时,stream 借鉴了 kafka 的消费组模型概念和设计,使消费消息处理上更加高效快速。本文就 Streams 数据结构中常用 API 进行分析。
Stream消息队列
- 消息 ID 的序列化生成
- 消息遍历
- 消息的阻塞和非阻塞读取
- 消息的分组消费
- 未完成消息的处理
- 消息队列监控
添加消息(生产消息)
Streams 添加数据使用 XADD 指令进行添加,消息中的数据以 K-V 键值对的形式进行操作。一条消息可以存在多个键值对,添加命令格式:
代码语言:javascript复制XADD key ID field string [field string ...]
其中 key 为 Streams 的名称,ID 为消息的唯一标志,不可重复,field string 就为键值对。下面我们就添加以 memberMessage 为名称的流,进行操作。
代码语言:javascript复制1127.0.0.1:6379> XADD memberMessage * user kang msg Hello
2"1553439850328-0"
3127.0.0.1:6379> XADD memberMessage * user zhong msg nihao
4"1553439858868-0"
上面添加案例中,ID 使用 * 号复制,这里代表着服务端自动生成 Id,添加后返回数据 "1553439858868-0"
这里自动生成的 Id 格式为-Id 是由两部分组成:
- millisecondsTime 为当前服务器时间毫秒时间戳。
- sequenceNumber 当前序列号,取值来源于当前毫秒内,生成消息的顺序,默认从 0 开始加 1 递增。
比如:1578238486193-3 表示在 1578238486193 毫秒的时间戳时,添加的第 4 条消息。
除了服务端自动生成 Id 方式外,也支持指定 Id 的生成,但是指定 Id 有以下条件限制:
Id 中的前后部分必须为数字。最小 Id 为 0-1,不能为 0-0,但是 2-0,3-0 .... 是被允许的。添加的消息,Id 的前半部分不能比存在 Id 最大的值小,Id 后半部分不能比存在前半部分相同的最大后半部分小。
否则,当不满足上述条件时,添加后会抛出异常:
代码语言:javascript复制(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
实际上,当添加一条消息时,会进行两部操作。第一步,先判断如果不存在 Streams,则创建 Streams 的名称,再添加消息到 Streams 中。即使添加消息时,由于 Id 异常,也可以在 Redis 中存在以当前 Streams 的名称。Streams 中 Id 也可作为指针使用,因为它是一个有序的标记。
生产中,如果这样使用添加消息,会存在一个问题,那就是消息数量太大时,会使服务宕机。这里 Streams 的设计初期也有考虑到这个问题,那就是可以指定 Streams 的容量。如果容量操作这个设定的值,就会对调旧的消息。在添加消息时,设置 MAXLEN 参数。
消息 ID 说明
XADD 生成的1553439850328-0
,就是 Redis 生成的消息 ID,由两部分组成: 时间戳 - 序号。时间戳是毫秒级单位,是生成消息的 Redis 服务器时间,它是个 64 位整型(int64)。序号是在这个毫秒时间点内的消息序号,它也是个 64 位整型。
可以通过 multi 批处理,来验证序号的递增:
代码语言:javascript复制 1127.0.0.1:6379> MULTI
2OK
3127.0.0.1:6379> XADD memberMessage * msg one
4QUEUED
5127.0.0.1:6379> XADD memberMessage * msg two
6QUEUED
7127.0.0.1:6379> XADD memberMessage * msg three
8QUEUED
9127.0.0.1:6379> XADD memberMessage * msg four
10QUEUED
11127.0.0.1:6379> XADD memberMessage * msg five
12QUEUED
13127.0.0.1:6379> EXEC
141) "1553441006884-0"
152) "1553441006884-1"
163) "1553441006884-2"
174) "1553441006884-3"
185) "1553441006884-4"
由于一个 redis 命令的执行很快,所以可以看到在同一时间戳内,是通过序号递增来表示消息的。
为了保证消息是有序的,因此 Redis 生成的 ID 是_单调递增_有序的。由于 ID 中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis 的每个 Stream 类型数据都维护一个 latest_generated_id 属性,用于记录最后一个消息的 ID。若发现当前时间戳退后(小于 latest_generated_id 所记录的),则采用时间戳不变而序号递增的方案来作为新消息 ID(这也是序号为什么使用 int64 的原因,保证有足够多的的序号),从而保证 ID 的单调递增性质。
读取消息(消费消息)
在 Redis 的 PUB/SUB 中,我们是通过订阅来消费消息,在 Streams 数据结构中,同样也能实现同等功能,当没有新的消息时,可进行阻塞等待。不仅支持单独消费,而且还可以支持群组消费。
单独消费
单独消费使用 XREAD 指令。可以看到,下面命令中,STREAMS,key, 以及 ID 为必填项。ID 表示将要读取大于该 ID 的消息。当 ID 值使用 $ 赋予时,表示已存在消息的最大 Id 值。
代码语言:javascript复制XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
上面的 COUNT 参数用来指定读取的最大数量,与 XRANGE 的用法一样。
代码语言:javascript复制 1127.0.0.1:6379> XREAD streams memberMessage 0
21) 1) "memberMessage"
3 2) 1) 1) "1553439850328-0"
4 2) 1) "user"
5 2) "kang"
6 3) "msg"
7 4) "Hello"
8 2) 1) "1553439858868-0"
9 2) 1) "user"
10 2) "zhong"
11 3) "msg"
12 4) "nihao"
XREAD 支持很多参数,语法格式为:
- [COUNT count],用于限定获取的消息数量
- [BLOCK milliseconds],用于设置 XREAD 为阻塞模式,默认为非阻塞模式
- ID,用于设置由哪个消息 ID 开始读取。使用 0 表示从第一条消息开始。(本例中就是使用 0)此处需要注意,消息队列 ID 是单调递增的,所以通过设置起点,可以向后读取。
XRED 读消息时分为阻塞和非阻塞模式,使用 BLOCK 选项可以表示阻塞模式,需要设置阻塞时长。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。
BLOCK 携带的参数为阻塞时间,单位为毫秒,如果在这个时间内没有新的消息消费,那么就会释放该阻塞。当这里的时间指定为 0 时,会一直阻塞,直到有新的消息来消费到。
阻塞模式用法:
代码语言:javascript复制1127.0.0.1:6379> XREAD block 1000 streams memberMessage $
2(nil)
3(1.07s)
使用 Block 模式,配合 $ 作为 ID,表示读取最新的消息,若没有消息,命令阻塞!等待过程中,其他客户端向队列追加消息,则会立即读取到。
因此,典型的队列就是 XADD 配合 XREAD Block 完成。XADD 负责生成消息,XREAD 负责消费消息。
群组消费
群组消费的主要目的也就是为了分流消息给不同的客户端处理,以更高效的速率处理消息。为达到这一肝功能需求,我们需要做三件事:「创建群组,群组读取消息,向服务端确认消息以处理。」
群组操作
操作群组使用 XGROUP 指令:
代码语言:javascript复制XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
上面命令中,包含操作有:
- CREATE 创建消费组。
- SETID 修改下一个处理消息的 Id。
- DESTROY 销毁消费组。
- DELCONSUMER 删除消费组中指定的消费者。
群组读取消息
群组读取使用 XREADGROUP 指令,COUNT和BLOCK的使用类似 XREAD 的操作,只是多了个群组和消费者的指定:
代码语言:javascript复制XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
由于群组消费和单独消费类似,这里只进行个阻塞分析,这里 Id 也有个特殊值>,表示还未进行消费的消息。
Pending 等待列表
为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM
设计了 Pending
列表,用于记录读取但并未处理完毕的消息。命令XPENDIING
用来获消费组或消费内消费者的未处理完毕的消息。演示如下:
1127.0.0.1:6379> XPENDING mq mqGroup # mpGroup的Pending情况
21) (integer) 5 # 5个已读取但未处理的消息
32) "1553585533795-0" # 起始ID
43) "1553585533795-4" # 结束ID
54) 1) 1) "consumerA" # 消费者A有3个
6 2) "3"
7 2) 1) "consumerB" # 消费者B有1个
8 2) "1"
9 3) 1) "consumerC" # 消费者C有1个
10 2) "1"
11
12127.0.0.1:6379> XPENDING mq mqGroup - 10 # 使用 start end count 选项可以获取详细信息
131) 1) "1553585533795-0" # 消息ID
14 2) "consumerA" # 消费者
15 3) (integer) 1654355 # 从读取到现在经历了1654355ms,IDLE
16 4) (integer) 5 # 消息被读取了5次,delivery counter
172) 1) "1553585533795-1"
18 2) "consumerA"
19 3) (integer) 1654355
20 4) (integer) 4
21# 共5个,余下3个省略 ...
22
23127.0.0.1:6379> XPENDING mq mqGroup - 10 consumerA # 在加上消费者参数,获取具体某个消费者的Pending列表
241) 1) "1553585533795-0"
25 2) "consumerA"
26 3) (integer) 1641083
27 4) (integer) 5
28# 共3个,余下2个省略 ...
每个 Pending 的消息有 4 个属性:
- 消息 ID
- 所属消费者
- IDLE,已读取时长
- delivery counter,消息被读取次数
上面的结果我们可以看到,我们之前读取的消息,都被记录在 Pending 列表中,说明全部读到的消息都没有处理,仅仅是读取了。那如何表示消费者处理完毕了消息呢?使用命令 XACK
完成告知消息处理完成
消息ACK
消息消费后,为避免再次重复消费,这是需要向服务端发送 ACK,确保消息被消费后的标记。例如下列情况,我们上面我们将最新两条消息已进行了消费,但是当我们再次读取消息时,还是被读到:
代码语言:javascript复制 1127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 # 通知消息处理结束,用消息ID标识
2(integer) 1
3
4127.0.0.1:6379> XPENDING mq mqGroup # 再次查看Pending列表
51) (integer) 4 # 已读取但未处理的消息已经变为4个
62) "1553585533795-1"
73) "1553585533795-4"
84) 1) 1) "consumerA" # 消费者A,还有2个消息处理
9 2) "2"
10 2) 1) "consumerB"
11 2) "1"
12 3) 1) "consumerC"
13 2) "1"
14127.0.0.1:6379>
有了这样一个 Pending 机制,就意味着在某个消费者读取消息但未处理后,消息是不会丢失的。等待消费者再次上线后,可以读取该 Pending 列表,就可以继续处理该消息了,保证消息的有序和不丢失。
此时还有一个问题,就是若某个消费者宕机之后,没有办法再上线了,那么就需要将该消费者 Pending 的消息,转义给其他的消费者处理,就是消息转移。请继续。
消息转移
消息转移的操作时将某个消息转移到自己的 Pending 列表中。使用语法XCLAIM
来实现,需要设置组、转移的目标消费者和消息 ID,同时需要提供 IDLE(已被读取时长),只有超过这个时长,才能被转移。演示如下:
1# 当前属于消费者A的消息1553585533795-1,已经15907,787ms未处理了
2127.0.0.1:6379> XPENDING mq mqGroup - 10
31) 1) "1553585533795-1"
4 2) "consumerA"
5 3) (integer) 15907787
6 4) (integer) 4
7
8# 转移超过3600s的消息1553585533795-1到消费者B的Pending列表
9127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
101) 1) "1553585533795-1"
11 2) 1) "msg"
12 2) "2"
13
14# 消息1553585533795-1已经转移到消费者B的Pending中。
15127.0.0.1:6379> XPENDING mq mqGroup - 10
161) 1) "1553585533795-1"
17 2) "consumerB"
18 3) (integer) 84404 # 注意IDLE,被重置了
19 4) (integer) 5 # 注意,读取次数也累加了1次
以上代码,完成了一次消息转移。转移除了要指定 ID 外,还需要指定 IDLE,保证是长时间未处理的才被转移。被转移的消息的 IDLE 会被重置,用以保证不会被重复转移,以为可能会出现将过期的消息同时转移给多个消费者的并发操作,设置了 IDLE,则可以避免后面的转移不会成功,因为 IDLE 不满足条件。例如下面的连续两条转移,第二条不会成功。
代码语言:javascript复制1127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
2127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1
这就是消息转移。至此我们使用了一个 Pending 消息的 ID,所属消费者和 IDLE 的属性,还有一个属性就是消息被读取次数,delivery counter,该属性的作用由于统计消息被读取的次数,包括被转移也算。
消费者组模式详解
当多个消费者(consumer)同时消费一个消息队列时,可以重复的消费相同的消息,就是消息队列中有 10 条消息,三个消费者都可以消费到这 10 条消息。
但有时,我们需要多个消费者配合协作来消费同一个消息队列,就是消息队列中有 10 条消息,三个消费者分别消费其中的某些消息,比如消费者 A 消费消息 1、2、5、8,消费者 B 消费消息 4、9、10,而消费者 C 消费消息 3、6、7。也就是三个消费者配合完成消息的消费,可以在消费能力不足,也就是消息处理程序效率不高时,使用该模式。该模式就是消费者组模式。如下图所示:
消费者组模式的支持主要由两个命令实现:
- XGROUP,用于管理消费者组,提供创建组,销毁组,更新组起始消息 ID 等操作
- XREADGROUP,分组消费消息操作
进行演示,演示时使用 5 个消息,思路是:创建一个 Stream 消息队列,生产者生成 5 条消息。在消息队列上创建一个消费组,组内三个消费者进行消息消费:
代码语言:javascript复制 1# 生产者生成10条消息
2127.0.0.1:6379> MULTI
3127.0.0.1:6379> XADD mq * msg 1 # 生成一个消息:msg 1
4127.0.0.1:6379> XADD mq * msg 2
5127.0.0.1:6379> XADD mq * msg 3
6127.0.0.1:6379> XADD mq * msg 4
7127.0.0.1:6379> XADD mq * msg 5
8127.0.0.1:6379> EXEC
9 1) "1553585533795-0"
10 2) "1553585533795-1"
11 3) "1553585533795-2"
12 4) "1553585533795-3"
13 5) "1553585533795-4"
14
15# 创建消费组 mqGroup
16127.0.0.1:6379> XGROUP CREATE mq mqGroup 0 # 为消息队列 mq 创建消费组 mgGroup
17OK
18
19# 消费者A,消费第1条
20127.0.0.1:6379> XREADGROUP group mqGroup consumerA count 1 streams mq > #消费组内消费者A,从消息队列mq中读取一个消息
211) 1) "mq"
22 2) 1) 1) "1553585533795-0"
23 2) 1) "msg"
24 2) "1"
25# 消费者A,消费第2条
26127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >
271) 1) "mq"
28 2) 1) 1) "1553585533795-1"
29 2) 1) "msg"
30 2) "2"
31# 消费者B,消费第3条
32127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerB COUNT 1 STREAMS mq >
331) 1) "mq"
34 2) 1) 1) "1553585533795-2"
35 2) 1) "msg"
36 2) "3"
37# 消费者A,消费第4条
38127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA count 1 STREAMS mq >
391) 1) "mq"
40 2) 1) 1) "1553585533795-3"
41 2) 1) "msg"
42 2) "4"
43# 消费者C,消费第5条
44127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerC COUNT 1 STREAMS mq >
451) 1) "mq"
46 2) 1) 1) "1553585533795-4"
47 2) 1) "msg"
48 2) "5"
上面的例子中,三个在同一组 mpGroup 消费者 A、B、C 在消费消息时(消费者在消费时指定即可,不用预先创建),有着互斥原则,消费方案为,A->1, A->2, B->3, A->4, C->5。语法说明为:
XGROUP CREATE mq mqGroup 0
,用于在消息队列 mq 上创建消费组 mpGroup,最后一个参数 0,表示该组从第一条消息开始消费。(意义与 XREAD 的 0 一致)。除了支持CREATE
外,还支持SETID
设置起始 ID,DESTROY
销毁组,DELCONSUMER
删除组内消费者等操作。
XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >
,用于组mqGroup
内消费者consumerA
在队列mq
中消费,参数>
表示未被组内消费的起始消息,参数count 1
表示获取一条。语法与XREAD
基本一致,不过是增加了组的概念。
可以进行组内消费的基本原理是,STREAM 类型会为每个组记录一个最后处理(交付)的消息 ID(last_delivered_id),这样在组内消费时,就可以从这个值后面开始读取,保证不重复消费。
以上就是消费组的基础操作。除此之外,消费组消费时,还有一个必须要考虑的问题,就是若某个消费者,消费了某条消息,但是并没有处理成功时(例如消费者进程宕机),这条消息可能会丢失,因为组内其他消费者不能再次消费到该消息了。下面继续讨论解决方案。