快速上手Spring Integration提供的可重入防死锁的分布式锁
代码语言:javascript复制*分布式锁,是分布式应用中不可获缺的一个工具。
*典型的微服务架构中,在进行某些重要业务的时候,需要在整个微服务应用中对业务进行上锁。
*除此之外,即使是简单的单机项目,也有可能会同一个项目进行多部署,采用Apache或Nginx实现负债均衡,
在这种场景下,对互斥的业务操作也需要进行上锁处理。
1、如果你之前没有接触过分布式锁的概念,请移步其他文章。本篇文章不会给你讲解什么是分布式锁,为什么需要分布式锁,以及如何实现分布式锁
2、本篇文章简单暴力的讲解一套由SpringCloud项目团队封装出来的分布式锁工具Spring Integration。你可以直接投入到生产环境中使用,如果你的团队有已经实现的更好的分布式锁,一般地的来说,是不需要再看这一个技术。但是如果你们团队没有能力自研分布式锁,或者希望有一个成熟的分布式锁能马上投入生产使用,那这一套工具无疑是非常重要的
3、本篇文章不会带你分析源码,希望各位自行去翻阅源码进行学习,锁的API也非常简单 (已更新分析源码部分,往下看就有)
Spring Integration提供的分布式锁的实现有如下4种实现方式:
- Gemfire
- JDBC
- Redis
- Zookeeper
● 一般地、在外面实现分布式锁用的比较多的是Zookeeper和Redis。 ● Spring Integration不需要你去关注它到底是基于什么存储技术实现的,它是面向接口编程,低耦合让你不需要关注底层实现。你要做的仅仅是做简单的选择,然后用相同的一套api即可完成分布式锁的操作。
该分布式锁的优缺点:
1、已实现可重入、解决了死锁问题 可重入:同一个线程,可以多次获得相同的锁。这个应该是实现锁都应该去实现的特性。否则你的锁,很有可能自己把自己搞死了。
死锁问题:如果一个线程在竞争锁成功后,意外宕机了,导致没有主动去释放锁。那么锁在一般情况下,就会永久保留,这就造成了死锁。需要人工去处理,一般的,类似于使用Redis作为实现工具的,出现死锁的时候,就要手动去Redis里面找到这一个锁然后del 掉它。
2、缺点,无法续期锁 为了解决死锁问题,在redis作为实现工具的情况下,默认是采用redis的TTL设置过期事件来解决死锁问题。默认是60s,如果你加锁之后的业务操作,大于60秒,就会导致锁自动释放,其他线程此时可以竞争获得你的锁。但是实际上,你本应该还持有锁。
该框架没有锁续期,或者自定义锁过期时间的API,因此要非常注意你加锁的业务功能,务必要在60s内完成。
一般地、在其他大牛实现分布式锁时,会有另外一个线程持续监控获得锁的线程,如果线程没有主动释放锁,而又处于活跃状态(即还在处理业务),那么另外一个线程会帮助这个锁进行续期,以保证锁不会因为超时而自动释放。
(本人没有过度研究这套源码,可能是已经实现了续期的功能,但是我不知道在哪里使用,如果有人知道,可以在留言区提醒我)
=========================================================================================================
啥也不说,直接开干。
项目基于Maven SpringBoot , 分布式锁的实现采用的是Redis 。(因此请为SpringBoot整合好Redis)
Step 1: 导入Spring Integration依赖
Step 2: 配置JavaConfig以及Bean
Step 3:获取锁的代码骨架
在需要使用锁的Bean里面 注入依赖
官方源码位置: https://github.com/spring-projects/spring-integration
Good Lucky!
上面的文章快速入门了基于Redis实现的分布式可重入锁,你已经可以直接在生产环境中使用该锁
以下为原理刨析,可以让你对Spring Integration实现的分布式锁有更深入的理解。
在阅读了它的源码后,本人觉得有一些很值得学习的思想
(这里假设你已经有ReentrantLock的知识概念,以及操作Redis的知识概念,否则以下内容你将无法展开)
STEP 1:首先从RedisLockRegistry这个类出发分析
从上文图片可知,RedisLockRegistry是通过new RedisLockRegistry(redisConnectionFactory, “redis-lock-test”);
传入的参数包括:
- RedisConnectionFactory用于构造RedisTemplate(用于操作Redis命令)
- registryKey: 你的分布式锁在Redis中的前缀,请为你的分布式应用合理的指定一个唯一的名称前缀
- expireAfter(过期时间毫秒数):默认是DEFAULT_EXPIRE_AFTER(60秒),也就是说,你的锁最多持有60s,如果你的正常业务代码持锁期间有可能会超过60s,那么你必须使用第二个构造方法,来为你的应用指定合理的过期时间。这也是分布式锁防止线程意外宕机,出现死锁的情况
- obtainLockScript (Lua语言实现的脚本代码):简单的说,Redis的Lua本身实现了原子操作,这段脚本的功能就是实现从redis竞争锁的过程,待会会给你详细解读脚本代码。此外,这个属性是final,由官方定义好脚本语言,一般情况下,你是不需要改动的
- final String clientId = UUID.randomUUID().toString() : 还有一个属性定义在RedisLockRegistry对象里,它的作用是通过UUID随机生成一个不重复的id,以此来区分不同应用程序。(由于在Spring的整合中,RedisLockRegistry是单例的,所以这里对于每个应用程序来说,它只有唯一的一个实例,所以clientId的作用就是区分不同应用程序。倘若你的应用程序实例化了多个RedisLockRegistry,那么clientId的作用仅仅用于标识不同的实例对象,它们的核心作用在于在Redis端的竞争)
STEP 2:redisLockRegistry.obtain(String lockKey)
获取锁的第一步,通过lockKey定义一个即将要去竞争的锁
每个redisLockRegistry对象内部会维护一个线程安全的Map,即上面代码中第三行的locks。它的作用是用于保存名为lockKey对应的RedisLock对象。
可能会有人问computeIfAbsent以及RedisLock::new是什么来的
- JDK8中Map有一个新方法computeIfAbsent,用于如果传入的key对应的value为null,就将第二个参数设置进去
- JDK8中有个新的语法糖,专业属于叫引用方法,可以自行百度学习。RedisLock:;new的意思就是传入RedisLock的构造方法进去,它有一个新的类叫Function,请自行了解
通过上面代码,你就已经获得了一个名为lockKey的Lock对象。下面进入重点环节
STEP 3: RedisLock implements Lock详解
- Lock接口是并发编程包JUC中比较常见的一个接口,很多实现锁功能的类都是实现这个接口,它为Java编程里面的锁提供了一个抽象
- RedisLock则是Spring Integration作者根据实际项目需求所实现的锁,它的目的就是实现分布式锁的功能
RedisLock的3个主要属性:
- private final String lockKey:(全锁名)它是完整的锁名,它会组合你在RedisLockRegistry对象定义的registryKey(前缀) 你obtain()时传入的lockKey。 因此它是完整的在Redis中的key值
- private final ReentrantLock localLock = new ReentrantLock(); (实现可重入的核心)可重入锁,这里不多说。它在这里的目的是为了实现当前客户端的资源竞争。Spring Integration实现的分布式锁分为两个步骤,首先线程是在当前客户端进行竞争锁资源,竞争成功后再代表当前客户端去Redis端与其他客户端进行锁竞争。
- private volatile long lockedAt; (竞争锁成功那一刻的时间) 用于记录当前锁竞争成功那一刻的时间毫秒数
lock(), tryLock()详解:
首先无论是lock还是trylock方法,他们只有无限阻塞和尝试一段时间竞争锁的区别。他们的工作核心流程都是:先竞争ReentrantLock,成功后再调用obtainLock()进行Redis端的锁竞争。 两步依次都成功后,才会返回true,表明你本次竞争锁成功。
代码(仅解读lock()方法,trylock自行举一反三):
代码语言:javascript复制@Override
public void lock() {
// 第一步,先进行ReentrantLock竞争,它的目的是在当前客户端中,不同线程之间先竞争一轮,决出最终竞争成功的那个线程
// 同时这ReentrantLock默认是nonFair非公平锁
this.localLock.lock();
// 运行到这里表明,当前线程已经在本客户端中竞争成功,但并不意味着,你的分布式锁就能成功
// 此时,你将代表当前客户端clientId,去Redis端进行竞争
while (true) {
try {
// 调用obtainLock()去redis端进行竞争,直到竞争成功
while (!obtainLock()) {
Thread.sleep(100); // 本次竞争失败,等待100ms再去尝试
}
// 执行到这里,表明Redis端也竞争成功了,此刻,你才是真正的分布式锁竞争成功!
break;
}
catch (InterruptedException e) {
}
catch (Exception e) {
// 注意如果在这里try catch出现任何异常,我们都需要把当前客户端的ReentrantLock进行unlock释放,防止死锁
this.localLock.unlock();
rethrowAsLockException(e);
}
}
}
/**
* 线程在自己的客户端中竞争成功后,代表当前客户端去Redis端进行分布式锁的竞争
*/
private boolean obtainLock() {
// 操作redis-lua的方法
Boolean success =
RedisLockRegistry.this.redisTemplate.execute(
/* lua脚本 */RedisLockRegistry.this.obtainLockScript,
/* keys */Collections.singletonList(this.lockKey),
/* argv[1] */RedisLockRegistry.this.clientId,
/* argv[2] */String.valueOf(RedisLockRegistry.this.expireAfter));
/*
lua脚本执行后,会返回true or false
true:分布式锁竞争成功
false:分布式锁竞争失败
*/
boolean result = Boolean.TRUE.equals(success);
if (result) {
// 如果true,那么就记录此刻的时间
this.lockedAt = System.currentTimeMillis();
}
return result;
}
LUA脚本代码(已为你注释好,应该自行阅读问题不大)
代码语言:javascript复制/**
* KEYS[1] : lockKey 锁名
* ARGV[1] : clientId 客户端id
* ARGV[2] : expireAfter 过期时间毫秒级
*/
// 调用Redis GET命令,获取lockKey对应的value值
local lockClientId = redis.call('GET', KEYS[1])
if lockClientId == ARGV[1] then
// 如果lockClientId不为空,且value值等于传入的clientId,说明它就是这个锁的锁主
// 出现这种情况,说明这是第N(N>1)次重入
// 在Redis端会为这次重入,重置KV的TTL
redis.call('PEXPIRE', KEYS[1], ARGV[2]) // 调用PEXPIRE为 lockkey设置过期时间(毫秒)
return true
elseif not lockClientId then
// 如果lockClientId为空,则会进入当前elseif
// 出现这种情况,说明没有其他客户端持有该锁,所以该value才会为nil(空)
// 表明你这次竞争锁成功
redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])
return true
end
// 如果lockClientId不为空,又不等于当前clientId
// 那么就是竞争失败
return false
从上面lock()方法的源码可知,解决可重入问题是通过ReentrantLock来辅助实现的。而解决死锁问题则是通过Redis的TTL实现。它的工作思想比较巧妙,总结为以下一张图:
在这里,简单的说: 假设有三所学校A,B,C。每所学校有3个教师A1,A2,A3,B1,B2,B3,C1,C2,C3。 一共9个老师去教育局请教育局长来学校调研。 在这里,教育局局长就是共享资源,它每次肯定只能去一所学校参观 首先,每所学校的3名教师会先进行内部竞争,决出一名教师代表自己的学校去教育局。最终每所学校的那名教师代表自己的学校,到达教育局,教育局局长的接待工作由秘书负责,秘书按照先后顺序接见A,B,C三所学校的代表教师。如果教育局局长有空,则由最先到的教师带走教育局局长去它的学校调研。调研结束后,教育局局长返回教育局,再由第二所学校的教师带走教育局局长。
我想这样解释就很生动的模拟了上面分布式锁的竞争过程。 ⭐思考:为什么不能是9个教师直接到教育局进行先后竞争呢? 回答:
- 开销:每个学校派出3名老师,他们的路费就是3倍。
- 资源:教育局的接待数量有限,而且肯定不止一种业务(邀请教育局局长到学校调研)。9个教师同时到达教育局,教育局的接客空间是有限的,而且人多起来,可能秘书会手忙脚乱。
⭐回归整体的思考:为什么不能是9个线程,直接到Redis端进行竞争呢? 个人分析: 首先,如果你有了解过Redis实现的分布式锁,你可以从百度上看到很多别人的文章。最简单也挺有效的一种方式,就是利用Redis的setnx命令以及Redis本身单线程串行处理所有命令的特性,来实现一个可用的分布式锁。他们这些锁都有一个特点,就是每个线程为一个个体,到达Redis进行竞争。
之所以这里,作者要这样设计,我想应该出于以下几点优化:
1. 网络开销:同一个应用程序3个线程,就需要发送3条命令到Redis,并且有其中2条命令是肯定会失败的 2. 线程自旋开销:如果竞争失败,像lock的逻辑就是不断去重试直到成功,那么每次重试都需要发送一次Redis命令,每次都是网络开销。但是作者现在是,先是内部JVM层面的竞争,竞争成功后就会由3个线程变为1个线程去进行会消耗网络的自旋。而另外2个线程则只是消耗CPU的自旋。倘若是3个线程都去redis进行竞争,那么就是3个CPU自旋 3个网络消耗。而现在只是3个CPU自旋 1个网络消耗 3. Redis性能:Redis肯定不仅仅是为了解决分布式锁而存在的,它的功能有很多。9个线程去让redis进行工作,和3个线程去让redis进行工作,对redis的性能消耗肯定是不同的。(当然这里3和9肯定可以忽略不计了,但是毕竟这里简单举例子,放大十倍,百倍,千倍就是一笔大的开销) 4.为了实现可重入:我想这个才是这项设计比较核心的考虑。对比网上没有可重入功能的redis分布式锁,可以看到都是没有ReentrantLock的辅助的。但是我们可知,可重入性几乎是锁必备的特性,而ReentrantLock是Java实现好的一款极具生产价值的可重入锁。因此作者为了利用ReentrantLock实现可重入性,而由此衍生出这样的设计考虑。
代码语言:javascript复制 (当然上面都是个人分析罢了,实现可重入应该还有很多方式,不过在看了作者的源码后,感觉这是一个非常不错的考虑)
unlock()方法:
代码语言:javascript复制@Override
public void unlock() {
if (!this.localLock.isHeldByCurrentThread()) {
// 先判断当前ReentrantLock的锁主是不是当前线程
throw new IllegalStateException("You do not own lock at " this.lockKey);
}
if (this.localLock.getHoldCount() > 1) {
// 判断可重入标记,如果大于1,说明重入了getholdCount()次,这一次unlock()只是让计数-1,而不会真正释放Redis端的分布式锁
this.localLock.unlock();
return;
}
try {
if (!isAcquiredInThisProcess()) {
// 这个方法封装了redis端的判断,它会判断Redis端的锁是不是你持有的
// 一般情况下,这个方法都会返回true,则跳过这次报错
// 如果代码进入此报错,原因主要是,每个锁的过期时间默认60s,如果你持有锁的情况下超过60s后再unlock(),
// 此时锁早就已经过期丢弃,甚至被其他线程竞争掉,所以你的unlcok会失败
throw new IllegalStateException("Lock was released in the store due to expiration. "
"The integrity of data protected by this lock may have been compromised.");
}
// 调用redis del命令删除KV。也就是释放这次分布式锁
if (Thread.currentThread().isInterrupted()) {
RedisLockRegistry.this.executor.execute(() ->
RedisLockRegistry.this.redisTemplate.delete(this.lockKey));
}
else {
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
}
if (logger.isDebugEnabled()) {
logger.debug("Released lock; " this);
}
}
catch (Exception e) {
ReflectionUtils.rethrowRuntimeException(e);
}
finally {
// 最后记得把本地的ReentrantLock进行unlock(),以让其他等待线程进行竞争
this.localLock.unlock();
}
}