我们所了解的Redis分布式锁真的就万无一失吗?

2021-03-09 15:09:36 浏览数 (1)

前言

在单体架构中,我们处理并发的手段有多种,例如synchronized或使用ReentrantLock等常用手段,但是在分布式架构中,上述所说的就不能解决某些业务的并发问题了,那么接下来我们就开始聊聊分布式锁。

什么是分布式锁

在介绍分布式锁之前,我们先由浅入深了了解一下,线程锁和进程锁。

线程锁:

主要用来给方法、代码块加锁。当某个方法或代码在同一时刻仅有一个线程执行该方法或该代码段。线程锁只在同一JVM中有效果,因为线程锁的实现在根本上是依靠线程之间共享内存实现的,比如Synchronized、Lock等。

进程锁:

为了控制同一操作系统中多个进程访问某个共享资源,因为进程具有独立性,各个进程无法访问其他进程的资源,因此无法通过synchronized等线程锁实现进程锁。

分布式锁:

分布式锁,是指在分布式的部署环境下,通过锁机制来让多客户端互斥的对共享资源进行访问。

分布式锁的几种实现方式

  • 数据库乐观锁
  • 使用redis实现分布式锁
  • 使用zookeeper实现分布式锁

为啥要使用分布式锁

在聊聊为啥要使用分布式锁之前,我们先看一下如下图架构模式

上图,是一个集群单体架构模式,我们现在来设想秒杀一个业务场景,首先我们秒杀肯定得有商品,那么我们的商品库存数量是预存放到redis中,然后用户请求的时候,会经过nginx的负载均衡轮询之后,将请求落在其中某一台服务器上,然后执行用户抢购商品业务逻辑,我这里写一段简单演示代码

代码语言:javascript复制
@Overridepublic String panicBuying() {
    Integer orderCount = Integer.valueOf(redisTemplate.opsForValue().get("sunny_order"));    if (orderCount > 0) {        int retCount = orderCount - 1;        redisTemplate.opsForValue().set("sunny_order", retCount   "");        System.out.println("库存扣减成功,剩余库存:"   retCount);    } else {        System.out.println("库存不存");        return "库存不足";    }
    return "抢购成功";}

我们分析如上代码,和上图的架构模式,在高并发场景下会不会存在问题?

答案肯定是会的,我们现在是集群单体架构,在高并发场景下,通过ngxin负载轮询,当然可能会有多个请求同时进入方法,然后同一时刻调用redis的api获取库存的值,那么这个时候多个用户获取的库存数量是一样的,然后减掉库存,在设置进去数据肯定就不对了。

好,我们既然知道有bug了,那我们是不是可以使用synchronized锁,在同一时刻只让抢到锁的用户才能操作库存呢,见如下演示代码

代码语言:javascript复制
@Overridepublic String panicBuying() {
    synchronized (this) {        Integer orderCount = Integer.valueOf(redisTemplate.opsForValue().get("sunny_order"));        if (orderCount > 0) {            int retCount = orderCount - 1;            redisTemplate.opsForValue().set("sunny_order", retCount   "");            System.out.println("库存扣减成功,剩余库存:"   retCount);        } else {            System.out.println("库存不存");            return "库存不足";        }    }
    return "抢购成功";}

看到如上代码,估计有小伙伴已经看出来有问题了,那我再接着分析下,为什么如上代码加上了锁还是会有问题?

我们结合上面的图仔细想想,synchronized锁只是针对单台的jvm请求有效,但是集群环境下,通过nginx轮询转发,且高并发情况下,肯定会存在多个请求同一时刻将请求分配到两台服务器上,那这个时候就算有synchronized锁,也只能各自锁住各自服务器的jvm请求实例,还是会出现请求获取同样的库存数量,导致数据不对,不过也是稍微解决了大量请求进来的情况

既然我们知道synchronized锁已经无法解决我们的问题了,那我们可以使用redis分布式锁解决呀,在演示如何使用redis解决之前,我先介绍一下redis的setnx命令

对于redis的set命令,相信小伙伴是更加熟悉不过了,那么setNx命令其实跟set命令差不多,他们的区别在于:

  • setNx:如果添加的key不存在,则返回1,如果添加key已经存在了,不会进行操作,返回0
  • set:如果添加的key不存在,则直接添加key,如果存在则进行覆盖之前的value

我们了解完setNx命令,再来看一下通过setNx实现分布式锁的演示代码

代码语言:javascript复制
@Overridepublic String panicBuying() {
    //获取锁    Boolean lockResult = redisTemplate.opsForValue().setIfAbsent("sunny_lock", "sunny");
    if (!lockResult) {        return "锁已被占用,请稍后重试";    }
    Integer orderCount = Integer.valueOf(redisTemplate.opsForValue().get("sunny_order"));    if (orderCount > 0) {        int retCount = orderCount - 1;        redisTemplate.opsForValue().set("sunny_order", retCount   "");        System.out.println("库存扣减成功,剩余库存:"   retCount);    } else {        System.out.println("库存不存");        return "库存不足";    }
    //释放锁    redisTemplate.delete("sunny_lock");
    return "抢购成功";}

我们现在使用了Redis分布式锁,多个jvm实例同时setnx,只有一个jvm能够成功,那么就解决了集群下多台服务器锁资源的问题了,就意味着不会出现上面那种多个请求进来,同时减掉我们的库存操作了,那我们在看看还不会有问题呢?

答案是有的,假如我获取到锁之后,在执行业务逻辑的时候发生了报错,导致我无法进行释放锁,那后面的用户就永远无法再次继续抢购商品了对吧。

针对这么个情况,我猜有小伙伴已经想到了解决方案,那我们直接异常捕获下嘛,然后通过finally无论是否报错,都进行执行释放锁操作,看如下演示代码

代码语言:javascript复制
@Overridepublic String panicBuying() {    //获取锁    Boolean lockResult = redisTemplate.opsForValue().setIfAbsent("sunny_lock", "sunny");
    if (!lockResult) {        return "锁已被占用,请稍后重试";    }
    try {        Integer orderCount = Integer.valueOf(redisTemplate.opsForValue().get("sunny_order"));        if (orderCount > 0) {            int retCount = orderCount - 1;            redisTemplate.opsForValue().set("sunny_order", retCount   "");            System.out.println("库存扣减成功,剩余库存:"   retCount);        } else {            System.out.println("库存不存");            return "库存不足";        }    } catch (Exception e) {        e.printStackTrace();    } finally {        //释放锁        redisTemplate.delete("sunny_lock");    }
    return "抢购成功";}

机智如我想到了这种办法解决如上这个问题,而此刻有小伙伴要说,你这样处理固然是可以的,但是有没有想过你获取到锁后,然后在执行业务逻辑过程中,服务器发生了宕机了,你的锁又无法释放掉了,那可咋办?估计此时心中一万个emmmm路过吧,不过没办法,既然出现问题我们总得解决是吧。

哼哼,既然你服务器无情且别怪我黎明大大不义了,那我直接给我的锁设置一个过期时间,看你服务器宕机还能不能阻止我释放掉锁,看如下演示代码

代码语言:javascript复制
@Overridepublic String panicBuying() {    //获取锁    Boolean lockResult = redisTemplate.opsForValue().setIfAbsent("sunny_lock", "sunny", 10, TimeUnit.SECONDS);
    if (!lockResult) {        return "锁已被占用,请稍后重试";    }
    try {        Integer orderCount = Integer.valueOf(redisTemplate.opsForValue().get("sunny_order"));        if (orderCount > 0) {            int retCount = orderCount - 1;            redisTemplate.opsForValue().set("sunny_order", retCount   "");            System.out.println("库存扣减成功,剩余库存:"   retCount);        } else {            System.out.println("库存不存");            return "库存不足";        }    } catch (Exception e) {        e.printStackTrace();    } finally {        //释放锁        redisTemplate.delete("sunny_lock");    }
    return "抢购成功";}

好啦,我已经将我的锁释的过期时间设置为10秒自动过期了,就算你服务器宕机了我也不怕了,那么此时我们的程序还有bug么?

黎明大大告诉你,还是有的,为啥这么说呢?假如说我第一个请求进来,拿到了锁,本来我执行该业务在5秒内就能执行完,但是莫名奇妙要花15秒时间才能执行完,那我设置的锁自动失效时间,就会将该锁释放掉,第二个请求进来拿到了锁,然后执行业务逻辑,但是还没有执行完,第一个请求执行完业务逻辑了,把锁给释放掉了,那我第二个请求拿到的锁,被第一个请求给释放掉了,接着第三个请求又进来了,执行业务操作还没执行完,第二个请求执行完逻辑,又把锁释放掉了,然后形成了一个闭环操作,在高并发场景下,可能会导致锁长久失效的问题,不知道小伙伴能否get到我说的这个点?如果还没get到点的小伙伴,重复多看几遍就能理解了。

这个时候,黎明大大灵机一动,那很简单啊,我给拿到的锁设置的一个特定的clientId或者随机的值也行,然后在释放锁的时候,获取锁的value,判断一下value是否是我设置的value,如果是的话才

能释放锁,看代码演示(A线程创建锁 被B线程释放掉了,所以这里是解决谁创建的锁,就应该被谁给释放掉)

代码语言:javascript复制
@Overridepublic String panicBuying() {
    String uuid = UUID.randomUUID().toString();
    //获取锁    Boolean lockResult = redisTemplate.opsForValue().setIfAbsent("sunny_lock", uuid, 10, TimeUnit.SECONDS);
    if (!lockResult) {        return "锁已被占用,请稍后重试";    }
    try {        Integer orderCount = Integer.valueOf(redisTemplate.opsForValue().get("sunny_order"));        if (orderCount > 0) {            int retCount = orderCount - 1;            redisTemplate.opsForValue().set("sunny_order", retCount   "");            System.out.println("库存扣减成功,剩余库存:"   retCount);        } else {            System.out.println("库存不存");            return "库存不足";        }    } catch (Exception e) {        e.printStackTrace();    } finally {        String uidValue = redisTemplate.opsForValue().get("sunny_lock");        if (uidValue.equals(uuid)) {            //释放锁            redisTemplate.delete("sunny_lock");        }    }
    return "抢购成功";}

额....不过以上解决方案貌似并没有解决我们锁提前过期的问题哦,没关系,黎明大大还有一个思路就是当我们请求进入方法拿到了锁之后,我们此时再额外开一个分线程,然后在这个分线程里面写一个逻辑,该逻辑就是整一个自旋锁,然后在起个定时任务每隔几秒中去获取该锁是否还存在,如果存在则对该锁的过期时间进行续命,也就是加锁的过期时间啦,不过给锁添加过期时间是有讲究的哦,一般情况下是 锁的过期时间 / 3 = 锁续命的时间那么这样就能够解决锁提前失效的问题啦,看代码演示

代码语言:javascript复制
@Overridepublic String panicBuying() {    String uuid = UUID.randomUUID().toString();    String lockName = "sunny_lock";
    boolean lock = lock(lockName,uuid);    if (!lock) {        return "抢占锁失败";    }
    try {        Thread.sleep(15000);        Integer orderCount = Integer.valueOf(redisTemplate.opsForValue().get("sunny_order"));        if (orderCount > 0) {            int retCount = orderCount - 1;            redisTemplate.opsForValue().set("sunny_order", retCount   "");            System.out.println("库存扣减成功,剩余库存:"   retCount);        } else {            System.out.println("库存不存");            return "库存不足";        }    } catch (Exception e) {        e.printStackTrace();    } finally {        String uidValue = redisTemplate.opsForValue().get("sunny_lock");        if (uidValue.equals(uuid)) {            //释放锁            System.out.println("释放锁");            redisTemplate.delete("sunny_lock");        }    }
    return "抢购成功";}
//获取锁private boolean lock(String lockKey,String uuid) {    while (true) {        //获取锁        Boolean lockResult = redisTemplate.opsForValue().setIfAbsent("sunny_lock", uuid, 10, TimeUnit.SECONDS);
        if (!lockResult) {            return false;        }
        Thread thread = new Thread(new Runnable() {            @Override            public void run() {                System.out.println("执行~");                Timer timer = new Timer();
                TimerTask timerTask = new TimerTask() {                    @Override                    public void run() {                        //锁存在则将生存时间重置为10s                        String o = (String) redisTemplate.opsForValue().get(lockKey);                        if (uuid.equals(o)) {                            //重新设置时间为10秒                            redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);                        } else {                            timer.cancel();                        }                    }                };                //定时器启动3s后执行第一次,之后每隔3s执行一次                timer.schedule(timerTask, 3000L, 3000L);            }        });
        thread.run();        break;    }    return true;}

解决了以上这些问题,其实在我们平常开发中,完全没有必要再写的这么麻烦了,因为有现成的框架已经帮你集成好这些代码了,甚至还会比我们写的更加严谨,比如redisson框架,该框架我不做多解释了,有不了解的可以自己百度搜寻类似的文章,我这里简单演示一下redisson如何获取锁和释放锁的

代码语言:javascript复制
@Overridepublic String panicBuying() {    String lockName = "sunny_lock";
    RLock lock = redisson.getLock(lockName);    lock.lock();
    try {        Integer orderCount = Integer.valueOf(redisTemplate.opsForValue().get("sunny_order"));        if (orderCount > 0) {            int retCount = orderCount - 1;            redisTemplate.opsForValue().set("sunny_order", retCount   "");            System.out.println("库存扣减成功,剩余库存:"   retCount);        } else {            System.out.println("库存不存");            return "库存不足";        }    } catch (Exception e) {        e.printStackTrace();    } finally {        lock.unlock();    }
    return "抢购成功";}

好啦,redis实现分布式的锁坑基本上都已经踩完了。

我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。

●Redis哨兵架构搭建以及详解

●Redis主从架构的搭建

●深入理解Redis的持久化机制

●Redis集群搭建及原理解剖

0 人点赞