Redis进阶学习04---秒杀优化和消息队列
- 秒杀优化
- 秒杀优化的具体实现
- 基于jdk阻塞队列完成的秒杀优化总结
- 秒杀优化的具体实现
- Redis消息队列实现秒杀
- 基于Redis的List实现消息队列
- 基于Redis的PubSub实现消息队列
- 基于Stream实现消息队列
- 基于Stream的消息队列之消费者组
- Redis-Stream详解
- 追加新消息,XADD,生产消息
- 从消息队列中获取消息,XREAD,消费消息
- 消息ID说明
- 消费者组模式,consumer group
- Pending 等待列表
- 消息转移
- 坏消息问题,Dead Letter,死信问题
- 信息监控,XINFO
- 命令小结
- JAVA伪代码实现Stream消费队列
- Stream消息队列小结
- 综上比较
- 使用Stream作为消息队列优化之前的秒杀案例
- 总结
秒杀优化
如果一个饭店只有一个服务员,并且这个服务员不仅需要负责客人的点餐服务,还需要负责炒菜服务,显然这样的话,只能是先处理完第一个客人所有的点餐,烧菜任务后,才能去处理下一个客人的点餐,烧菜任务,这样显然把任务给串行化了,效率大大降低。
而现在我们就面临这样的问题:
目前整个秒杀的过程都是串行化执行的,并且这个流程里面涉及多次数据库查询操作,数据库查询是最耗费时间的,因此优化的思路就是把最耗费时间的数据库写操作转换为异步执行,然后把数据库查询操作通过redis查询替换掉,这样整体就分为了两部分,一部分是主线程去redis判断校验,然后如果判断和校验都通过了,就将消息放入一个队列中,异步线程从该队列中取出消息,然后去执行数据库写操作。
此时redis就相当于服务员,负责库存数量判断和重复购买校验,然后将合法的订单交易,放入队列中,异步处理线程,从队列读取消息,进行数据库写处理,即扣减库存,创建订单的耗时逻辑,全部异步完成。
显然,关于redis那部分判断逻辑,应该都由lua脚本来完成,而非java代码
秒杀优化的具体实现
1.新增优惠卷的同时,将优惠卷信息保存到Redis中
代码语言:javascript复制 @Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
//保存优惠卷信息到Redis
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY voucher.getId(),voucher.getStock().toString());
}
测试:
2.lua脚本编写
代码语言:javascript复制-- 1.参数列表
-- 1.1 优惠卷id
local voucherId= ARGV[1]
-- 1.2 用户id
local userId= ARGV[2]
--2.数据key
--2.1库存key
local storeKey="seckill:stock:" .. voucherId
--2.2订单key
local orderKey="seckill:order:" .. voucherId
--3.脚本业务
--3.1判断库存是否充足 get storeKey
if(tonumber(redis.call('get',storeKey))<=0) then
--3.2库存不足,返回1
return 1
end
--3.2判断用户是否下单--set集合的判断方法,判断某个集合中是否存在某个value
if(redis.call('sismember',orderKey,userId)==1) then
--3.3存在,说明是重复下单,返回2
return 2
end
--3.4扣库存incrby storeKey -1
redis.call('incrby',storeKey,-1)
--3.5下单(保存用户)sadd orderkey userId
redis.call('sadd',orderKey,userId)
return 0
3.修改抢购逻辑
代码语言:javascript复制@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService iSeckillVoucherService;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
@Autowired
private RedisWorker redisWorker;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT=new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
Long uid = UserHolder.getUser().getId();
//1.执行lua脚本
Long res = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), uid.toString());
//2.判断结果是否为0
int r=res.intValue();
if(r!=0){
return Result.fail(r==1?"库存不足":"不能重复下单");
}
//3.为0,有购买资格,把下单信息保存到阻塞队列
long order = redisWorker.nextId("order");
//TODO:保存到阻塞队列
//4.返回订单id
return Result.ok(order);
}
}
当我们测试一下后:
此时数据库并无变化,因为我们还没把消息放入阻塞队列,从而通知异步线程去处理
4.异步线程处理阻塞队列中的消息
代码语言:javascript复制@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService iSeckillVoucherService;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
@Autowired
private RedisWorker redisWorker;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT=new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
/**
* 阻塞队列
*/
private BlockingQueue<VoucherOrder> orderTasks=new ArrayBlockingQueue(1024*1024);
/**
* 异步线程
*/
private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();
@PostConstruct
public void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
public class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while(true){
//1.获取队列中的订单信息
try {
//1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
//2.创建订单
createVoucherOrder(voucherOrder);
} catch (InterruptedException e) {
log.error("订单创建异常: ",e);
}
}
}
/**
* 保守起见,还会再次进行判断
*/
private void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
//创建锁对象
RLock lock = redissonClient.getLock("lock:order:" userId);
//尝试获取分布式锁
// 第一个参数为获取锁的最大等待时间(期间会重试)--默认-1,,失败直接返回
//锁自动释放时间--默认30秒
//时间单位
boolean tryLock = lock.tryLock();
if(!tryLock){
log.error("用户[" userId "]对" voucherId "优惠卷重复抢购");
}
//我们只需要确保下面这两行代码的集群并发问题被解决
try{
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count>0){
log.error("用户[" userId "]对" voucherId "优惠卷重复抢购");
}
}finally {
lock.unlock();
}
//6.扣减库存
boolean success = iSeckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock",0)
.update();
if(!success){
log.error("库存扣减失败");
}
save(voucherOrder);
}
}
@Override
public Result seckillVoucher(Long voucherId) {
Long uid = UserHolder.getUser().getId();
//1.执行lua脚本
Long res = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), uid.toString());
//2.判断结果是否为0
int r=res.intValue();
if(r!=0){
return Result.fail(r==1?"库存不足":"不能重复下单");
}
//3.为0,有购买资格,把下单信息保存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(uid);
voucherOrder.setVoucherId(voucherId);
orderTasks.add(voucherOrder);
//4.返回订单id
return Result.ok(orderId);
}
}
基于jdk阻塞队列完成的秒杀优化总结
阻塞队列里面数据过多可能会导致jvm内存溢出,还有就是即便设置了阻塞队列最大元素个数上限也有弊端,就是如果元素过多,处理速度跟不上,会导致很多额外任务放入阻塞队列失败
还有就是数据都是存放在内存中的,一旦java程序出现异常,那么内存中的任务将会全部丢失,并且一旦出现异常,也会导致某个任务执行失败
Redis消息队列实现秒杀
基于Redis的List实现消息队列
基于Redis的PubSub实现消息队列
基于Stream实现消息队列
默认是非阻塞的,并且如果阻塞时长传入0,表示无限等待
基于Stream的消息队列之消费者组
Redis-Stream详解
相信各位光看上面的介绍,应该对Stream还是一知半解,下面我来详细介绍一下它的用法:
追加新消息,XADD,生产消息
XADD,命令用于在某个stream(流数据)中追加消息,演示如下:
代码语言:javascript复制127.0.0.1:6379> XADD memberMessage * user kang msg Hello
"1651325244694-0"
127.0.0.1:6379> XADD memberMessage * user zhong msg nihao
"1651325256282-0"
其中语法格式为:
代码语言:javascript复制XADD key ID field string [field string …]
需要提供key,消息ID方案,消息内容,其中消息内容为key-value型数据。
ID,最常使用*,表示由Redis生成消息ID,这也是强烈建议的方案。
field string [field string], 就是当前消息内容,由1个或多个key-value构成。
上面的例子中,在memberMemsages这个key中追加了user kang msg Hello这个消息。Redis使用毫秒时间戳和序号生成了消息ID。此
从消息队列中获取消息,XREAD,消费消息
XREAD,从Stream中读取消息,演示如下:
代码语言:javascript复制127.0.0.1:6379> XREAD streams memberMessage 0
1) 1) "memberMessage"
2) 1) 1) "1651325244694-0"
2) 1) "user"
2) "kang"
3) "msg"
4) "Hello"
2) 1) "1651325256282-0"
2) 1) "user"
2) "zhong"
3) "msg"
4) "nihao"
消息被读取后,并不会从stream队列中消失,这点需要注意
上面的命令是从消息队列memberMessage中读取所有消息。XREAD支持很多参数,语法格式为:
代码语言:javascript复制XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
其中:
- [COUNT count],用于限定获取的消息数量
- [BLOCK milliseconds],用于设置XREAD为阻塞模式,默认为非阻塞模式
- ID,用于设置由哪个消息ID开始读取。
使用0表示从第一条消息开始
。(本例中就是使用0)此处需要注意,消息队列ID是单调递增的,所以通过设置起点,可以向后读取。在阻塞模式中,可以使用 $ 表 示 最 新 的 消 息 I D
。 ( 在 非 阻 塞 模 式 下,表示最新的消息ID)。(在非阻塞模式下无意义)。
XRED读消息时分为阻塞和非阻塞模式,使用BLOCK选项可以表示阻塞模式,需要设置阻塞时长。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。
一个典型的阻塞模式用法为:
代码语言:javascript复制127.0.0.1:6379> XREAD block 1000 streams memberMessage $
(nil)
(1.07s)
我们使用Block模式,配合$作为ID,表示读取最新的消息,若没有消息,命令阻塞!等待过程中,其他客户端向队列追加消息,则会立即读取到。
因此,典型的队列就是 XADD 配合 XREAD Block 完成。XADD负责生成消息,XREAD负责消费消息。
消息ID说明
XADD生成的1553439850328-0,就是Redis生成的消息ID,由两部分组成:时间戳-序号。时间戳是毫秒级单位,是生成消息的Redis服务器时间,它是个64位整型(int64)。序号是在这个毫秒时间点内的消息序号,它也是个64位整型。较真来说,序号可能会溢出,but真可能吗?
可以通过multi批处理,来验证序号的递增:
代码语言:javascript复制127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> XADD memberMessage * msg one
QUEUED
127.0.0.1:6379> XADD memberMessage * msg two
QUEUED
127.0.0.1:6379> XADD memberMessage * msg three
QUEUED
127.0.0.1:6379> XADD memberMessage * msg four
QUEUED
127.0.0.1:6379> XADD memberMessage * msg five
QUEUED
127.0.0.1:6379> EXEC
1) "1553441006884-0"
2) "1553441006884-1"
3) "1553441006884-2"
4) "1553441006884-3"
5) "1553441006884-4"
由于一个redis命令的执行很快,所以可以看到在同一时间戳内,是通过序号递增来表示消息的。
为了保证消息是有序的,因此Redis生成的ID是单调递增有序的。由于ID中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis的每个Stream类型数据都维护一个latest_generated_id属性,用于记录最后一个消息的ID。若发现当前时间戳退后(小于latest_generated_id所记录的),则采用时间戳不变而序号递增的方案来作为新消息ID(这也是序号为什么使用int64的原因,保证有足够多的的序号),从而保证ID的单调递增性质。
强烈建议使用Redis的方案生成消息ID,因为这种时间戳 序号的单调递增的ID方案,几乎可以满足你全部的需求。但同时,记住ID是支持自定义的,别忘了!
消费者组模式,consumer group
当多个消费者(consumer)同时消费一个消息队列时,可以重复的消费相同的消息,就是消息队列中有10条消息,三个消费者都可以消费到这10条消息。
但有时,我们需要多个消费者配合协作来消费同一个消息队列,就是消息队列中有10条消息,三个消费者分别消费其中的某些消息,比如消费者A消费消息1、2、5、8,消费者B消费消息4、9、10,而消费者C消费消息3、6、7。也就是三个消费者配合完成消息的消费,可以在消费能力不足,也就是消息处理程序效率不高时,使用该模式。该模式就是消费者组模式。
即消费者组模式可以让多个消费者协同合作,来共同消息队列中的消息,提高队列中消息的消费效率
消费者组模式的支持主要由两个命令实现:
- XGROUP,用于管理消费者组,提供创建组,销毁组,更新组起始消息ID等操作
- XREADGROUP,分组消费消息操作
进行演示,演示时使用5个消息,思路是:创建一个Stream消息队列,生产者生成5条消息。在消息队列上创建一个消费组,组内三个消费者进行消息消费:
代码语言:javascript复制# 生产者生成10条消息
127.0.0.1:6379> MULTI
127.0.0.1:6379> XADD mq * msg 1 # 生成一个消息:msg 1
127.0.0.1:6379> XADD mq * msg 2
127.0.0.1:6379> XADD mq * msg 3
127.0.0.1:6379> XADD mq * msg 4
127.0.0.1:6379> XADD mq * msg 5
127.0.0.1:6379> EXEC
1) "1553585533795-0"
2) "1553585533795-1"
3) "1553585533795-2"
4) "1553585533795-3"
5) "1553585533795-4"
# 创建消费组 mqGroup
127.0.0.1:6379> XGROUP CREATE mq mqGroup 0 # 为消息队列 mq 创建消费组 mgGroup
OK
# 消费者A,消费第1条
127.0.0.1:6379> XREADGROUP group mqGroup consumerA count 1 streams mq > #消费组内消费者A,从消息队列mq中读取一个消息
1) 1) "mq"
2) 1) 1) "1553585533795-0"
2) 1) "msg"
2) "1"
# 消费者A,消费第2条
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >
1) 1) "mq"
2) 1) 1) "1553585533795-1"
2) 1) "msg"
2) "2"
# 消费者B,消费第3条
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerB COUNT 1 STREAMS mq >
1) 1) "mq"
2) 1) 1) "1553585533795-2"
2) 1) "msg"
2) "3"
# 消费者A,消费第4条
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA count 1 STREAMS mq >
1) 1) "mq"
2) 1) 1) "1553585533795-3"
2) 1) "msg"
2) "4"
# 消费者C,消费第5条
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerC COUNT 1 STREAMS mq >
1) 1) "mq"
2) 1) 1) "1553585533795-4"
2) 1) "msg"
2) "5"
上面的例子中,三个在同一组 mpGroup 消费者A、B、C在消费消息时(消费者在消费时指定即可,不用预先创建),有着互斥原则,消费方案为,A->1, A->2, B->3, A->4, C->5。语法说明为:
XGROUP CREATE mq mqGroup 0
,用于在消息队列mq上创建消费组 mpGroup,最后一个参数0,表示该组从第一条消息开始消费。(意义与XREAD的0一致)。除了支持CREATE外,还支持SETID设置起始ID,DESTROY销毁组,DELCONSUMER删除组内消费者等操作。
XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >
,用于组mqGroup内消费者consumerA在队列mq中消费,参数>表示未被组内消费的起始消息,参数count 1表示获取一条。语法与XREAD基本一致,不过是增加了组的概念。
可以进行组内消费的基本原理是,STREAM类型会为每个组记录一个最后处理(交付)的消息ID(last_delivered_id),这样在组内消费时,就可以从这个值后面开始读取,保证不重复消费。
以上就是消费组的基础操作。除此之外,消费组消费时,还有一个必须要考虑的问题,就是若某个消费者,消费了某条消息,但是并没有处理成功时(例如消费者进程宕机),这条消息可能会丢失,因为组内其他消费者不能再次消费到该消息了。下面继续讨论解决方案。
Pending 等待列表
为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。命令XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息。演示如下:
代码语言:javascript复制127.0.0.1:6379> XPENDING mq mqGroup # mpGroup的Pending情况
1) (integer) 5 # 5个已读取但未处理的消息
2) "1553585533795-0" # 起始ID
3) "1553585533795-4" # 结束ID
4) 1) 1) "consumerA" # 消费者A有3个
2) "3"
2) 1) "consumerB" # 消费者B有1个
2) "1"
3) 1) "consumerC" # 消费者C有1个
2) "1"
127.0.0.1:6379> XPENDING mq mqGroup - 10 # 使用 start end count 选项可以获取详细信息
1) 1) "1553585533795-0" # 消息ID
2) "consumerA" # 消费者
3) (integer) 1654355 # 从读取到现在经历了1654355ms,IDLE
4) (integer) 5 # 消息被读取了5次,delivery counter
2) 1) "1553585533795-1"
2) "consumerA"
3) (integer) 1654355
4) (integer) 4
# 共5个,余下3个省略 ...
127.0.0.1:6379> XPENDING mq mqGroup - 10 consumerA # 在加上消费者参数,获取具体某个消费者的Pending列表
1) 1) "1553585533795-0"
2) "consumerA"
3) (integer) 1641083
4) (integer) 5
# 共3个,余下2个省略 ...
每个Pending的消息有4个属性:
- 消息ID
- 所属消费者
- IDLE,已读取时长
- delivery counter,消息被读取次数
上面的结果我们可以看到,我们之前读取的消息,都被记录在Pending列表中,说明全部读到的消息都没有处理,仅仅是读取了。那如何表示消费者处理完毕了消息呢?使用命令 XACK 完成告知消息处理完成,演示如下:
代码语言:javascript复制127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 # 通知消息处理结束,用消息ID标识
(integer) 1
127.0.0.1:6379> XPENDING mq mqGroup # 再次查看Pending列表
1) (integer) 4 # 已读取但未处理的消息已经变为4个
2) "1553585533795-1"
3) "1553585533795-4"
4) 1) 1) "consumerA" # 消费者A,还有2个消息处理
2) "2"
2) 1) "consumerB"
2) "1"
3) 1) "consumerC"
2) "1"
127.0.0.1:6379>
有了这样一个Pending机制,就意味着在某个消费者读取消息但未处理后,消息是不会丢失的。等待消费者再次上线后,可以读取该Pending列表,就可以继续处理该消息了,保证消息的有序和不丢失。
此时还有一个问题,就是若某个消费者宕机之后,没有办法再上线了,那么就需要将该消费者Pending的消息,转义给其他的消费者处理,就是消息转移。请继续。
消息转移
消息转移的操作时将某个消息转移到自己的Pending列表中。使用语法XCLAIM来实现,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。演示如下:
代码语言:javascript复制# 当前属于消费者A的消息1553585533795-1,已经15907,787ms未处理了
127.0.0.1:6379> XPENDING mq mqGroup - 10
1) 1) "1553585533795-1"
2) "consumerA"
3) (integer) 15907787
4) (integer) 4
# 转移超过3600s的消息1553585533795-1到消费者B的Pending列表
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
1) 1) "1553585533795-1"
2) 1) "msg"
2) "2"
# 消息1553585533795-1已经转移到消费者B的Pending中。
127.0.0.1:6379> XPENDING mq mqGroup - 10
1) 1) "1553585533795-1"
2) "consumerB"
3) (integer) 84404 # 注意IDLE,被重置了
4) (integer) 5 # 注意,读取次数也累加了1次
以上代码,完成了一次消息转移。转移除了要指定ID外,还需要指定IDLE,保证是长时间未处理的才被转移。被转移的消息的IDLE会被重置,用以保证不会被重复转移,以为可能会出现将过期的消息同时转移给多个消费者的并发操作,设置了IDLE,则可以避免后面的转移不会成功,因为IDLE不满足条件。例如下面的连续两条转移,第二条不会成功。
代码语言:javascript复制127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1
这就是消息转移。至此我们使用了一个Pending消息的ID,所属消费者和IDLE的属性,还有一个属性就是消息被读取次数,delivery counter,该属性的作用由于统计消息被读取的次数,包括被转移也算。这个属性主要用在判定是否为错误数据上。请继续看:
坏消息问题,Dead Letter,死信问题
正如上面所说,如果某个消息,不能被消费者处理,也就是不能被XACK,这是要长时间处于Pending列表中,即使被反复的转移给各个消费者也是如此。此时该消息的delivery counter就会累加(上一节的例子可以看到),当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息),由于有了判定条件,我们将坏消息处理掉即可,删除即可。删除一个消息,使用XDEL语法,演示如下:
代码语言:javascript复制# 删除队列中的消息
127.0.0.1:6379> XDEL mq 1553585533795-1
(integer) 1
# 查看队列中再无此消息
127.0.0.1:6379> XRANGE mq -
1) 1) "1553585533795-0"
2) 1) "msg"
2) "1"
2) 1) "1553585533795-2"
2) 1) "msg"
2) "3"
注意本例中,并没有删除Pending中的消息因此你查看Pending,消息还会在。可以执行XACK标识其处理完毕!
信息监控,XINFO
Stream提供了XINFO来实现对服务器信息的监控,可以查询:
查看队列信息
代码语言:javascript复制127.0.0.1:6379> Xinfo stream mq
1) "length"
2) (integer) 7
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "groups"
8) (integer) 1
9) "last-generated-id"
10) "1553585533795-9"
11) "first-entry"
12) 1) "1553585533795-3"
2) 1) "msg"
2) "4"
13) "last-entry"
14) 1) "1553585533795-9"
2) 1) "msg"
2) "10"
消费组信息
代码语言:javascript复制127.0.0.1:6379> Xinfo groups mq
1) 1) "name"
2) "mqGroup"
3) "consumers"
4) (integer) 3
5) "pending"
6) (integer) 3
7) "last-delivered-id"
8) "1553585533795-4"
消费者组成员信息
代码语言:javascript复制127.0.0.1:6379> XINFO CONSUMERS mq mqGroup
1) 1) "name"
2) "consumerA"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 18949894
2) 1) "name"
2) "consumerB"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 3092719
3) 1) "name"
2) "consumerC"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 23683256
至此,消息队列的操作说明大体结束!
命令小结
- XACK 结束Pending
- XADD 生成消息
- XCLAIM 消息转移
- XDEL 删除消息
- XGROUP 消费组管理
- XINFO 得到消费组信息
- XLEN 消息队列长度
- XPENDING Pending列表
- XRANGE 获取消息队列中消息
- XREAD 消费消息
- XREADGROUP 分组消费消息
- XREVRANGE 逆序获取消息队列中消息
- XTRIM 消息队列容量
JAVA伪代码实现Stream消费队列
Stream消息队列小结
综上比较
使用Stream作为消息队列优化之前的秒杀案例
lua脚本改造
代码语言:javascript复制-- 1.参数列表
-- 1.1 优惠卷id
local voucherId= ARGV[1]
-- 1.2 用户id
local userId= ARGV[2]
--1.3订单id
local orderId=ARGV[3]
--2.数据key
--2.1库存key
local storeKey="seckill:stock:" .. voucherId
--2.2订单key
local orderKey="seckill:order:" .. voucherId
--3.脚本业务
--3.1判断库存是否充足 get storeKey
local storeNum=redis.call('get',storeKey)
--3.2 如果redis中没有该优惠卷库存记录,返回3
-- lua中只有false和nil是假值, 其他都是真值
if(storeNum) then
else
return 3
end
if(tonumber(storeNum)<=0) then
--3.2库存不足,返回1
return 1
end
--3.2判断用户是否下单--set集合的判断方法,判断某个集合中是否存在某个value
if(redis.call('sismember',orderKey,userId)==1) then
--3.3存在,说明是重复下单,返回2
return 2
end
--3.4扣库存incrby storeKey -1
redis.call('incrby',storeKey,-1)
--3.5下单(保存用户)sadd orderkey userId
redis.call('sadd',orderKey,userId)
--3.6发送消息到队列,XADD stream.orders * k1 v1 k2 v2
redis.call('xadd','stream.orders','*',"userId",userId,"voucherId",voucherId,"id",orderId)
return 0
代码实现
代码语言:javascript复制package com.hmdp.service.impl;
import cn.hutool.core.bean.BeanUtil;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import static com.hmdp.utils.RedisConstants.*;
/**
* <p>
* 服务实现类
* </p>
*
* @author 虎哥
* @since 2021-12-22
*/
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService iSeckillVoucherService;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
@Autowired
private RedisWorker redisWorker;
/**
* 建议放到yml配置文件中
*/
private static final String LUA_FILE_PATH="seckill.lua";
private static final String ORDER = "order";
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT=new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource(LUA_FILE_PATH));
SECKILL_SCRIPT.setResultType(Long.class);
}
/**
* 异步线程
*/
private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();
@PostConstruct
public void init(){
//如果指定stream队列关联的消费者组已经存在,则不进行处理
if (!targetGroupExistsInStream(STREAM_QUEUE_NAME,STREAM_GROUP_NAME)) {
//这里createGroup的mkStream为true,表示在创建消费者组时,如果关联的stream队列不存在,也会自动创建
stringRedisTemplate.opsForStream().createGroup(STREAM_QUEUE_NAME,ReadOffset.from("0"),STREAM_GROUP_NAME);
}
//异步监听任务执行
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private boolean targetGroupExistsInStream(String streamName,String groupName) {
//如果stream队列不存在
if(stringRedisTemplate.countExistingKeys(Collections.singletonList(streamName))==0){
return false;
}
StreamInfo.XInfoGroups order_stream_groups = stringRedisTemplate.opsForStream().groups(streamName);
Iterator<StreamInfo.XInfoGroup> iterator = order_stream_groups.iterator();
while(iterator.hasNext()){
StreamInfo.XInfoGroup xInfoGroup = iterator.next();
if(xInfoGroup.groupName().equals(groupName)){
return true;
}
}
return false;
}
public class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while(true){
try {
//1.获取队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
//消费组的名字和消费者的名字
Consumer.from(STREAM_GROUP_NAME, STREAM_GROUP_CONSUMER_NAME),
StreamReadOptions.empty().count(1)
//我这里设置为一直阻塞,直到有消息可读为止
.block(Duration.ofSeconds(0)),
//从哪个Stream队列进行消息读取,此处读取方式为>
StreamOffset.create(STREAM_QUEUE_NAME, ReadOffset.lastConsumed())
);
if (handleMsgFromStream(list)) {
continue;
}
} catch (Exception e) {
log.error("处理消息异常");
//处理Pending队列中消息
handlePendingList();
}
}
}
private void handlePendingList() {
while(true){
try {
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from(STREAM_GROUP_NAME, STREAM_GROUP_CONSUMER_NAME),
StreamReadOptions.empty().count(1),
//从头开始消费pending队列中所有消息
StreamOffset.create(STREAM_QUEUE_NAME, ReadOffset.from("0"))
);
if(handleMsgFromStream(list)){
//Pending队列中没有错误消息,那么直接退出循环
break;
}
} catch (Exception e) {
//处理Pending队列中的异常消息
//可以在这里做一些异常记录等
try {
//避免循环频率过高
Thread.sleep(3000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
private boolean handleMsgFromStream(List<MapRecord<String, Object, Object>> list) {
//2.判断订单信息是否为空
if(list ==null || list.isEmpty()){
//如果为null,说明没有消息
return true;
}
//解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//3.创建订单
createVoucherOrder(voucherOrder);
//4.确认消息
stringRedisTemplate.opsForStream().acknowledge(STREAM_QUEUE_NAME,STREAM_GROUP_NAME,record.getId());
return false;
}
/**
* 保守起见,还会再次进行判断
*/
private void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
//创建锁对象
RLock lock = redissonClient.getLock(LOCK_ORDER_KEY userId);
//尝试获取分布式锁
// 第一个参数为获取锁的最大等待时间(期间会重试)--默认-1,,失败直接返回
//锁自动释放时间--默认30秒
//时间单位
boolean tryLock = lock.tryLock();
if(!tryLock){
log.error("用户[" userId "]对" voucherId "优惠卷重复抢购");
return;
}
//我们只需要确保下面这两行代码的集群并发问题被解决
try{
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count>0){
log.error("用户[" userId "]对" voucherId "优惠卷重复抢购");
return;
}
}finally {
lock.unlock();
}
//6.扣减库存
boolean success = iSeckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock",0)
.update();
if(!success){
log.error("库存扣减失败");
return;
}
save(voucherOrder);
}
}
@Override
public Result seckillVoucher(Long voucherId) {
Long uid = UserHolder.getUser().getId();
Long orderId = redisWorker.nextId(ORDER);
//1.执行lua脚本
Long res = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), uid.toString(),orderId.toString());
//2.判断结果是否为0
int r=res.intValue();
if(r!=0){
//说明redis中没有当前优惠卷的库存记录
//去数据库查询该优惠卷是否存在,如果不存在,说明是恶意访问
//如果存在,就更新redis记录
if(r==3){
if(handleUnKnownVoucherId(voucherId)){
seckillVoucher(voucherId);
}else {
return Result.fail("指定优惠卷不存在");
}
}
return Result.fail(r==1?"库存不足":"不能重复下单");
}
//3.为0,有购买资格,把下单信息保存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(uid);
voucherOrder.setVoucherId(voucherId);
//4.返回订单id
return Result.ok(orderId);
}
private Boolean handleUnKnownVoucherId(Long voucherId) {
SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
if(voucher==null){
return false;
}
//更新redis记录
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY voucherId,voucher.getStock().toString());
return true;
}
}
总结
本节最重要的地方还是在redis的Stream消息队列,并且也花了大量篇幅去讲解加实践,当然一切还是以官方文档为主,因此建议大家可以没事去看看redis的stream部分文档