⑨【Stream】Redis流是什么?怎么用?: Stream [使用手册]

2023-11-26 09:18:56 浏览数 (2)

个人简介:Java领域新星创作者;阿里云技术博主、星级博主、专家博主;正在Java学习的路上摸爬滚打,记录学习的过程~ 个人主页:.29.的博客 学习社区:进去逛一逛~

一、Redis流 (Stream)

Redis 5.0 之前,实现消息队列的两种方案

方案一:List实现

方案二:发布订阅(Pub/Sub)


Redis 5.0 后

  • Redis Steam 是redis 5.0 新增的一种数据结构
  • Redis Stram可以用来实现消息队列,它支持消息的持久化、支持自动生成全局唯一ID、支持ack确认消息的模式、支持消费组模式等,让消息队列更加稳定和可靠

Stream 结构

  • Stream本质是一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容
  • Message Content : 消息内容
  • Consumer group消费组,通过XGROUP CREATE命令创建,一个消费组中可以有多个消费者
  • Last_delivered_id游标,每个消费组会有一个游标Last_delivered_id,任意一个消费者读取了消息都会使得这个游标往前移动
  • Consumer :消费组中的消费者
  • Pending_ids :每个消费者都会有一个状态变量,用于记录被当前消费者已读取但未被ack确认的消息ID,如果客户端没有ack确认,这个变量里面的消息ID会愈来愈多,一旦某个消息被ack,它就开始减少。这个Pending_ids变量在Redis官方被称为PEL(Pending Entries List),记录了当前已经被客户端读取的但还未ack (Acknowledge character:确认字符)的消息,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失而导致没处理。

二、Redis Stream 基本操作命令

1.队列相关命令

① xadd 向Stream队列添加消息

xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]

代码语言:javascript复制
# 消息队列:mystream ,不存在会先创建再添加消息
# * : * 号表示服务器自动生成 MessageID
# id —— 29 , name —— little29 ,两对键值对,也是添加到队列中的消息
xadd mystream * id 29 name little29


② xrange 获取指定队列的消息列表

xrange key start end [COUNT count]

start : 代表开始值

end : 代表结束值

count :表示最多获取多少个值

代码语言:javascript复制
# -号:代表最小值
#  号:代表最大值
xrange mystream -  


③ xrevrange 获取指定队列的消息列表
  • 与xrange的区别在于,获取消息队列元素的方向是相反的,end在前,start在后

xrevrange key end start [COUNT count]

start : 代表开始值

end : 代表结束值

count :表示最多获取多少个值

代码语言:javascript复制
# -号:代表最小值
#  号:代表最大值
xrange mystream   -


④ xdel 删除消息列表
  • 根据指定的MessageID,删除一个或多个消息列表

xdel key id [id ...]

代码语言:javascript复制
xdel mystream 1681006258096-0


⑤ xlen 获取Stream队列消息的长度

xlen key

代码语言:javascript复制
xlen mystream


⑥ xtrim 对Stream的长度进行截取

xtrim key MAXLEN|MINID [=|~] threshold [LIMIT count]

MAXLEN : 允许的最大长度,超过就会对流进行截取

MINID : 允许的最小ID,从某个ID值开始,比这个ID值小的将会被抛弃

代码语言:javascript复制
# 1681007772647-0作为最小id,id值小于它的会被抛弃
xtrim mystream minid 1681007772647-0


⑦ xread 获取消息,只会返回大于指定id的消息

xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

COUNT : 最多读取多少条消息

BLOCK : 表示是否以阻塞的方式读取消息,默认不阻塞,如果milliseconds设置为0,表示永久阻塞

代码语言:javascript复制
# $代表特殊ID,表示以当前Stream已经存储的最大的ID作为最后一个ID,当前Stream中不存在大于当前最大ID的消息,因此此时返回nil
xread count 2 streams mystream $
# 0-0代表从最小的ID开始获取Stream中的消息,当不指定count,将会返回Stream中的所有消息,注意也可以使用0(00/000也都是可以的……)
xread count 2 streams mystream 0-0


2.消费组相关命令

  • Stream中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了,即同一个消费组里得消费者不能消费同一条消息。
  • 但是,不同消费组中的消费者可以消费同一条消息。
  • 消费组的目的 让组内多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。
① xgroup create 创建消费组

xgroup create key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read]

  • 创建消费者组的时候必须指定 ID, ID 为 0 表示从头开始消费,为 $ 表示只消费新的消息
  • $表示从Stream尾部开始消费
  • 0表示从Stream头部开始消费
代码语言:javascript复制
# 从头开始消费
xgroup create mystream group1 0
# 只消费新的消息
xgroup create mystream group2 $


② xreadgroup GROUP 指定消费组中消费者来消费消息

xreadgroup GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]

  • 命令末尾加上“>”,表示从第一条尚未被消费的消息开始读取
代码语言:javascript复制
# 消费组group1中的消费者consumer1从mystream消息队列中读取所有消息
# “>”:表示从第一条尚未被消费的消息开始读取
xreadgroup GROUP group1 consumer1 STREAMS mystream >


③ xpending 查询已读取但尚未确认的消息

xpending key group [[IDLE min-idle-time] start end count [consumer]]

代码语言:javascript复制
# 查询指定消费组group1对mystream中已读取但未确认的消息
xpending mystream group1


④ xack 向消息队列确认消息已经处理完成

xack key group id [id ...]

代码语言:javascript复制
# 通过xack,向消息队列确认指定消息ID:1681007772647-0的消息已经处理完成
xack mystream group1 1681007772647-0


⑤ xinfo stream 查询流的详细信息

xinfo stream key [FULL [COUNT count]]

代码语言:javascript复制
# 查询流:mystream的详细信息
xinfo stream mystream

重点问题

0 人点赞