个人简介:Java领域新星创作者;阿里云技术博主、星级博主、专家博主;正在Java学习的路上摸爬滚打,记录学习的过程~ 个人主页:.29.的博客 学习社区:进去逛一逛~
一、Redis流 (Stream)
Redis 5.0 之前,实现消息队列的两种方案
:
方案一:List实现
方案二:发布订阅(Pub/Sub)
Redis 5.0 后
:
- Redis Steam 是redis 5.0 新增的一种数据结构
- Redis Stram可以用来实现消息队列,它支持消息的持久化、支持自动生成全局唯一ID、支持ack确认消息的模式、支持消费组模式等,让消息队列更加稳定和可靠
Stream 结构
:
- Stream本质是一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容
Message Content
: 消息内容Consumer group
:消费组,通过XGROUP CREATE命令创建,一个消费组中可以有多个消费者Last_delivered_id
:游标,每个消费组会有一个游标Last_delivered_id,任意一个消费者读取了消息都会使得这个游标往前移动Consumer
:消费组中的消费者Pending_ids
:每个消费者都会有一个状态变量,用于记录被当前消费者已读取但未被ack确认的消息ID,如果客户端没有ack确认,这个变量里面的消息ID会愈来愈多,一旦某个消息被ack,它就开始减少。这个Pending_ids变量在Redis官方被称为PEL(Pending Entries List),记录了当前已经被客户端读取的但还未ack (Acknowledge character:确认字符)的消息,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失而导致没处理。
二、Redis Stream 基本操作命令
1.队列相关命令
① xadd 向Stream队列添加消息
xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]
# 消息队列:mystream ,不存在会先创建再添加消息
# * : * 号表示服务器自动生成 MessageID
# id —— 29 , name —— little29 ,两对键值对,也是添加到队列中的消息
xadd mystream * id 29 name little29
② xrange 获取指定队列的消息列表
xrange key start end [COUNT count]
start
: 代表开始值
end
: 代表结束值
count
:表示最多获取多少个值
# -号:代表最小值
# 号:代表最大值
xrange mystream -
③ xrevrange 获取指定队列的消息列表
- 与xrange的区别在于,获取消息队列元素的方向是相反的,end在前,start在后
xrevrange key end start [COUNT count]
start
: 代表开始值
end
: 代表结束值
count
:表示最多获取多少个值
# -号:代表最小值
# 号:代表最大值
xrange mystream -
④ xdel 删除消息列表
- 根据指定的MessageID,删除一个或多个消息列表
xdel key id [id ...]
xdel mystream 1681006258096-0
⑤ xlen 获取Stream队列消息的长度
xlen key
xlen mystream
⑥ xtrim 对Stream的长度进行截取
xtrim key MAXLEN|MINID [=|~] threshold [LIMIT count]
MAXLEN
: 允许的最大长度,超过就会对流进行截取
MINID
: 允许的最小ID,从某个ID值开始,比这个ID值小的将会被抛弃
# 1681007772647-0作为最小id,id值小于它的会被抛弃
xtrim mystream minid 1681007772647-0
⑦ xread 获取消息,只会返回大于指定id的消息
xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
COUNT
: 最多读取多少条消息
BLOCK
: 表示是否以阻塞的方式读取消息,默认不阻塞,如果milliseconds设置为0,表示永久阻塞
# $代表特殊ID,表示以当前Stream已经存储的最大的ID作为最后一个ID,当前Stream中不存在大于当前最大ID的消息,因此此时返回nil
xread count 2 streams mystream $
# 0-0代表从最小的ID开始获取Stream中的消息,当不指定count,将会返回Stream中的所有消息,注意也可以使用0(00/000也都是可以的……)
xread count 2 streams mystream 0-0
2.消费组相关命令
- Stream中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了,即同一个消费组里得消费者不能消费同一条消息。
- 但是,不同消费组中的消费者可以消费同一条消息。
- 消费组的
目的
: 让组内多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。
① xgroup create 创建消费组
xgroup create key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read]
- 创建消费者组的时候必须指定 ID, ID 为 0 表示从头开始消费,为 $ 表示只消费新的消息
- $表示从Stream尾部开始消费
- 0表示从Stream头部开始消费
# 从头开始消费
xgroup create mystream group1 0
# 只消费新的消息
xgroup create mystream group2 $
② xreadgroup GROUP 指定消费组中消费者来消费消息
xreadgroup GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
- 命令末尾加上“>”,表示从第一条尚未被消费的消息开始读取
# 消费组group1中的消费者consumer1从mystream消息队列中读取所有消息
# “>”:表示从第一条尚未被消费的消息开始读取
xreadgroup GROUP group1 consumer1 STREAMS mystream >
③ xpending 查询已读取但尚未确认的消息
xpending key group [[IDLE min-idle-time] start end count [consumer]]
# 查询指定消费组group1对mystream中已读取但未确认的消息
xpending mystream group1
④ xack 向消息队列确认消息已经处理完成
xack key group id [id ...]
# 通过xack,向消息队列确认指定消息ID:1681007772647-0的消息已经处理完成
xack mystream group1 1681007772647-0
⑤ xinfo stream 查询流的详细信息
xinfo stream key [FULL [COUNT count]]
# 查询流:mystream的详细信息
xinfo stream mystream