基于redis实现的分布式锁

2023-12-05 21:03:17 浏览数 (1)

基本实现

借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发 送setnx命令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false)。

添加描述

  • 1. 多个客户端同时获取锁(setnx)
  • 2. 获取成功,执行业务逻辑,执行完成释放锁(del)
  • 3. 其他客户端等待重试

改造StockService方法:

代码语言:javascript复制
@Service
public class StockService {
    @Autowired
    private StockMapper stockMapper;
    @Autowired
    private LockMapper lockMapper;
    @Autowired
    private StringRedisTemplate redisTemplate;
    public void checkAndLock() {
        // 加锁,获取锁失败重试
        while (!this.redisTemplate.opsForValue().setIfAbsent("lock", 

"xxx")){
            try {
                Thread.sleep(100);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }
       }
        // 先查询库存是否充足
        Stock stock = this.stockMapper.selectById(1L);
        // 再减库存
        if (stock != null && stock.getCount() > 0){
            stock.setCount(stock.getCount() - 1);
            this.stockMapper.updateById(stock);
       }
        // 释放锁
        this.redisTemplate.delete("lock");
   }
}

其中,加锁:

代码语言:javascript复制
// 加锁,获取锁失败重试
while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "xxx")){
    try {
        Thread.sleep(100);
   } catch (InterruptedException e) {
        e.printStackTrace();
   }
}

解锁:

代码语言:javascript复制
// 释放锁

this.redisTemplate.delete("lock");

使用Jmeter压力测试如下:

 查看mysql数据库:

防死锁

 解决:给锁设置过期时间,自动释放锁。 设置过期时间两种方式:

1. 通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)

2. 使用set指令设置过期时间:set key value ex 3 nx(既达到setnx的效果,又设置了过期时间)

压力测试肯定也没有问题。

问题:可能会释放其他服务器的锁。 场景:如果业务逻辑的执行时间是7s。执行流程如下

1. index1业务逻辑没执行完,3秒后锁被自动释放。

2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。

3. index3获取到锁,执行业务逻辑

4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只 执行1s就被别人释放。 最终等于没锁的情况。

解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的 锁

防误删

实现如下:

问题:删除操作缺乏原子性。 场景:

1. index1执行删除时,查询到的lock值确实和uuid相等

2. index1执行删除前,lock刚好过期时间已到,被redis自动释放

3. index2获取了lock 4. index1执行删除,此时会把index2的lock删除

解决方案:没有一个命令可以同时做到判断 删除,所有只能通过其他方式实现(LUA脚本)

使用lua保证删除原子性

删除LUA脚本:

代码语言:javascript复制
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', 
KEYS[1]) else return 0 end

代码实现:

代码语言:javascript复制
public void checkAndLock() {
    // 加锁,获取锁失败重试
    String uuid = UUID.randomUUID().toString();
    while (!this.redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, 

TimeUnit.SECONDS)){
        try {
            Thread.sleep(50);
       } catch (InterruptedException e) {
            e.printStackTrace();
       }
   }
    // 先查询库存是否充足
    Stock stock = this.stockMapper.selectById(1L);
    // 再减库存
    if (stock != null && stock.getCount() > 0){
        stock.setCount(stock.getCount() - 1);
        this.stockMapper.updateById(stock);
   }
    // 释放锁
    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return 
redis.call('del', KEYS[1]) else return 0 end";
    this.redisTemplate.execute(new DefaultRedisScript<>(script, 

Long.class), Arrays.asList("lock"), uuid);
}

 压力测试:

可重入锁 

由于上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加 锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代 码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继 续往下执行。

用一段 Java 代码解释可重入:

代码语言:javascript复制
public synchronized void a() {
    b();
}

public synchronized void b() {
    // pass
}

假设 X 线程在 a 方法获取锁之后,继续执行 b 方法,如果此时不可重入,线程就必须等待锁释放,再次争抢锁。

锁明明是被 X 线程拥有,却还需要等待自己释放锁,然后再去抢锁,这看起来就很奇怪,我释放我自己~

可重入性就可以解决这个尴尬的问题,当线程拥有锁之后,往后再遇到加锁方法,直接将加锁次数加 1,然后再执行方法逻辑。退出加锁方法之后,加锁次数再减 1,当加锁次数为 0 时,锁才被真正的释 放。 可以看到可重入锁最大特性就是计数,计算加锁的次数。所以当可重入锁需要在分布式环境实现时,我们也就需要统计加锁次数。

解决方案:redis Hash

加锁脚本

Redis 提供了 Hash (哈希表)这种可以存储键值对数据结构。所以我们可以使用 Redis Hash 存储的 锁的重入次数,然后利用 lua 脚本判断逻辑。

代码语言:javascript复制
if (redis.call('exists', KEYS[1]) == 0 or 
    redis.call('hexists', KEYS[1], ARGV[1]) == 1) 
then
    redis.call('hincrby', KEYS[1], ARGV[1], 1);
    redis.call('expire', KEYS[1], ARGV[2]);
    return 1;
else
 return 0;
end

假设值为:KEYS:[lock], ARGV[uuid, expire]如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次 数加1。

解锁脚本
代码语言:javascript复制
-- 判断 hash set 可重入 key 的值是否等于 0
-- 如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败
-- 如果为 0 代表 可重入次数被减 1
-- 如果为 1 代表 该可重入 key 解锁成功
if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then 
    return nil; 
elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then 
    return 0; 
else 
    redis.call('del', KEYS[1]); 
    return 1; 
end;

这里之所以没有跟加锁一样使用 Boolean ,这是因为解锁 lua 脚本中,三个返回值含义如下:

1 代表解锁成功,锁被释放

0 代表可重入次数被减 1

null 代表其他线程尝试解锁,解锁失败

代码实现

由于加解锁代码量相对较多,这里可以封装成一个工具类:

 具体实现:

代码语言:javascript复制
public class RedisDistributeLock{


    private StringRedisTemplate redisTemplate;

    //线程局部变量,可以在线程内共享参数
    private String lockName;
    private String static uuid;
    private Integer expire = 30;
    private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();

    public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = THREAD_LOCAL.get();
        if (StringUtils.isBlank(uuid)) {
            this.uuid = UUID.randomUUID().toString();
            THREAD_LOCAL.set(uuid);
        }
    }

    public void lock() {
        this.lock(expire);
    }

    public void lock(Integer expire) {
        this.expire = expire;
        String script = "if (redis.call('exists', KEYS[1]) == 0 or"  
                "redis.call('hexists', KEYS[1], ARGV[1]) == 1)"  
                "        then"  
                "  redis.call('hincrby', KEYS[1], ARGV[1], 1);"  
                "  redis.call('expire', KEYS[1], ARGV[2]);"  
                "  return 1;"  
                "else"  
                "        return 0;"  
                " end";
        if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class),
                Arrays.asList(lockName), uuid, expire.toString())) {
            try {
                Thread.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //没有获取到锁重试
            lock(expire);
        }
    }

    public void unlock() {
        String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then"  
                "  return nil; "  
                "elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then"  
                "  return 0; "  
                "else"  
                "  redis.call('del', KEYS[1]);"  
                "  return 1;"  
                "end;";
        //如果返回值没有使用Boolean,Spring-data-redis 进行类型转换时将会把 null
        //转为 false,这就会影响我们逻辑判断
        //所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功。
        Long result = this.redisTemplate.execute(new DefaultRedisScript<>
                (script, Long.class), Arrays.asList(lockName), uuid);
        // 如果未返回值,代表尝试解其他线程的锁
        if (result == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: "   lockName   " with request: "   uuid);
        } else if (result == 1) {
            THREAD_LOCAL.remove();
        }
    }

}
使用及测试 

在业务代码中使用:

代码语言:javascript复制
public void checkAndLock() {
    // 加锁,获取锁失败重试
    RedisDistributeLock lock = new RedisDistributeLock(this.redisTemplate, 
"lock");
    lock.lock();
    // 先查询库存是否充足
    Stock stock = this.stockMapper.selectById(1L);
    // 再减库存
    if (stock != null && stock.getCount() > 0){
        stock.setCount(stock.getCount() - 1);
        this.stockMapper.updateById(stock);
   }
    // this.testSubLock();
    // 释放锁
    lock.unlock();
}

 测试:

 测试可重入性:

 自动续期

lua脚本:

代码语言:javascript复制
if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then 
    redis.call('expire', KEYS[1], ARGV[2]); 
    return 1; 
else 
    return 0; 
end

 在RedisDistributeLock中添加renewExpire方法:

代码语言:javascript复制
    private static final Timer TIMER = new Timer();

    /**
     * 开启定时器,自动续期
     */
    private void renewExpire() {
        String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then "  
                "redis.call('expire', KEYS[1], ARGV[2]); "  
                "return 1; "  
                "else "  
                "return 0; end";
        TIMER.schedule(new TimerTask() {
            @Override
            public void run() {
                //如果uuid为空,则终止定时任务
                if (StringUtils.isNotBlank(uuid)) {
                    redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class),
                            Arrays.asList(lockName), RedisDistributeLock.this.uuid,
                            expire.toString());
                    renewExpire();
                }
            }
        },expire * 1000 / 3);
    }

 在lock方法中使用:

 在unlock方法中添加红框中的代码:

0 人点赞