在RedisV5.0之前, 如果想实现队列功能, 只能用list或者pub/sub实现, 但它们都有自己的缺点.
使用list方式, 缺少ack确认, 不能做广播, 不能分组消费;
使用pub/sub方式, 消息发布后, 客户端不能立刻接收就会丢失消息;
在RedisV5.0的时候, 提供了Stream类型实现队列功能. 其中包括: 生成消息ID, 消息确认, 分组消费等功能.
Stream有一个消息链表, 将所有加入的消息都串联起来, 每个消息都有一个唯一的ID和对应的内容. 消息是持久化的, Redis重启时, 消息不会丢失.
1
XADD
向指定队列中添加信息.
代码语言:javascript复制xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
key 队列名称, 如果不存在就创建新队列;
[MAXLEN|MINID [=|~] threshold [LIMIT count]] 队列的长度,最小ID等信息;
*|ID 消息ID,可以自定义或Redis 自动生成; 自定义生成时, 需要保证单调递增; 使用符号"*"表示由Redis生成; 按时间戳-序号规则生成, 其中时间戳是毫秒级的Redis服务器时间;
field value 消息内容, 1个或多个KV键值对;
代码语言:javascript复制127.0.0.1:6379> xadd mystream 1 k1 v1 k2 v2
"1-0"
127.0.0.1:6379> xadd mystream * k3 v3
"1622699291143-0"
# 批量添加
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> xadd mystream * k4 v4
QUEUED
127.0.0.1:6379(TX)> xadd mystream * k5 v5
QUEUED
127.0.0.1:6379(TX)> exec
1) "1622700074168-0"
2) "1622700074168-1"
2
XRANGE 与 XREVRANGE
xrange按ID范围读取记录, 从数据 ID的小老到大的顺序读取; xrevrange则是反向读取, 按 ID从大到小的顺序读取.
代码语言:javascript复制xrange key start end [COUNT count]
127.0.0.1:6379> xrange mystream - count 4
1) 1) "1-0"
2) 1) "k1"
2) "v1"
3) "k2"
4) "v2"
2) 1) "1622699291143-0"
2) 1) "k3"
2) "v3"
3) 1) "1622700074168-0"
2) 1) "k4"
2) "v4"
4) 1) "1622700074168-1"
2) 1) "k5"
2) "v5"
127.0.0.1:6379>
127.0.0.1:6379> XREVRANGE mystream - count 4
1) 1) "1622700074168-1"
2) 1) "k5"
2) "v5"
2) 1) "1622700074168-0"
2) 1) "k4"
2) "v4"
3) 1) "1622699291143-0"
2) 1) "k3"
2) "v3"
4) 1) "1-0"
2) 1) "k1"
2) "v1"
3) "k2"
4) "v2"
127.0.0.1:6379>
3
XREAD
可以按阻塞和非阻塞两种方式读取消息信息.
代码语言:javascript复制XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
[COUNT count] 用于限定获取的消息数量
[BLOCK milliseconds] 阻塞模式下的超时时间, 默认为非阻塞模式;
ID 指定信息ID开始读取; 0表示从第一条消息开始; 阻塞模式下, 表示最新的消息ID; 在非阻塞模式下无意义.
代码语言:javascript复制127.0.0.1:6379> xread count 1 block 100 streams mystream 0
1) 1) "mystream"
2) 1) 1) "1-0"
2) 1) "k1"
2) "v1"
3) "k2"
4) "v2"
127.0.0.1:6379> xread count 1 block 100 streams mystream $
(nil)
4
XLEN
队列数据个数
代码语言:javascript复制127.0.0.1:6379> xlen mystream
(integer) 4
5
XTRIM
限制队列长度;
~ 表示可以不十分精确的限制队列N条, 可以多出一些, 但是不能少. 可以减轻 Redis服务准确计算队列长度的压力;
代码语言:javascript复制127.0.0.1:6379> XTRIM mystream MAXLEN ~ 1000
6
XDEL
删除指定ID消息
代码语言:javascript复制127.0.0.1:6379> xadd mystream * k6 v6
"1622702539939-0"
127.0.0.1:6379> xrange mystream 1622700074168-1
1) 1) "1622700074168-1"
2) 1) "k5"
2) "v5"
2) 1) "1622702539939-0"
2) 1) "k6"
2) "v6"
127.0.0.1:6379> xdel mystream 1622702539939-0
(integer) 1
127.0.0.1:6379> xrange mystream 1622700074168-1
1) 1) "1622700074168-1"
2) 1) "k5"
2) "v5"
127.0.0.1:6379>
7
XGROUP
队列消费分组的创建和删除, 以及消费者的创建和销毁.
Stream可以挂多个消费组, 各消费组之间互相独立,不受影响, 每个消费组都会有个游标last_delivered_id标识当前消费组的消费位置.
同一消费组中, 可以有多个消费者, 协同消费消息, 每读取一条消息, last_delivered_id都会下移一位, 同时会记录在PEL(pending_ids)中, 直到消息被ACK确认后, 才会从PEL中移除.
代码语言:javascript复制XGROUP [CREATE key groupname ID|$ [MKSTREAM]] [SETID key groupname ID|$] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupnanme]
# 创建消费分组
代码语言:javascript复制127.0.0.1:6379> XGROUP create mystream mygroup 0
OK
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "0-0"
# 设置开始处理消息ID
代码语言:javascript复制127.0.0.1:6379> XGROUP setid mystream mygroup 1622699291143-0
OK
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1622699291143-0"
127.0.0.1:6379>
删除消费分组
代码语言:javascript复制127.0.0.1:6379> xgroup destroy mystream mygroup
(integer) 1
127.0.0.1:6379> xinfo groups mystream
(empty array)
127.0.0.1:6379>
创建指定分组中消费者
代码语言:javascript复制127.0.0.1:6379> XGROUP CREATECONSUMER mystream mygroup myconsumer
(integer) 1
127.0.0.1:6379> xinfo consumers mystream mygroup
1) 1) "name"
2) "myconsumer"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 41127
2) 1) "name"
2) "myconsumer1"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 1974
删除指定分组中消费者
代码语言:javascript复制127.0.0.1:6379> XGROUP delconsumer mystream mygroup myconsumer1
(integer) 0
127.0.0.1:6379> xinfo consumers mystream mygroup
1) 1) "name"
2) "myconsumer"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 110278
127.0.0.1:6379>
8
XREADGROUP
指定消费组内的消费者读取消息
代码语言:javascript复制XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
[COUNT count] 每次获取消息的数量;
[BLOCK milliseconds] 阻塞模式和超时时间;
[NOACK] 不需要确认消息, 适用于不怎么重要的可以丢失的消息;
key [key ...] 指定Stream key;
ID [ID ...] 指定的消息 ID; > 指定读取所有未消费的消息, 其他值指定被挂起的消息;
查看当前Stream 消费组信息, 确认最后消费位置
代码语言:javascript复制127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1622699291143-0"
查看Stream 数据
代码语言:javascript复制127.0.0.1:6379> xrange mystream -
1) 1) "1-0"
2) 1) "k1"
2) "v1"
3) "k2"
4) "v2"
2) 1) "1622699291143-0"
2) 1) "k3"
2) "v3"
3) 1) "1622700074168-0"
2) 1) "k4"
2) "v4"
4) 1) "1622700074168-1"
2) 1) "k5"
2) "v5"
消费Stream队列中下一条消息
代码语言:javascript复制127.0.0.1:6379> XREADGROUP group mygroup myconsumer count 1 streams mystream >
1) 1) "mystream"
2) 1) 1) "1622700074168-0"
2) 1) "k4"
2) "v4"
127.0.0.1:6379>
9
XPENDING
查看未进行确认的数据
代码语言:javascript复制127.0.0.1:6379> xpending mystream mygroup - 10
1) 1) "1622700074168-0"
2) "myconsumer"
3) (integer) 25605849
4) (integer) 1
127.0.0.1:6379>
10
XACK
确认当前消费分组处理完毕该消息
代码语言:javascript复制127.0.0.1:6379> xack mystream mygroup 1622700074168-0
(integer) 1
127.0.0.1:6379> xpending mystream mygroup - 10
(empty array)
127.0.0.1:6379>
11
XCLAIM
可能因为服务异常等原因, 消息长时间未能得到消费或者确认, 可以通过 xclaim 命令读取指定 ID 或者空闲时间的数据, 重新消费数据, 并再次确认(xack)消息;
代码语言:javascript复制XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]
ID [ID ...] 消息的 ID;
[IDLE ms] 设置消息的空闲时间, 如果不提供, 默认为 0;
[TIME ms-unix-time] 和IDLE相同, unix 时间戳;
RETRYCOUNT 设置重试次数, 通常 xclaim 不会改变这个值, 它通常用于 xpending 命令, 用来发现一些长时间未被处理的消息;
FORCE 在 PEL 中创建待处理消息, 即使指定的 ID 尚未分配给客户端的PEL;
JUSTID 只返回认领的消息 ID 数组, 不返回实际消息;
代码语言:javascript复制127.0.0.1:6379> XREADGROUP group mygroup myconsumer count 1 streams mystream >
1) 1) "mystream"
2) 1) 1) "1622700074168-1"
2) 1) "k5"
2) "v5"
127.0.0.1:6379> xpending mystream mygroup - 10
1) 1) "1622700074168-1"
2) "myconsumer"
3) (integer) 3256
4) (integer) 1
127.0.0.1:6379> XCLAIM mystream mygroup myconsumer 10 1622700074168-1
1) 1) "1622700074168-1"
2) 1) "k5"
2) "v5"
12
XINFO
Stream也提供了查看队列元数据的命令
代码语言:javascript复制127.0.0.1:6379> xinfo help
1) XINFO <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
2) CONSUMERS <key> <groupname>
3) Show consumers of <groupname>.
4) GROUPS <key>
5) Show the stream consumer groups.
6) STREAM <key> [FULL [COUNT <count>]
7) Show information about the stream.
8) HELP
9) Prints this help.
查看队列基本信息, 包括消费分组, 以及对应的最后消费数据 ID 信息
代码语言:javascript复制127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
2) "mygroup1"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "0-0"
查看指定消费分组的消费者
代码语言:javascript复制127.0.0.1:6379> xinfo consumers mystream mygroup1
1) 1) "name"
2) "myconsumer1"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 19009
查看队列元信息
代码语言:javascript复制127.0.0.1:6379> xinfo stream mystream
1) "length"
2) (integer) 1
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1622603679677-0"
9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1622603679677-0"
2) 1) "k1"
2) "v1"
13) "last-entry"
14) 1) "1622603679677-0"
2) 1) "k1"
2) "v1"
总结
以上, 就是 RedisV5.0版本的Stream的基本功能了.