Redis消息队列 | Stream

2022-06-27 15:18:12 浏览数 (1)

在RedisV5.0之前, 如果想实现队列功能, 只能用list或者pub/sub实现, 但它们都有自己的缺点.

使用list方式, 缺少ack确认, 不能做广播, 不能分组消费;

使用pub/sub方式, 消息发布后, 客户端不能立刻接收就会丢失消息;

在RedisV5.0的时候, 提供了Stream类型实现队列功能. 其中包括: 生成消息ID, 消息确认, 分组消费等功能.

Stream有一个消息链表, 将所有加入的消息都串联起来, 每个消息都有一个唯一的ID和对应的内容. 消息是持久化的, Redis重启时, 消息不会丢失.

1

XADD

向指定队列中添加信息.

代码语言:javascript复制
xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

key 队列名称, 如果不存在就创建新队列;

[MAXLEN|MINID [=|~] threshold [LIMIT count]] 队列的长度,最小ID等信息;

*|ID 消息ID,可以自定义或Redis 自动生成; 自定义生成时, 需要保证单调递增; 使用符号"*"表示由Redis生成; 按时间戳-序号规则生成, 其中时间戳是毫秒级的Redis服务器时间;

field value 消息内容, 1个或多个KV键值对;

代码语言:javascript复制
127.0.0.1:6379> xadd mystream 1 k1 v1 k2 v2
"1-0"
127.0.0.1:6379> xadd mystream * k3 v3
"1622699291143-0"
# 批量添加
127.0.0.1:6379> multi 
OK
127.0.0.1:6379(TX)> xadd mystream * k4 v4
QUEUED
127.0.0.1:6379(TX)> xadd mystream * k5 v5
QUEUED
127.0.0.1:6379(TX)> exec
1) "1622700074168-0"
2) "1622700074168-1"

2

XRANGE 与 XREVRANGE

xrange按ID范围读取记录, 从数据 ID的小老到大的顺序读取; xrevrange则是反向读取, 按 ID从大到小的顺序读取.

代码语言:javascript复制
xrange key start end [COUNT count]
127.0.0.1:6379> xrange mystream -   count 4
1) 1) "1-0"
   2) 1) "k1"
      2) "v1"
      3) "k2"
      4) "v2"
2) 1) "1622699291143-0"
   2) 1) "k3"
      2) "v3"
3) 1) "1622700074168-0"
   2) 1) "k4"
      2) "v4"
4) 1) "1622700074168-1"
   2) 1) "k5"
      2) "v5"
127.0.0.1:6379> 
127.0.0.1:6379> XREVRANGE mystream   - count 4
1) 1) "1622700074168-1"
   2) 1) "k5"
      2) "v5"
2) 1) "1622700074168-0"
   2) 1) "k4"
      2) "v4"
3) 1) "1622699291143-0"
   2) 1) "k3"
      2) "v3"
4) 1) "1-0"
   2) 1) "k1"
      2) "v1"
      3) "k2"
      4) "v2"
127.0.0.1:6379> 

3

XREAD

可以按阻塞和非阻塞两种方式读取消息信息.

代码语言:javascript复制
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

[COUNT count] 用于限定获取的消息数量

[BLOCK milliseconds] 阻塞模式下的超时时间, 默认为非阻塞模式;

ID 指定信息ID开始读取; 0表示从第一条消息开始; 阻塞模式下, 表示最新的消息ID; 在非阻塞模式下无意义.

代码语言:javascript复制
127.0.0.1:6379> xread count 1 block 100 streams mystream 0
1) 1) "mystream"
   2) 1) 1) "1-0"
         2) 1) "k1"
            2) "v1"
            3) "k2"
            4) "v2"
127.0.0.1:6379> xread count 1 block 100 streams mystream $
(nil)

4

XLEN

队列数据个数

代码语言:javascript复制
127.0.0.1:6379> xlen mystream
(integer) 4

5

XTRIM

限制队列长度;

~ 表示可以不十分精确的限制队列N条, 可以多出一些, 但是不能少. 可以减轻 Redis服务准确计算队列长度的压力;

代码语言:javascript复制
127.0.0.1:6379> XTRIM mystream MAXLEN ~ 1000

6

XDEL

删除指定ID消息

代码语言:javascript复制
127.0.0.1:6379> xadd mystream * k6 v6
"1622702539939-0"
127.0.0.1:6379> xrange mystream 1622700074168-1  
1) 1) "1622700074168-1"
   2) 1) "k5"
      2) "v5"
2) 1) "1622702539939-0"
   2) 1) "k6"
      2) "v6"
127.0.0.1:6379> xdel mystream 1622702539939-0
(integer) 1
127.0.0.1:6379> xrange mystream 1622700074168-1  
1) 1) "1622700074168-1"
   2) 1) "k5"
      2) "v5"
127.0.0.1:6379>

7

XGROUP

队列消费分组的创建和删除, 以及消费者的创建和销毁.

Stream可以挂多个消费组, 各消费组之间互相独立,不受影响, 每个消费组都会有个游标last_delivered_id标识当前消费组的消费位置.

同一消费组中, 可以有多个消费者, 协同消费消息, 每读取一条消息, last_delivered_id都会下移一位, 同时会记录在PEL(pending_ids)中, 直到消息被ACK确认后, 才会从PEL中移除.

代码语言:javascript复制
XGROUP [CREATE key groupname ID|$ [MKSTREAM]] [SETID key groupname ID|$] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupnanme]

# 创建消费分组

代码语言:javascript复制
127.0.0.1:6379> XGROUP create mystream mygroup 0
OK
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
   2) "mygroup"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "0-0"

# 设置开始处理消息ID

代码语言:javascript复制
127.0.0.1:6379> XGROUP setid mystream mygroup 1622699291143-0
OK
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
   2) "mygroup"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1622699291143-0"
127.0.0.1:6379>

删除消费分组

代码语言:javascript复制
127.0.0.1:6379> xgroup destroy mystream mygroup 
(integer) 1
127.0.0.1:6379> xinfo groups mystream
(empty array)
127.0.0.1:6379>

创建指定分组中消费者

代码语言:javascript复制
127.0.0.1:6379> XGROUP CREATECONSUMER mystream mygroup myconsumer
(integer) 1
127.0.0.1:6379> xinfo consumers mystream mygroup
1) 1) "name"
   2) "myconsumer"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 41127
2) 1) "name"
   2) "myconsumer1"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 1974

删除指定分组中消费者

代码语言:javascript复制
127.0.0.1:6379> XGROUP delconsumer mystream mygroup myconsumer1
(integer) 0
127.0.0.1:6379> xinfo consumers mystream mygroup
1) 1) "name"
   2) "myconsumer"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 110278
127.0.0.1:6379>

8

XREADGROUP

指定消费组内的消费者读取消息

代码语言:javascript复制
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

[COUNT count] 每次获取消息的数量;

[BLOCK milliseconds] 阻塞模式和超时时间;

[NOACK] 不需要确认消息, 适用于不怎么重要的可以丢失的消息;

key [key ...] 指定Stream key;

ID [ID ...] 指定的消息 ID; > 指定读取所有未消费的消息, 其他值指定被挂起的消息;

查看当前Stream 消费组信息, 确认最后消费位置

代码语言:javascript复制
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
   2) "mygroup"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1622699291143-0"

查看Stream 数据

代码语言:javascript复制
127.0.0.1:6379> xrange mystream -  
1) 1) "1-0"
   2) 1) "k1"
      2) "v1"
      3) "k2"
      4) "v2"
2) 1) "1622699291143-0"
   2) 1) "k3"
      2) "v3"
3) 1) "1622700074168-0"
   2) 1) "k4"
      2) "v4"
4) 1) "1622700074168-1"
   2) 1) "k5"
      2) "v5"

消费Stream队列中下一条消息

代码语言:javascript复制
127.0.0.1:6379> XREADGROUP group mygroup myconsumer count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1622700074168-0"
         2) 1) "k4"
            2) "v4"
127.0.0.1:6379>

9

XPENDING

查看未进行确认的数据

代码语言:javascript复制
127.0.0.1:6379> xpending mystream mygroup -    10
1) 1) "1622700074168-0"
   2) "myconsumer"
   3) (integer) 25605849
   4) (integer) 1
127.0.0.1:6379>

10

XACK

确认当前消费分组处理完毕该消息

代码语言:javascript复制
127.0.0.1:6379> xack mystream mygroup 1622700074168-0
(integer) 1
127.0.0.1:6379> xpending mystream mygroup -    10
(empty array)
127.0.0.1:6379>

11

XCLAIM

可能因为服务异常等原因, 消息长时间未能得到消费或者确认, 可以通过 xclaim 命令读取指定 ID 或者空闲时间的数据, 重新消费数据, 并再次确认(xack)消息;

代码语言:javascript复制
XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]

ID [ID ...] 消息的 ID;

[IDLE ms] 设置消息的空闲时间, 如果不提供, 默认为 0;

[TIME ms-unix-time] 和IDLE相同, unix 时间戳;

RETRYCOUNT 设置重试次数, 通常 xclaim 不会改变这个值, 它通常用于 xpending 命令, 用来发现一些长时间未被处理的消息;

FORCE 在 PEL 中创建待处理消息, 即使指定的 ID 尚未分配给客户端的PEL;

JUSTID 只返回认领的消息 ID 数组, 不返回实际消息;

代码语言:javascript复制
127.0.0.1:6379> XREADGROUP group mygroup myconsumer count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1622700074168-1"
         2) 1) "k5"
            2) "v5"
127.0.0.1:6379> xpending mystream mygroup -    10
1) 1) "1622700074168-1"
   2) "myconsumer"
   3) (integer) 3256
   4) (integer) 1
127.0.0.1:6379> XCLAIM mystream  mygroup myconsumer 10 1622700074168-1 
1) 1) "1622700074168-1"
   2) 1) "k5"
      2) "v5"

12

XINFO

Stream也提供了查看队列元数据的命令

代码语言:javascript复制
127.0.0.1:6379> xinfo help
1) XINFO <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
2) CONSUMERS <key> <groupname>
3)     Show consumers of <groupname>.
4) GROUPS <key>
5)     Show the stream consumer groups.
6) STREAM <key> [FULL [COUNT <count>]
7)     Show information about the stream.
8) HELP
9)     Prints this help.

查看队列基本信息, 包括消费分组, 以及对应的最后消费数据 ID 信息

代码语言:javascript复制
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"
   2) "mygroup1"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "0-0"

查看指定消费分组的消费者

代码语言:javascript复制
127.0.0.1:6379> xinfo consumers mystream mygroup1
1) 1) "name"
   2) "myconsumer1"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 19009

查看队列元信息

代码语言:javascript复制
127.0.0.1:6379> xinfo stream mystream
 1) "length"
 2) (integer) 1
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1622603679677-0"
 9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1622603679677-0"
    2) 1) "k1"
       2) "v1"
13) "last-entry"
14) 1) "1622603679677-0"
    2) 1) "k1"
       2) "v1"

总结

以上, 就是 RedisV5.0版本的Stream的基本功能了.

0 人点赞