有 Redis 的面试恐怕没有不问分布式锁的吧。..
分布式锁有着多种多样的实现方式,今天就来介绍一下 如何用 Redis 实现一个分布式锁。
目录
- 定义
- 实现思路
- Java 实现代码
- Redisson 的分布式锁实现
- org.redisson.RedissonLock#lock() 方法
- org.redisson.RedissonLock#unlock() 方法
- RedLock
- 总结
- 参考文章
定义
首先,什么是分布式锁呢?先上维基百科的定义:
分布式锁,是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。
通俗的理解就是,在本地应用中,当我们多个线程需要并发的访问某一个资源时,我们直接使用本地的锁,比如 Java 的 synchronized 和 ReentrantLock.
但是当我们的系统是分布式系统,竞争资源的是不同进程里的不同线程,本地锁就没有用了。此时就需要分布式锁。
举个简单的例子,你有一个程序,需要在某天中午进行抽奖,从所有符合条件的用户中随机挑选出 10 个人,将他们的 ID 记录到 MySQL 中去。这很好实现。但是如果你的服务在线上部署了多个实例,那么他们每一个实例都会运行这段代码,每个实例会随机挑出 10 个人,这明显是不符合需求的。
此时就应该使用分布式锁,让所有的实例竞争,只有一个实例可以成功拿到锁,然后进行抽奖操作即可。
分布式锁的实现各种各样,用 MySQL, Redis, zookeeper 等等都可以实现,本文介绍一下如何使用 Redis 来实现一个分布式锁。
后文举例时都以抽奖为例。
实现思路
简单思路
首先,我们理解了分布式锁只是要在 一个独立于分布式系统的地方树立标志
, 然后保证这个标志就有一个人可以用即可。
那么直接在 Redis 中设置一个字符串即可。比如我们可以设置 lock_key
就是代表我们的分布式锁。
当需要获取锁时,首先调用exists
命令判断该 key 是否存在。
- 存在,说明有其他进程获取了锁,当前进程放弃或者重试。在抽奖场景下,当前进程直接放弃就好。
- 不存在。调用
set
命令,随便写入一个值,代表自己获取到了锁,然后进行业务操作,在完成业务操作之后,调用del
命令删除掉该 key.
看起来很简单,几行代码就写完了。伪代码总结如下:
代码语言:javascript复制# 获取锁
exists lock_key
if true:
放弃
if false:
set lock_key 1
# 释放锁
del lock_key
但是这样做是有极大的缺陷的,如果在生产环境这么操作,那么应该会 死的很惨. 我们一步一步梳理。
原子性
获取锁需要两个步骤,也就是判断锁是否存在以及实际的获取锁。这两个步骤是单独的两个命令,并不能保证原子性。
假设进程 A 在判断 lock 是否存在之后,线程 B 又对他进行了操作。然后线程 A 再来继续进行操作,这样明显是错误的,可能造成重复加锁,或者锁状态判断错误。
幸好 Redis 提供了setnx
指令,可以先解决这个问题。
setnx key value
, 是set if not exists
的意思,如果当前 key 存在,则不作任何操作并且返回 0, 如果当前 key 不存在,则进行 set 操作,并且返回 1.
那么此时的分布式锁流程是:
代码语言:javascript复制# 获取锁
setnx key value
if 0:
放弃
if 1:
成功,做业务操作
# 释放锁
del lock_key
服务宕机
下一个问题来了,如果进程 A 在获取到了锁之后,挂掉了,也就是做不了释放锁的操作了。那么这个锁永远的被线程 A 占用了,其他任何的进程都拿不到了,也就是造成了死锁问题。这可是真死锁, 死的透透的。
解决方式是给 key 设置一个过期时间,不管怎么样,在过了这段时间,自动删除 key, 也就是释放了锁。
此时分布式锁的流程是:
代码语言:javascript复制# 获取锁
setnx lock_key 1
if 0:
放弃
if 1:
# 加个 10s 的过期时间
expire lock_key 10
成功,做业务操作
# 释放锁
del lock_key
我们加了 10s 的过期时间,业务操作必须在 10s 内完成并且释放锁,否则到了 10s 立即释放锁,让别人用。
又是原子性
可以发现,获取锁的过程又变成了两个命令,又不能保证原子性了,如果setnx
之后服务立即宕机,那么还是不能解决死锁的问题。也就是我们需要把这两个操作变成原子的。
也许你会原子性不是事务最擅长的嘛,用 Redis 事务来进行,但是不行,expire 是否执行依赖于 setnx 执行的结果,事务里可不提供 if/else 语句。
但是 Redis 提供了 Lua 脚本机制,我们可以写个 lua 脚本发过去,这样可以保证原子性。
当然,在 Redis 2.6.12 之后,Redis 作者为set
命令提供了 多个可选项,现在全部的 set 命令如下:
SET key value [EX seconds|PX milliseconds] [NX|XX] [KEEPTTL]
.
- EX seconds – 秒级的过期时间
- PX milliseconds – 毫秒级的过期时间
- NX – 相当于 setnx.
- XX – setex, 只有当 key 已经存在才写入
- KEEPTTL – 保留 key 的时间。在 6.0 版本才引入的新参数。
有了这个命令之后,我们的分布式锁流程是:
代码语言:javascript复制# 获取锁
set lock_key 1 EX 10 NX
if ok:
继续,业务操作
if nil:
放弃
# 释放锁
del lock_key
业务超时误删除
新的问题到来了,我们解决死锁的方式是加入一个过期时间,那么如果我们的业务耗时超过了过期时间怎么办。
假如进程 A 获取到锁之后,进行业务操作,由于卡顿等其他原因,过了 10s 还是没有执行结束释放锁,此时锁被自动的释放了。线程 B 立刻获取了锁,进行业务操作。此时,如果 A 线程执行完毕,进行释放锁操作,就会将 B 线程持有的锁误删除。
为了防止误删除操作,我们需要 对个暗号.
之前我们在 lock_key 中设置的一直是一个固定的值,我们可以将它改为一个随机值,删除时进行匹配,匹配正确再进行删除操作。
也就是:
代码语言:javascript复制# 释放锁
get lock_key
if equal:
del lock_key
if not equals:
放弃
又有原子性问题了。这个我们可以使用 lua 脚本来解决。
代码语言:javascript复制if redis.call("get",KEYS[1]) == ARGV[1]
then
return redis.call("del",KEYS[1])
else
return 0
end
这样就可以保证原子性了。
业务超时锁已释放
上面给每把锁加上加锁方的 UUID
之后,防止了误删除问题,但是业务超时超过了 key 的过期时间的问题还是没有解决。
我个人认为这其实是个悖论。
我们想要锁不会被单个线程持续占有,造成死锁。才加了锁的过期时间。
又嫌锁会过期,业务太慢了还没跑完。
让业务跑快点啊,没有人会永远等你的!
我的建议是,预估业务的最大容忍时间,把锁设置成相应的时间过期,到期就自动释放。
但是还有其他办法来等他,比如我们在获取到锁之后,可以启动一个后台线程来定义为此线程的锁 续命, 也就是不断的延长过期时间,直到释放之后才停止续命。
如果业务无限阻塞,还是会一直拿着锁,不然别人用。,,, 所以才说是悖论,我们的优化方向应该是把业务搞快点。.
当然,本文后面会给出上面这个解决办法的代码,虽然我不是特别推荐使用。.
可重入性
可重入性对于一个锁实现来说比较重要,像 Java 中的 ReentrantLock 就是可重入锁的一个实现。
想要实现可重入性,Redis 就无能为力了。因为可重入性必然要继续当前线程的相关信息,而线程的信息没有办法唯一化的保存在 Redis 里,因此我们需要在客户端对 Redis 的命令进行一些封装。
比如在 Java 中,我们可以用 ThreadLocal 来对相当锁进行一个计数,以此来实现锁的可重入性。后文提供了一个 Java 版的可重入分布式锁实现。
Java 实现代码
按照上面的思路,我们来实现一个可用的 Redis 分布式锁。使用的三方 Redis 库为:Jedis.
本来还打算逼逼赖赖一番,但是写完代码发现自己写了挺多注释的,各位大佬一看就懂,这里就不多说了。
代码语言:javascript复制package com.huyan.redis;
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* Author: pfliu
* Date: 2020/01/17.
* Brief: Redis 实现的 分布式锁。
*/
@Slf4j
public class RedisDistributedLock {
/**
* redis 返回值,OK
*/
private static final String OK = "OK";
/**
* ThreadLocal 统计当前线程获取锁的数量,可重入
*/
private ThreadLocal<Map<String, Integer>> REFS = new ThreadLocal<>();
/**
* 用来跑 定时更新 expire 的线程池
*/
private static final ExecutorService SCHEDULE_POOL = new ScheduledThreadPoolExecutor(10);
/**
* 是否要定时更新 expire
*/
private volatile boolean persistentExpireFlag = false;
/**
* 更新 expire 的次数,可以设定一个最大次数
*/
private int persistentExpireTimes = 4;
/**
* jedis
*/
private final JedisPool jedisPool;
/**
* constructor
*
* @param jedisPool 连接池
*/
public RedisDistributedLock(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
/**
* constructor
*
* @param persistentExpireTimes 获取锁后定时更新锁过期时间的次数
* @param jedisPool 连接池
*/
public RedisDistributedLock(int persistentExpireTimes, JedisPool jedisPool) {
this.persistentExpireTimes = persistentExpireTimes;
this.jedisPool = jedisPool;
}
/**
* 加锁 , 默认加锁 10s, 默认不延长持有锁的时间
*
* @param key key
* @param curValue 加锁设置的值,调用方自己生成并保存的 UUID 即可。
* @return 加锁是否成功
*/
public synchronized boolean lock(String key, String curValue) {
return lock(key, curValue, 10);
}
/**
* 加锁,默认不延长锁持有时间。
*
* @param key key
* @param curValue 加锁设置的值,调用方自己生成并保存的 UUID 即可。
* @param seconds 加锁的时间
* @return 是否加锁成功
*/
public synchronized boolean lock(String key, String curValue, int seconds) {
return lock(key, curValue, seconds, false);
}
/**
* 加锁,支持重入。
*
* @param key key
* @param curValue 加锁设置的值,调用方自己生成并保存的 UUID 即可。
* @param seconds 锁住的时间
* @param persistentExpire 是否自动延长持有锁的时间
* @return 是否加锁成功
*/
public synchronized boolean lock(String key, String curValue, int seconds, boolean persistentExpire) {
Map<String, Integer> countMap = currentRefs();
Integer count = countMap.get(key);
// 已经获得锁的,不用再操作 redis, 直接返回即可。
if (count != null) {
countMap.put(key, count 1);
return true;
}
// 操作 redis, 然后在 threadlocal 记录一下返回
boolean ok = this.lock0(key, curValue, seconds);
if (!ok) return false;
countMap.put(key, 1);
if (persistentExpire) {
persistentExpireFlag = true;
openPersistentExpire(key, curValue, seconds);
}
return true;
}
/**
* 实际操作加锁操作,调用 Redis 的 set 命令
*
* @param key key
* @param curValue value
* @param seconds expire seconds
* @return 是否工程
*/
private synchronized boolean lock0(String key, String curValue, int seconds) {
try (Jedis jedis = this.jedisPool.getResource()) {
String reply = jedis.set(key, curValue, SetParams.setParams().ex(seconds).nx());
return OK.equalsIgnoreCase(reply);
} catch (Exception e) {
log.error("lock error! key={}, value={}, seconds={}", key, curValue, seconds, e);
}
return false;
}
/**
* 提交一个异步线程,延长锁的持有时间
*
* @param key key
* @param curValue value
* @param seconds 延长的时间
*/
private synchronized void openPersistentExpire(String key, String curValue, int seconds) {
SCHEDULE_POOL.submit(() -> {
int i = 0;
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then "
"return redis.call('expire',KEYS[1],ARGV[2]) "
"else "
"return 0 end";
while (i < persistentExpireTimes && persistentExpireFlag) {
try (Jedis jedis = this.jedisPool.getResource()) {
jedis.eval(script, 1, key, curValue, seconds "");
i ;
Thread.sleep(seconds * 1000);
} catch (Exception e) {
log.error("open persistent expire error! key={}, value={}, seconds={}", key, curValue, seconds, e);
}
}
});
}
/**
* 加锁
*
* @param key key
* @param curValue curValue
* @return 是否解锁成功
*/
public synchronized boolean unLock(String key, String curValue) {
Map<String, Integer> countMap = currentRefs();
Integer count = countMap.get(key);
if (count == null) {
return false;
}
count -= 1;
if (count > 0) {
countMap.put(key, count);
} else {
countMap.remove(key);
this.persistentExpireFlag = false;
this.unLock0(key, curValue);
}
return true;
}
/**
* 解锁的实际操作方法,调用 Redis, 使用 Lua 脚本
*
* @param key key
* @param curValue curValue
*/
private synchronized void unLock0(String key, String curValue) {
try (Jedis jedis = this.jedisPool.getResource()) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then "
"return redis.call('del',KEYS[1]) "
"else return 0 end";
jedis.eval(script, 1, key, curValue);
} catch (Exception e) {
log.error("unlock error! key={}, value={}", key, curValue, e);
}
}
/**
* 是否当前的 key-value 持有的锁。
* @param key key
* @param value value
* @return 是否持有锁
*/
public synchronized boolean isLocking(String key, String value) {
if (value == null || key == null) return false;
try (Jedis jedis = this.jedisPool.getResource()) {
String s = jedis.get(key);
return value.equals(s);
}
}
/**
* 线程持有锁的计数
*
* @return key-count 的 map
*/
private synchronized Map<String, Integer> currentRefs() {
Map<String, Integer> map = REFS.get();
if (map != null) {
return map;
}
REFS.set(new HashMap<>());
return REFS.get();
}
}
由于我写的是支持多线程进行各自加各自的锁的,因此内部并不是直接使用 Jedis, 而是要持有 JedisPool, 那么每次释放都挺麻烦的,我自己封装了个小工具类,在上面的代码里面没用,怕混淆视听,这里放出来供大家把玩,就几行。
RedisUtil:
代码语言:javascript复制package com.huyan.redis;
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisConnectionException;
/**
* Author: pfliu
* Date: 2020/01/17.
* Brief: redis 工具类
*/
@Slf4j
public class RedisUtil {
private JedisPool jedisPool;
/**
* 构造方法
*
* @param host
*/
public RedisUtil(String host) {
this.jedisPool = new JedisPool(host);
}
/**
* 构造方法
*
* @param jedisPool jedis 连接池
*/
public RedisUtil(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
/**
* 执行操作
*
* @param caller
*/
public void execute(RedisCaller caller) {
Jedis jedis = jedisPool.getResource();
try {
caller.call(jedis);
} catch (JedisConnectionException e) {
log.error("jedis error. ", e);
caller.call(jedis);
} finally {
jedis.close();
}
}
}
比较简单,就不多说了。
对于上面的分布式锁,我进行了一些必要的测试,主要集中在以下几个方面:
- 基本的加锁解锁操作
- 多线程并发加锁(模拟多进程)
- 锁的可重入性。
package com.huyan.redis;
import lombok.extern.slf4j.Slf4j;
import org.junit.*;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.time.LocalDateTime;
import java.util.concurrent.*;
import static java.lang.Thread.sleep;
@Slf4j
public class RedisDistributedLockTest {
RedisDistributedLock locker;
@Before
public void before() {
locker = new RedisDistributedLock(new JedisPool("localhost"));
}
/**
* 测试基本的加锁解锁功能
*/
@Test
public void testBasicLockUnLock() throws InterruptedException {
// 当前线程加锁,应该成功
boolean lock = locker.lock("test_a", "aaa");
Assert.assertTrue(lock);
// 新启动了个线程,加锁解锁都应该失败的
Thread other = new Thread(() -> {
boolean lock1 = locker.lock("test_a", "bbb");
Assert.assertFalse(lock1);
boolean ab = locker.unLock("test_a", "bbb");
Assert.assertFalse(ab);
});
other.start();
// 稍微睡会,不然比上面的线程快了
Thread.sleep(1000);
// 当前线程解锁应该成功
boolean b = locker.unLock("test_a", "aaa");
Assert.assertTrue(b);
}
/**
* 测试并发性,多线程竞争加锁解锁
*/
@Test
public void testLock() {
ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 15, 1000, TimeUnit.SECONDS
, new ArrayBlockingQueue<>(10));
for (int i = 0; i < 10; i ) {
pool.submit(() -> {
while (true) {
boolean isLock = locker.lock("increase_i", Thread.currentThread().getId() "");
if (isLock) {
log.info(" {} thread get lock, now = {}", Thread.currentThread().getId()
, LocalDateTime.now().toString());
sleep(10000);
locker.unLock("increase_i", Thread.currentThread().getId() "");
} else {
log.info(" {} thread fail.", Thread.currentThread().getId());
sleep(10000);
}
}
});
}
while (pool.getPoolSize() > 0) {
}
}
/**
* 测试可重入性
*/
@Test
public void testReentrant() {
// 同一个线程,连续加锁 4 词
Assert.assertTrue(locker.lock("lock_key", "test_value"));
// 连续解锁 4 次。
Assert.assertTrue(locker.unLock("lock_key", "test_value"));
}
}
运行以上测试,就会发现没什么毛病。顶。
Redisson 的分布式锁实现
自己实现完一个分布式锁之后,我们已经基本上了解如何用 Redis 来实现一个分布式锁,那么来看看业界比较通用的三方库是怎么实现的,学习一下大佬的思想。我选择学习一下Redisson
中的分布式锁,至于其他的三方库就交给读者去学习了。
首先来看一下如何使用:
代码语言:javascript复制 @Test
public void testDistributedLock() {
Config config = new Config();
// config.setTransportMode(TransportMode.EPOLL);
config.useSingleServer()
.setAddress("redis://localhost:6379");
RedissonClient redisson = Redisson.create(config);
IntStream.rangeClosed(1, 5)
.parallel()
.forEach(i -> {
executeLock(redisson);
});
executeLock(redisson);
}
public void executeLock(RedissonClient redisson) {
RLock lock = redisson.getLock("myLock");
boolean locked = false;
try {
locked = lock.tryLock();
log.info("get lock result:{}, threadId = {}", locked, Thread.currentThread().getId());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (locked) {
lock.unlock();
}
}
}
使用起来很简单,关键的地方只有三步骤:
代码语言:javascript复制 RLock lock = redisson.getLock("myLock");
locked = lock.Lock();
lock.unlock();
那么我们分别来看一下这两个方法的实现。
org.redisson.RedissonLock#lock() 方法
lock 方法的大致流程如下,内部调用链比较复杂,其实实现思想都差不多。
其中,加锁操作是通过发送给 Redis 一个 lua 脚本来执行的。lua 脚本的内容如下:
我们对照自己前面的思路,看看 Redisson 是如何解决那么多难题的。
原子性
加锁操作只有一个 lua 脚本,Redis 会保证 lua 脚本的内容的原子性。
超时问题
redisson 在加锁成功之后,会根据请求加锁的时间,选择是否需要进行不断的 延长持有锁的时间
.
- 如果请求加锁时间为-1, 它会启动后台线程,每隔 30s, 查看锁是否释放,如果没有释放就延长过期时间。调用的 lua 脚本如下:
- 如果请求时间为其他值,直接加锁那么久就好了。
可重入性
Redisson 解决了这一问题,在上面的 lua 脚本中,如果当前锁存在,会使用hexists
命令查看当前锁的持有人与当前请求加锁的是不是同一个客户端,如果是的话,调用hincrby
来将持有锁的数量加 1. 以此来实现可重入锁。
org.redisson.RedissonLock#unlock() 方法
解锁方法比较简单,直接调用 lua 脚本即可,lua 脚本如下:
可以看到,脚本中也是有处理可重入锁的相关逻辑的。
先对持有锁的数量进行减 1.
之后如果数量为 0, 则当前线程释放锁,删除掉对应的 key, 在 pubsub 中发布解锁的消息。
如果不为 0, 则直接返回。
当然,在解锁之后,也要停掉对应的续命线程
, 不再定时延长持有锁的时间。
RedLock
上面我们只提到了,如果请求加锁的进程过掉了怎么办。
但是 Redis 也是会挂掉的,如果它挂掉了怎么办?.
我们没有处理,这也说明我们上面的分布式锁只能应用于单节点的 Redis, 不论是你部署单节点,或者你自己搞一致性 hash, 或者找集群中的固定节点。
总之如果你在一个主从上面用,当主节点挂掉,重新选择主节点之后就有可能出错了。
为了解决这个问题,Redis 的做 Antirez 提出了著名的红锁RedLock
.
它的原理大概是这样子:
在 Redis 的分布式环境中,我们假设有 N 个 Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。我们确保将在 N 个实例上使用与在 Redis 单实例下相同方法获取和释放锁。现在我们假设有 5 个 Redis master 节点,同时我们需要在 5 台服务器上面运行这些 Redis 实例,这样保证他们不会同时都宕掉。
为了取到锁,客户端应该执行以下操作:
- 获取当前 Unix 时间,以毫秒为单位。
- 依次尝试从 5 个实例,使用相同的 key 和具有唯一性的 value(例如 UUID)获取锁。当向 Redis 请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为 10 秒,则超时时间应该在 5-50 毫秒之间。这样可以避免服务器端 Redis 已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个 Redis 实例请求获取锁。
- 客户端使用当前时间减去开始获取锁时间(步骤 1 记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2 1,这里是 3 个节点)的 Redis 节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
- 如果取到了锁,key 的真正有效时间等于有效时间减去获取锁所使用的时间(步骤 3 计算的结果)。
- 如果因为某些原因,获取锁失败(没有在至少 N/2 1 个 Redis 实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁(即便某些 Redis 实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。
Redisson 对 RedLock 已经有了良好的实现,实现类为org.redisson.RedissonRedLock
, 但是这里不打算去研究它的源码了,因为这个目前没有必要,客户端注意一下,其实我们连可重入锁很多时间都不需要。手动捂脸。
需要或者想要了解的读者们可以下载 Redisson 源码 自行学习。
总结
本文先是简单介绍了一下分布式锁,之后按照从简单到复杂的思路,用 Redis 实现了一个分布式锁,并一步一步解决了过程中会遇到的问题。之后,学习了 Redisson 的分布式锁实现的源码,对其实现原理及思路有了了解。最后大概介绍了下红锁算法。
生产环境使用的话还是建议使用三方开源包,因为毕竟都是大佬们写的,比较靠谱,个人实现的可以用来在学习的过程中,加强对原理的理解。
参考文章
Redis 官网对分布式锁实现的文章