文章目录
- 关于 redis stream
- redis stream 使用示例
- 添加消息
- 使用示例
- 时间复杂度
- 读取消息
- XREAD
- 时间复杂度
- XRANGE
- 删除消息
- XDEL
- XTRIM
关于 redis stream
这以前只知道redis有类似于消息队列的发布/订阅,还真不知道它居然悄咪咪的有“消息队列”呀哈。
redis stream 实现了大部分消息队列的功能,如:
代码语言:javascript复制消息ID的序列化生成
消息遍历
消息的阻塞和非阻塞读取
消息的分组消费
ACK确认机制
发布/订阅 模式不能算是真正意义上的消息队列,它有一定的实时性,而且没有做持久化。 不过redis stream 和卡夫卡之类的消息队列也没法比,毕竟它是在内存里的,小。
redis stream 使用示例
官网命令文档参考
添加消息
XADD命令可以发送消息到指定 Stream 消息流中(若不存在则创建)。
使用示例:
代码语言:javascript复制XADD mystream 1526919030474-55 键 值
XADD Stream名 消息ID message 消息内容
至于官方文档或者其他博客如果提到什么 *,我也不说啥。
如果指定的ID参数是*字符,XADD命令将自动生成唯一的ID。自动生成ID时,第一部分是生成ID的Redis实例的Unix时间(以毫秒为单位)。第二部分只是序列号,用于区分在同一毫秒内生成的ID。
ID保证总是递增的,因此条目在流中是完全有序的。为了保证此属性,如果流中的当前top ID的时间大于实例的当前本地时间,则将使用top entry time,并且ID的序列部分将增加。例如,当本地时钟向后跳时,或者在故障切换后,新主机具有不同的绝对时间时,可能会发生这种情况。
当用户为XADD指定显式ID时,最小有效ID为0-1,并且用户必须指定一个大于流中当前任何其他ID的ID,否则命令将失败并返回错误。
这个事情告诉我们,如果没有什么特殊需求,就用系统自动生成的ID吧。
版本变迁:
代码语言:javascript复制>=6.2:增加了NOMKSTREAM选项、MINID微调策略和LIMIT选项。
>=7.0:添加了对<ms>-*显式ID表单的支持。
使用示例
代码语言:javascript复制redis> XADD mystream * name Sara surname OConnor
"1640412909358-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1640412909358-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream -
1) 1) "1640412909358-0"
2) 1) "name"
2) "Sara"
3) "surname"
4) "OConnor"
2) 1) "1640412909358-1"
2) 1) "field1"
2) "value1"
3) "field2"
4) "value2"
5) "field3"
6) "value3"
redis>
时间复杂度
O(1) when adding a new entry, O(N) when trimming where N being the number of entries evicted.
读取消息
XREAD
XREAD可用于从消息流中读取数据。
格式应该看得出来吧。
最后的参数是消息ID,redis会返回大于该ID的消息。 “0-0”是一个特殊的ID,代表最小的消息ID,使用它可以要求redis从头读取消息。
XREAD 也可以阻塞客户端,等待消息流中接收新的消息。为了进行阻塞,将使用block选项以及超时前要阻塞的毫秒数。
通常,Redis阻塞命令以秒为单位超时,但是此命令需要毫秒超时,即使服务器的超时分辨率通常接近0.1秒。
通常这个命令这样使用乎好一些:
代码语言:javascript复制XREAD BLOCK 1000 STREAMS mystream $
$ 也是一个特殊ID,表示当前最大的消息ID。使用它可以要求redis读取最新的消息。
时间复杂度
Time complexity: For each stream mentioned: O(N) with N being the number of elements being returned, it means that XREAD-ing with a fixed COUNT is O(1). Note that when the BLOCK option is used, XADD will pay O(M) time in order to serve the M clients blocked on the stream getting new data.
XRANGE
范围读取,字面意思。
这里也有特殊字符可以看一下: 、- The - and special IDs mean respectively the minimum ID possible and the maximum ID possible inside a stream。
可以使用count来限制数量:
删除消息
XDEL
这个也没啥好说的,直接来个示例即可。
代码语言:javascript复制> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream -
1) 1) 1538561698944-0
2) 1) "a"
2) "1"
2) 1) 1538561701744-0
2) 1) "c"
2) "3"
XTRIM
使用 XTRIM 对流进行修剪,限制长度。
代码语言:javascript复制127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream -
1) 1) "1601372434568-0"
2) 1) "field1"
2) "A"
3) "field2"
4) "B"
5) "field3"
6) "C"
7) "field4"
8) "D"
127.0.0.1:6379>
redis>
现在也不想写太多,毕竟还没用过。官方文档放在开头了,可自行阅读。 等我下个月用过之后就有的写了。