Redis进阶学习04---秒杀优化和消息队列

2022-05-09 14:15:08 浏览数 (1)

Redis进阶学习04---秒杀优化和消息队列

  • 秒杀优化
    • 秒杀优化的具体实现
      • 基于jdk阻塞队列完成的秒杀优化总结
  • Redis消息队列实现秒杀
    • 基于Redis的List实现消息队列
    • 基于Redis的PubSub实现消息队列
    • 基于Stream实现消息队列
    • 基于Stream的消息队列之消费者组
  • Redis-Stream详解
    • 追加新消息,XADD,生产消息
    • 从消息队列中获取消息,XREAD,消费消息
    • 消息ID说明
    • 消费者组模式,consumer group
    • Pending 等待列表
    • 消息转移
    • 坏消息问题,Dead Letter,死信问题
    • 信息监控,XINFO
    • 命令小结
  • JAVA伪代码实现Stream消费队列
    • Stream消息队列小结
  • 综上比较
  • 使用Stream作为消息队列优化之前的秒杀案例
  • 总结

秒杀优化

如果一个饭店只有一个服务员,并且这个服务员不仅需要负责客人的点餐服务,还需要负责炒菜服务,显然这样的话,只能是先处理完第一个客人所有的点餐,烧菜任务后,才能去处理下一个客人的点餐,烧菜任务,这样显然把任务给串行化了,效率大大降低。

而现在我们就面临这样的问题:

目前整个秒杀的过程都是串行化执行的,并且这个流程里面涉及多次数据库查询操作,数据库查询是最耗费时间的,因此优化的思路就是把最耗费时间的数据库写操作转换为异步执行,然后把数据库查询操作通过redis查询替换掉,这样整体就分为了两部分,一部分是主线程去redis判断校验,然后如果判断和校验都通过了,就将消息放入一个队列中,异步线程从该队列中取出消息,然后去执行数据库写操作。

此时redis就相当于服务员,负责库存数量判断和重复购买校验,然后将合法的订单交易,放入队列中,异步处理线程,从队列读取消息,进行数据库写处理,即扣减库存,创建订单的耗时逻辑,全部异步完成。


显然,关于redis那部分判断逻辑,应该都由lua脚本来完成,而非java代码


秒杀优化的具体实现

1.新增优惠卷的同时,将优惠卷信息保存到Redis中

代码语言:javascript复制
    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
        // 保存优惠券
        save(voucher);
        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        seckillVoucher.setVoucherId(voucher.getId());
        seckillVoucher.setStock(voucher.getStock());
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);
        //保存优惠卷信息到Redis
        stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY voucher.getId(),voucher.getStock().toString());
    }

测试:

2.lua脚本编写

代码语言:javascript复制
-- 1.参数列表
-- 1.1 优惠卷id
local voucherId= ARGV[1]
-- 1.2 用户id
local userId= ARGV[2]

--2.数据key
--2.1库存key
local storeKey="seckill:stock:" .. voucherId
--2.2订单key
local orderKey="seckill:order:" .. voucherId

--3.脚本业务
--3.1判断库存是否充足 get storeKey
if(tonumber(redis.call('get',storeKey))<=0) then
    --3.2库存不足,返回1
    return 1
end

--3.2判断用户是否下单--set集合的判断方法,判断某个集合中是否存在某个value 
if(redis.call('sismember',orderKey,userId)==1) then
      --3.3存在,说明是重复下单,返回2
    return 2
end

--3.4扣库存incrby storeKey -1
redis.call('incrby',storeKey,-1)
--3.5下单(保存用户)sadd orderkey userId
redis.call('sadd',orderKey,userId)
return 0

3.修改抢购逻辑

代码语言:javascript复制
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Autowired
    private ISeckillVoucherService iSeckillVoucherService;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private RedisWorker redisWorker;

     private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

     static {
         SECKILL_SCRIPT=new DefaultRedisScript<>();
         SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
         SECKILL_SCRIPT.setResultType(Long.class);
     }

    @Override
    @Transactional
    public Result seckillVoucher(Long voucherId) {
        Long uid = UserHolder.getUser().getId();
        //1.执行lua脚本
        Long res = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), uid.toString());
        //2.判断结果是否为0
        int r=res.intValue();
        if(r!=0){
            return Result.fail(r==1?"库存不足":"不能重复下单");
        }
         //3.为0,有购买资格,把下单信息保存到阻塞队列
        long order = redisWorker.nextId("order");
        //TODO:保存到阻塞队列
        //4.返回订单id
        return Result.ok(order);
    }
}

当我们测试一下后:

此时数据库并无变化,因为我们还没把消息放入阻塞队列,从而通知异步线程去处理


4.异步线程处理阻塞队列中的消息

代码语言:javascript复制
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Autowired
    private ISeckillVoucherService iSeckillVoucherService;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private RedisWorker redisWorker;
    

     private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

     static {
         SECKILL_SCRIPT=new DefaultRedisScript<>();
         SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
         SECKILL_SCRIPT.setResultType(Long.class);
     }


    /**
     * 阻塞队列
     */
    private BlockingQueue<VoucherOrder> orderTasks=new ArrayBlockingQueue(1024*1024);

    /**
     * 异步线程
     */
    private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();

     @PostConstruct
     public void init(){
         SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
     }

    public class VoucherOrderHandler implements Runnable{
        @Override
        public void run() {
          while(true){
              //1.获取队列中的订单信息
              try {
                  //1.获取队列中的订单信息
                  VoucherOrder voucherOrder = orderTasks.take();
                  //2.创建订单
                  createVoucherOrder(voucherOrder);
              } catch (InterruptedException e) {
                  log.error("订单创建异常: ",e);
              }
          }
        }

        /**
         * 保守起见,还会再次进行判断
         */
        private void createVoucherOrder(VoucherOrder voucherOrder) {
            Long userId = voucherOrder.getUserId();
            Long voucherId = voucherOrder.getVoucherId();
            //创建锁对象
            RLock lock = redissonClient.getLock("lock:order:"   userId);
            //尝试获取分布式锁
            // 第一个参数为获取锁的最大等待时间(期间会重试)--默认-1,,失败直接返回
            //锁自动释放时间--默认30秒
            //时间单位
            boolean tryLock = lock.tryLock();
            if(!tryLock){
                log.error("用户[" userId "]对" voucherId "优惠卷重复抢购");
            }
            //我们只需要确保下面这两行代码的集群并发问题被解决
            try{
                Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();

                if(count>0){
                    log.error("用户[" userId "]对" voucherId "优惠卷重复抢购");
                }
            }finally {
                lock.unlock();
            }

            //6.扣减库存
            boolean success = iSeckillVoucherService.update()
                    .setSql("stock = stock -1")
                    .eq("voucher_id", voucherId)
                    .gt("stock",0)
                    .update();
            if(!success){
               log.error("库存扣减失败");
            }
            save(voucherOrder);
        }
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
        Long uid = UserHolder.getUser().getId();
        //1.执行lua脚本
        Long res = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), uid.toString());
        //2.判断结果是否为0
        int r=res.intValue();
        if(r!=0){
            return Result.fail(r==1?"库存不足":"不能重复下单");
        }
        //3.为0,有购买资格,把下单信息保存到阻塞队列
        VoucherOrder voucherOrder = new VoucherOrder();
        long orderId = redisWorker.nextId("order");
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(uid);
        voucherOrder.setVoucherId(voucherId);
        orderTasks.add(voucherOrder);
        //4.返回订单id
        return Result.ok(orderId);
    }
}

基于jdk阻塞队列完成的秒杀优化总结

阻塞队列里面数据过多可能会导致jvm内存溢出,还有就是即便设置了阻塞队列最大元素个数上限也有弊端,就是如果元素过多,处理速度跟不上,会导致很多额外任务放入阻塞队列失败

还有就是数据都是存放在内存中的,一旦java程序出现异常,那么内存中的任务将会全部丢失,并且一旦出现异常,也会导致某个任务执行失败


Redis消息队列实现秒杀


基于Redis的List实现消息队列


基于Redis的PubSub实现消息队列


基于Stream实现消息队列

默认是非阻塞的,并且如果阻塞时长传入0,表示无限等待


基于Stream的消息队列之消费者组


Redis-Stream详解

相信各位光看上面的介绍,应该对Stream还是一知半解,下面我来详细介绍一下它的用法:

追加新消息,XADD,生产消息

XADD,命令用于在某个stream(流数据)中追加消息,演示如下:

代码语言:javascript复制
127.0.0.1:6379> XADD memberMessage * user kang msg Hello
"1651325244694-0"
127.0.0.1:6379> XADD memberMessage * user zhong  msg nihao
"1651325256282-0"

其中语法格式为:

代码语言:javascript复制
XADD key ID field string [field string …]

需要提供key,消息ID方案,消息内容,其中消息内容为key-value型数据。

ID,最常使用*,表示由Redis生成消息ID,这也是强烈建议的方案。

field string [field string], 就是当前消息内容,由1个或多个key-value构成。

上面的例子中,在memberMemsages这个key中追加了user kang msg Hello这个消息。Redis使用毫秒时间戳和序号生成了消息ID。此


从消息队列中获取消息,XREAD,消费消息

XREAD,从Stream中读取消息,演示如下:

代码语言:javascript复制
127.0.0.1:6379> XREAD streams memberMessage 0
1) 1) "memberMessage"
   2) 1) 1) "1651325244694-0"
         2) 1) "user"
            2) "kang"
            3) "msg"
            4) "Hello"
      2) 1) "1651325256282-0"
         2) 1) "user"
            2) "zhong"
            3) "msg"
            4) "nihao"

消息被读取后,并不会从stream队列中消失,这点需要注意

上面的命令是从消息队列memberMessage中读取所有消息。XREAD支持很多参数,语法格式为:

代码语言:javascript复制
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

其中:

  • [COUNT count],用于限定获取的消息数量
  • [BLOCK milliseconds],用于设置XREAD为阻塞模式,默认为非阻塞模式
  • ID,用于设置由哪个消息ID开始读取。使用0表示从第一条消息开始。(本例中就是使用0)此处需要注意,消息队列ID是单调递增的,所以通过设置起点,可以向后读取。在阻塞模式中,可以使用 $ 表 示 最 新 的 消 息 I D 。 ( 在 非 阻 塞 模 式 下,表示最新的消息ID)。(在非阻塞模式下无意义)。

XRED读消息时分为阻塞和非阻塞模式,使用BLOCK选项可以表示阻塞模式,需要设置阻塞时长。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。

一个典型的阻塞模式用法为:

代码语言:javascript复制
127.0.0.1:6379> XREAD block 1000 streams memberMessage $
(nil)
(1.07s)

我们使用Block模式,配合$作为ID,表示读取最新的消息,若没有消息,命令阻塞!等待过程中,其他客户端向队列追加消息,则会立即读取到。

因此,典型的队列就是 XADD 配合 XREAD Block 完成。XADD负责生成消息,XREAD负责消费消息。


消息ID说明

XADD生成的1553439850328-0,就是Redis生成的消息ID,由两部分组成:时间戳-序号。时间戳是毫秒级单位,是生成消息的Redis服务器时间,它是个64位整型(int64)。序号是在这个毫秒时间点内的消息序号,它也是个64位整型。较真来说,序号可能会溢出,but真可能吗?

可以通过multi批处理,来验证序号的递增:

代码语言:javascript复制
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> XADD memberMessage * msg one
QUEUED
127.0.0.1:6379> XADD memberMessage * msg two
QUEUED
127.0.0.1:6379> XADD memberMessage * msg three
QUEUED
127.0.0.1:6379> XADD memberMessage * msg four
QUEUED
127.0.0.1:6379> XADD memberMessage * msg five
QUEUED
127.0.0.1:6379> EXEC
1) "1553441006884-0"
2) "1553441006884-1"
3) "1553441006884-2"
4) "1553441006884-3"
5) "1553441006884-4"

由于一个redis命令的执行很快,所以可以看到在同一时间戳内,是通过序号递增来表示消息的。

为了保证消息是有序的,因此Redis生成的ID是单调递增有序的。由于ID中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis的每个Stream类型数据都维护一个latest_generated_id属性,用于记录最后一个消息的ID。若发现当前时间戳退后(小于latest_generated_id所记录的),则采用时间戳不变而序号递增的方案来作为新消息ID(这也是序号为什么使用int64的原因,保证有足够多的的序号),从而保证ID的单调递增性质。

强烈建议使用Redis的方案生成消息ID,因为这种时间戳 序号的单调递增的ID方案,几乎可以满足你全部的需求。但同时,记住ID是支持自定义的,别忘了!


消费者组模式,consumer group

当多个消费者(consumer)同时消费一个消息队列时,可以重复的消费相同的消息,就是消息队列中有10条消息,三个消费者都可以消费到这10条消息。

但有时,我们需要多个消费者配合协作来消费同一个消息队列,就是消息队列中有10条消息,三个消费者分别消费其中的某些消息,比如消费者A消费消息1、2、5、8,消费者B消费消息4、9、10,而消费者C消费消息3、6、7。也就是三个消费者配合完成消息的消费,可以在消费能力不足,也就是消息处理程序效率不高时,使用该模式。该模式就是消费者组模式。

即消费者组模式可以让多个消费者协同合作,来共同消息队列中的消息,提高队列中消息的消费效率

消费者组模式的支持主要由两个命令实现:

  • XGROUP,用于管理消费者组,提供创建组,销毁组,更新组起始消息ID等操作
  • XREADGROUP,分组消费消息操作

进行演示,演示时使用5个消息,思路是:创建一个Stream消息队列,生产者生成5条消息。在消息队列上创建一个消费组,组内三个消费者进行消息消费:

代码语言:javascript复制
# 生产者生成10条消息
127.0.0.1:6379> MULTI
127.0.0.1:6379> XADD mq * msg 1 # 生成一个消息:msg 1
127.0.0.1:6379> XADD mq * msg 2
127.0.0.1:6379> XADD mq * msg 3
127.0.0.1:6379> XADD mq * msg 4
127.0.0.1:6379> XADD mq * msg 5
127.0.0.1:6379> EXEC
 1) "1553585533795-0"
 2) "1553585533795-1"
 3) "1553585533795-2"
 4) "1553585533795-3"
 5) "1553585533795-4"

# 创建消费组 mqGroup
127.0.0.1:6379> XGROUP CREATE mq mqGroup 0 # 为消息队列 mq 创建消费组 mgGroup
OK

# 消费者A,消费第1条
127.0.0.1:6379> XREADGROUP group mqGroup consumerA count 1 streams mq > #消费组内消费者A,从消息队列mq中读取一个消息
1) 1) "mq"
   2) 1) 1) "1553585533795-0"
         2) 1) "msg"
            2) "1"
# 消费者A,消费第2条
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-1"
         2) 1) "msg"
            2) "2"
# 消费者B,消费第3条
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerB COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-2"
         2) 1) "msg"
            2) "3"
# 消费者A,消费第4条
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA count 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-3"
         2) 1) "msg"
            2) "4"
# 消费者C,消费第5条
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerC COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-4"
         2) 1) "msg"
            2) "5"

上面的例子中,三个在同一组 mpGroup 消费者A、B、C在消费消息时(消费者在消费时指定即可,不用预先创建),有着互斥原则,消费方案为,A->1, A->2, B->3, A->4, C->5。语法说明为:

XGROUP CREATE mq mqGroup 0,用于在消息队列mq上创建消费组 mpGroup,最后一个参数0,表示该组从第一条消息开始消费。(意义与XREAD的0一致)。除了支持CREATE外,还支持SETID设置起始ID,DESTROY销毁组,DELCONSUMER删除组内消费者等操作。

XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >,用于组mqGroup内消费者consumerA在队列mq中消费,参数>表示未被组内消费的起始消息,参数count 1表示获取一条。语法与XREAD基本一致,不过是增加了组的概念。

可以进行组内消费的基本原理是,STREAM类型会为每个组记录一个最后处理(交付)的消息ID(last_delivered_id),这样在组内消费时,就可以从这个值后面开始读取,保证不重复消费。

以上就是消费组的基础操作。除此之外,消费组消费时,还有一个必须要考虑的问题,就是若某个消费者,消费了某条消息,但是并没有处理成功时(例如消费者进程宕机),这条消息可能会丢失,因为组内其他消费者不能再次消费到该消息了。下面继续讨论解决方案。


Pending 等待列表

为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。命令XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息。演示如下:

代码语言:javascript复制
127.0.0.1:6379> XPENDING mq mqGroup # mpGroup的Pending情况
1) (integer) 5 # 5个已读取但未处理的消息
2) "1553585533795-0" # 起始ID
3) "1553585533795-4" # 结束ID
4) 1) 1) "consumerA" # 消费者A有3个
      2) "3"
   2) 1) "consumerB" # 消费者B有1个
      2) "1"
   3) 1) "consumerC" # 消费者C有1个
      2) "1"

127.0.0.1:6379> XPENDING mq mqGroup -   10 # 使用 start end count 选项可以获取详细信息
1) 1) "1553585533795-0" # 消息ID
   2) "consumerA" # 消费者
   3) (integer) 1654355 # 从读取到现在经历了1654355ms,IDLE
   4) (integer) 5 # 消息被读取了5次,delivery counter
2) 1) "1553585533795-1"
   2) "consumerA"
   3) (integer) 1654355
   4) (integer) 4
# 共5个,余下3个省略 ...

127.0.0.1:6379> XPENDING mq mqGroup -   10 consumerA # 在加上消费者参数,获取具体某个消费者的Pending列表
1) 1) "1553585533795-0"
   2) "consumerA"
   3) (integer) 1641083
   4) (integer) 5
# 共3个,余下2个省略 ...

每个Pending的消息有4个属性:

  • 消息ID
  • 所属消费者
  • IDLE,已读取时长
  • delivery counter,消息被读取次数

上面的结果我们可以看到,我们之前读取的消息,都被记录在Pending列表中,说明全部读到的消息都没有处理,仅仅是读取了。那如何表示消费者处理完毕了消息呢?使用命令 XACK 完成告知消息处理完成,演示如下:

代码语言:javascript复制
127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 # 通知消息处理结束,用消息ID标识
(integer) 1

127.0.0.1:6379> XPENDING mq mqGroup # 再次查看Pending列表
1) (integer) 4 # 已读取但未处理的消息已经变为4个
2) "1553585533795-1"
3) "1553585533795-4"
4) 1) 1) "consumerA" # 消费者A,还有2个消息处理
      2) "2"
   2) 1) "consumerB"
      2) "1"
   3) 1) "consumerC"
      2) "1"
127.0.0.1:6379> 

有了这样一个Pending机制,就意味着在某个消费者读取消息但未处理后,消息是不会丢失的。等待消费者再次上线后,可以读取该Pending列表,就可以继续处理该消息了,保证消息的有序和不丢失。

此时还有一个问题,就是若某个消费者宕机之后,没有办法再上线了,那么就需要将该消费者Pending的消息,转义给其他的消费者处理,就是消息转移。请继续。


消息转移

消息转移的操作时将某个消息转移到自己的Pending列表中。使用语法XCLAIM来实现,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。演示如下:

代码语言:javascript复制
# 当前属于消费者A的消息1553585533795-1,已经15907,787ms未处理了
127.0.0.1:6379> XPENDING mq mqGroup -   10
1) 1) "1553585533795-1"
   2) "consumerA"
   3) (integer) 15907787
   4) (integer) 4

# 转移超过3600s的消息1553585533795-1到消费者B的Pending列表
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
1) 1) "1553585533795-1"
   2) 1) "msg"
      2) "2"

# 消息1553585533795-1已经转移到消费者B的Pending中。
127.0.0.1:6379> XPENDING mq mqGroup -   10
1) 1) "1553585533795-1"
   2) "consumerB"
   3) (integer) 84404 # 注意IDLE,被重置了
   4) (integer) 5 # 注意,读取次数也累加了1次

以上代码,完成了一次消息转移。转移除了要指定ID外,还需要指定IDLE,保证是长时间未处理的才被转移。被转移的消息的IDLE会被重置,用以保证不会被重复转移,以为可能会出现将过期的消息同时转移给多个消费者的并发操作,设置了IDLE,则可以避免后面的转移不会成功,因为IDLE不满足条件。例如下面的连续两条转移,第二条不会成功。

代码语言:javascript复制
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1

这就是消息转移。至此我们使用了一个Pending消息的ID,所属消费者和IDLE的属性,还有一个属性就是消息被读取次数,delivery counter,该属性的作用由于统计消息被读取的次数,包括被转移也算。这个属性主要用在判定是否为错误数据上。请继续看:


坏消息问题,Dead Letter,死信问题

正如上面所说,如果某个消息,不能被消费者处理,也就是不能被XACK,这是要长时间处于Pending列表中,即使被反复的转移给各个消费者也是如此。此时该消息的delivery counter就会累加(上一节的例子可以看到),当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息),由于有了判定条件,我们将坏消息处理掉即可,删除即可。删除一个消息,使用XDEL语法,演示如下:

代码语言:javascript复制
# 删除队列中的消息
127.0.0.1:6379> XDEL mq 1553585533795-1
(integer) 1
# 查看队列中再无此消息
127.0.0.1:6379> XRANGE mq -  
1) 1) "1553585533795-0"
   2) 1) "msg"
      2) "1"
2) 1) "1553585533795-2"
   2) 1) "msg"
      2) "3"

注意本例中,并没有删除Pending中的消息因此你查看Pending,消息还会在。可以执行XACK标识其处理完毕!


信息监控,XINFO

Stream提供了XINFO来实现对服务器信息的监控,可以查询:

查看队列信息

代码语言:javascript复制
127.0.0.1:6379> Xinfo stream mq
 1) "length"
 2) (integer) 7
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 1
 9) "last-generated-id"
10) "1553585533795-9"
11) "first-entry"
12) 1) "1553585533795-3"
    2) 1) "msg"
       2) "4"
13) "last-entry"
14) 1) "1553585533795-9"
    2) 1) "msg"
       2) "10"

消费组信息

代码语言:javascript复制
127.0.0.1:6379> Xinfo groups mq
1) 1) "name"
   2) "mqGroup"
   3) "consumers"
   4) (integer) 3
   5) "pending"
   6) (integer) 3
   7) "last-delivered-id"
   8) "1553585533795-4"

消费者组成员信息

代码语言:javascript复制
127.0.0.1:6379> XINFO CONSUMERS mq mqGroup
1) 1) "name"
   2) "consumerA"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 18949894
2) 1) "name"
   2) "consumerB"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 3092719
3) 1) "name"
   2) "consumerC"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 23683256

至此,消息队列的操作说明大体结束!


命令小结

  • XACK 结束Pending
  • XADD 生成消息
  • XCLAIM 消息转移
  • XDEL 删除消息
  • XGROUP 消费组管理
  • XINFO 得到消费组信息
  • XLEN 消息队列长度
  • XPENDING Pending列表
  • XRANGE 获取消息队列中消息
  • XREAD 消费消息
  • XREADGROUP 分组消费消息
  • XREVRANGE 逆序获取消息队列中消息
  • XTRIM 消息队列容量

JAVA伪代码实现Stream消费队列


Stream消息队列小结


综上比较


使用Stream作为消息队列优化之前的秒杀案例

lua脚本改造

代码语言:javascript复制
-- 1.参数列表
-- 1.1 优惠卷id
local voucherId= ARGV[1]
-- 1.2 用户id
local userId= ARGV[2]
--1.3订单id
local orderId=ARGV[3]

--2.数据key
--2.1库存key
local storeKey="seckill:stock:" .. voucherId
--2.2订单key
local orderKey="seckill:order:" .. voucherId

--3.脚本业务
--3.1判断库存是否充足 get storeKey
local storeNum=redis.call('get',storeKey)
--3.2 如果redis中没有该优惠卷库存记录,返回3
-- lua中只有false和nil是假值, 其他都是真值
if(storeNum) then
else
    return 3
end
if(tonumber(storeNum)<=0) then
    --3.2库存不足,返回1
    return 1
end

--3.2判断用户是否下单--set集合的判断方法,判断某个集合中是否存在某个value
if(redis.call('sismember',orderKey,userId)==1) then
      --3.3存在,说明是重复下单,返回2
    return 2
end

--3.4扣库存incrby storeKey -1
redis.call('incrby',storeKey,-1)
--3.5下单(保存用户)sadd orderkey userId
redis.call('sadd',orderKey,userId)
--3.6发送消息到队列,XADD stream.orders * k1 v1 k2 v2
redis.call('xadd','stream.orders','*',"userId",userId,"voucherId",voucherId,"id",orderId)
return 0

代码实现

代码语言:javascript复制
package com.hmdp.service.impl;

import cn.hutool.core.bean.BeanUtil;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

import static com.hmdp.utils.RedisConstants.*;

/**
 * <p>
 *  服务实现类
 * </p>
 *
 * @author 虎哥
 * @since 2021-12-22
 */
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Autowired
    private ISeckillVoucherService iSeckillVoucherService;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private RedisWorker redisWorker;

    /**
     * 建议放到yml配置文件中
     */
    private static final String LUA_FILE_PATH="seckill.lua";

    private static final String ORDER = "order";

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

     static {
         SECKILL_SCRIPT=new DefaultRedisScript<>();
         SECKILL_SCRIPT.setLocation(new ClassPathResource(LUA_FILE_PATH));
         SECKILL_SCRIPT.setResultType(Long.class);
     }

    /**
     * 异步线程
     */
    private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();

    @PostConstruct
     public void init(){
        //如果指定stream队列关联的消费者组已经存在,则不进行处理
        if (!targetGroupExistsInStream(STREAM_QUEUE_NAME,STREAM_GROUP_NAME)) {
         //这里createGroup的mkStream为true,表示在创建消费者组时,如果关联的stream队列不存在,也会自动创建
        stringRedisTemplate.opsForStream().createGroup(STREAM_QUEUE_NAME,ReadOffset.from("0"),STREAM_GROUP_NAME);
        }
        //异步监听任务执行
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
     }

    private boolean targetGroupExistsInStream(String streamName,String groupName) {
          //如果stream队列不存在
        if(stringRedisTemplate.countExistingKeys(Collections.singletonList(streamName))==0){
         return false;
        }
        StreamInfo.XInfoGroups order_stream_groups = stringRedisTemplate.opsForStream().groups(streamName);
        Iterator<StreamInfo.XInfoGroup> iterator = order_stream_groups.iterator();
        while(iterator.hasNext()){
            StreamInfo.XInfoGroup xInfoGroup = iterator.next();
            if(xInfoGroup.groupName().equals(groupName)){
                return true;
            }
        }
        return false;
    }

    public class VoucherOrderHandler implements Runnable{
        @Override
        public void run() {
          while(true){
              try {
                  //1.获取队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
                  List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                          //消费组的名字和消费者的名字
                          Consumer.from(STREAM_GROUP_NAME, STREAM_GROUP_CONSUMER_NAME),
                          StreamReadOptions.empty().count(1)
                                  //我这里设置为一直阻塞,直到有消息可读为止
                                  .block(Duration.ofSeconds(0)),
                          //从哪个Stream队列进行消息读取,此处读取方式为>
                          StreamOffset.create(STREAM_QUEUE_NAME, ReadOffset.lastConsumed())
                  );
                  if (handleMsgFromStream(list)) {
                      continue;
                  }
              } catch (Exception e) {
                  log.error("处理消息异常");
                  //处理Pending队列中消息
                  handlePendingList();
              }
          }
        }

        private void handlePendingList() {
            while(true){
                try {
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from(STREAM_GROUP_NAME, STREAM_GROUP_CONSUMER_NAME),
                            StreamReadOptions.empty().count(1),
                            //从头开始消费pending队列中所有消息
                            StreamOffset.create(STREAM_QUEUE_NAME, ReadOffset.from("0"))
                    );
                    if(handleMsgFromStream(list)){
                        //Pending队列中没有错误消息,那么直接退出循环
                        break;
                    }
                } catch (Exception e) {
                    //处理Pending队列中的异常消息
                    //可以在这里做一些异常记录等
                    try {
                        //避免循环频率过高
                        Thread.sleep(3000);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }


        private boolean handleMsgFromStream(List<MapRecord<String, Object, Object>> list) {
            //2.判断订单信息是否为空
            if(list ==null || list.isEmpty()){
                //如果为null,说明没有消息
                return true;
            }
            //解析消息
            MapRecord<String, Object, Object> record = list.get(0);
            Map<Object, Object> value = record.getValue();
            VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
            //3.创建订单
            createVoucherOrder(voucherOrder);
            //4.确认消息
            stringRedisTemplate.opsForStream().acknowledge(STREAM_QUEUE_NAME,STREAM_GROUP_NAME,record.getId());
            return false;
        }

        /**
         * 保守起见,还会再次进行判断
         */
        private void createVoucherOrder(VoucherOrder voucherOrder) {
            Long userId = voucherOrder.getUserId();
            Long voucherId = voucherOrder.getVoucherId();
            //创建锁对象
            RLock lock = redissonClient.getLock(LOCK_ORDER_KEY   userId);
            //尝试获取分布式锁
            // 第一个参数为获取锁的最大等待时间(期间会重试)--默认-1,,失败直接返回
            //锁自动释放时间--默认30秒
            //时间单位
            boolean tryLock = lock.tryLock();
            if(!tryLock){
                log.error("用户[" userId "]对" voucherId "优惠卷重复抢购");
                return;
            }
            //我们只需要确保下面这两行代码的集群并发问题被解决
            try{
                Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();

                if(count>0){
                    log.error("用户[" userId "]对" voucherId "优惠卷重复抢购");
                    return;
                }
            }finally {
                lock.unlock();
            }

            //6.扣减库存
            boolean success = iSeckillVoucherService.update()
                    .setSql("stock = stock -1")
                    .eq("voucher_id", voucherId)
                    .gt("stock",0)
                    .update();
            if(!success){
               log.error("库存扣减失败");
               return;
            }
            save(voucherOrder);
        }
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
        Long uid = UserHolder.getUser().getId();
        Long orderId = redisWorker.nextId(ORDER);
        //1.执行lua脚本
        Long res = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), uid.toString(),orderId.toString());
        //2.判断结果是否为0
        int r=res.intValue();
        if(r!=0){
            //说明redis中没有当前优惠卷的库存记录
            //去数据库查询该优惠卷是否存在,如果不存在,说明是恶意访问
            //如果存在,就更新redis记录
            if(r==3){
                if(handleUnKnownVoucherId(voucherId)){
                    seckillVoucher(voucherId);
                }else {
                    return Result.fail("指定优惠卷不存在");
                }
            }
            return Result.fail(r==1?"库存不足":"不能重复下单");
        }
        //3.为0,有购买资格,把下单信息保存到阻塞队列
        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(uid);
        voucherOrder.setVoucherId(voucherId);
        //4.返回订单id
        return Result.ok(orderId);
    }

    private Boolean handleUnKnownVoucherId(Long voucherId) {
        SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
        if(voucher==null){
            return false;
        }
        //更新redis记录
        stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY voucherId,voucher.getStock().toString());
       return true;
    }
}

总结

本节最重要的地方还是在redis的Stream消息队列,并且也花了大量篇幅去讲解加实践,当然一切还是以官方文档为主,因此建议大家可以没事去看看redis的stream部分文档

0 人点赞