【redis】 属于redis的 “消息队列”:redis stream(浅析)

2021-12-27 13:16:54 浏览数 (1)

文章目录

    • 关于 redis stream
    • redis stream 使用示例
      • 添加消息
        • 使用示例
        • 时间复杂度
      • 读取消息
        • XREAD
          • 时间复杂度
        • XRANGE
      • 删除消息
        • XDEL
        • XTRIM

关于 redis stream

这以前只知道redis有类似于消息队列的发布/订阅,还真不知道它居然悄咪咪的有“消息队列”呀哈。

redis stream 实现了大部分消息队列的功能,如:

代码语言:javascript复制
消息ID的序列化生成
消息遍历
消息的阻塞和非阻塞读取
消息的分组消费
ACK确认机制

发布/订阅 模式不能算是真正意义上的消息队列,它有一定的实时性,而且没有做持久化。 不过redis stream 和卡夫卡之类的消息队列也没法比,毕竟它是在内存里的,小。

redis stream 使用示例

官网命令文档参考

添加消息

XADD命令可以发送消息到指定 Stream 消息流中(若不存在则创建)。

使用示例:

代码语言:javascript复制
XADD mystream 1526919030474-55 键 值

XADD Stream名 消息ID message 消息内容

至于官方文档或者其他博客如果提到什么 *,我也不说啥。

如果指定的ID参数是*字符,XADD命令将自动生成唯一的ID。自动生成ID时,第一部分是生成ID的Redis实例的Unix时间(以毫秒为单位)。第二部分只是序列号,用于区分在同一毫秒内生成的ID。

ID保证总是递增的,因此条目在流中是完全有序的。为了保证此属性,如果流中的当前top ID的时间大于实例的当前本地时间,则将使用top entry time,并且ID的序列部分将增加。例如,当本地时钟向后跳时,或者在故障切换后,新主机具有不同的绝对时间时,可能会发生这种情况。

当用户为XADD指定显式ID时,最小有效ID为0-1,并且用户必须指定一个大于流中当前任何其他ID的ID,否则命令将失败并返回错误。

这个事情告诉我们,如果没有什么特殊需求,就用系统自动生成的ID吧。

版本变迁:

代码语言:javascript复制
>=6.2:增加了NOMKSTREAM选项、MINID微调策略和LIMIT选项。
>=7.0:添加了对<ms>-*显式ID表单的支持。
使用示例
代码语言:javascript复制
redis>  XADD mystream * name Sara surname OConnor

"1640412909358-0"

redis>  XADD mystream * field1 value1 field2 value2 field3 value3

"1640412909358-1"

redis>  XLEN mystream

(integer) 2

redis>  XRANGE mystream -  

1) 1) "1640412909358-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "OConnor"
2) 1) "1640412909358-1"
   2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"

redis>
时间复杂度

O(1) when adding a new entry, O(N) when trimming where N being the number of entries evicted.

读取消息

XREAD

XREAD可用于从消息流中读取数据。

格式应该看得出来吧。

最后的参数是消息ID,redis会返回大于该ID的消息。 “0-0”是一个特殊的ID,代表最小的消息ID,使用它可以要求redis从头读取消息。

XREAD 也可以阻塞客户端,等待消息流中接收新的消息。为了进行阻塞,将使用block选项以及超时前要阻塞的毫秒数。

通常,Redis阻塞命令以秒为单位超时,但是此命令需要毫秒超时,即使服务器的超时分辨率通常接近0.1秒。

通常这个命令这样使用乎好一些:

代码语言:javascript复制
XREAD BLOCK 1000 STREAMS mystream $

$ 也是一个特殊ID,表示当前最大的消息ID。使用它可以要求redis读取最新的消息。

时间复杂度

Time complexity: For each stream mentioned: O(N) with N being the number of elements being returned, it means that XREAD-ing with a fixed COUNT is O(1). Note that when the BLOCK option is used, XADD will pay O(M) time in order to serve the M clients blocked on the stream getting new data.

XRANGE

范围读取,字面意思。

这里也有特殊字符可以看一下: 、- The - and special IDs mean respectively the minimum ID possible and the maximum ID possible inside a stream。

可以使用count来限制数量:

删除消息

XDEL

这个也没啥好说的,直接来个示例即可。

代码语言:javascript复制
> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream -  
1) 1) 1538561698944-0
   2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) "3"
XTRIM

使用 XTRIM 对流进行修剪,限制长度。

代码语言:javascript复制
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream -  
1) 1) "1601372434568-0"
   2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"
127.0.0.1:6379>
 
redis>

现在也不想写太多,毕竟还没用过。官方文档放在开头了,可自行阅读。 等我下个月用过之后就有的写了。

0 人点赞