前言
Redisson是一个强大的分布式Java对象和服务库,专为简化在分布式环境中的Java开发而设计。通过Redisson,开发人员可以轻松地在分布式系统中共享数据、实现分布式锁、创建分布式对象,并处理各种分布式场景的挑战。
Redisson的设计灵感来自于Redis,但它不仅仅是Redis的Java客户端,更是在分布式环境下构建分布式系统所需的一套工具。无论是分布式集合、分布式锁、还是分布式调度器,Redisson都提供了简单而强大的API,使得开发者能够专注于业务逻辑而不必担心复杂的分布式细节。
底层原理
通过提供易于使用的API和丰富的功能集,Redisson旨在帮助开发者更轻松地构建可靠的、高性能的分布式系统。 Redisson的底层原理主要基于Redis的分布式特性和Java的高级特性。以下是一些关键的底层原理:
- Redis协议: Redisson使用Redis协议进行与Redis服务器的通信。这意味着它能够与任何遵循Redis协议的Redis服务器进行交互。通过利用Redis的分布式特性,Redisson实现了分布式对象和服务。
- Java序列化: Redisson使用Java对象的序列化和反序列化机制将Java对象转化为Redis数据结构。这使得在Java应用程序和Redis之间传递对象变得简单。默认情况下,Redisson使用标准的Java序列化,但也支持其他序列化方式,如JSON、Jackson等。
- 分布式锁的实现: Redisson的分布式锁是通过Redis的
SETNX
(set if not exists)命令实现的。它利用了Redis的原子性操作,确保在分布式环境中只有一个客户端能够成功获取锁。 - 监听器和事件通知: Redisson通过订阅/发布机制实现事件通知。当分布式对象发生变化时,Redisson会发布相应的事件,已注册的监听器将得到通知。这基于Redis的
PUB/SUB
功能,使得分布式环境下的事件通知成为可能。 - 分布式集群: 对于Redis集群,Redisson使用了Redis的集群模式。它能够识别集群中的不同节点,并根据需要进行数据分片和分布式操作。
- 线程模型: Redisson使用异步的线程模型来处理与Redis服务器的通信。这有助于提高性能,允许多个操作同时执行而不阻塞主线程。
总体而言,Redisson利用了Redis强大的分布式功能,并通过Java的特性将其封装为易于使用的API。底层的实现涵盖了分布式锁、分布式对象、事件通知等方面,以满足在分布式环境中构建高性能应用程序的需求。
Redisson分布式锁类型
Redisson提供了多种类型的分布式锁,以满足不同场景的需求。以下是一些常见的Redisson分布式锁类型:
- 可重入锁(Reentrant Lock): 可以被同一个线程重复加锁的锁。同一个线程在持有锁的情况下可以再次加锁,而不会引起死锁。
- 公平锁(Fair Lock): 公平锁按照请求加锁的顺序进行获取锁,即先来先得。这有助于避免某些线程长时间等待的问题,提高公平性。
- 联锁(MultiLock): 可以同时获取多个锁,且在释放锁时可以选择全部释放或部分释放。适用于需要操作多个资源的场景。
- 红锁(RedLock): RedLock是一种分布式锁算法,使用多个Redis节点来确保锁的强一致性。通过在不同的节点上创建锁,即使其中一个节点失效,其他节点依然可以工作。
- 读写锁(ReadWrite Lock): 读写锁分为读锁和写锁,多个线程可以同时持有读锁,但只有一个线程能够持有写锁。适用于读多写少的场景。
- 信号量(Semaphore): 类似于Java的
Semaphore
,用于控制同时访问某个资源的线程数量。 - 闭锁(CountDownLatch): 用于等待多个线程完成操作后再执行下一步操作。
- 过期锁(Lease Lock): 具有自动过期时间的锁,确保在一定时间内锁会被释放,避免锁长时间占用。
这些锁的类型使得Redisson适用于各种分布式场景,开发者可以根据具体的需求选择合适的锁类型来确保分布式环境下的协同和同步。
可重入锁(Reentrant Lock)
基于Redis的Redisson分布式可重入锁RLockJava对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
引入 Maven 依赖
在微服务的 pom.xml 引入 redisson 的 maven 依赖
代码语言:html复制 <dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.2</version> <!-- 使用最新版本 -->
</dependency>
自定义配置类
下面的代码是单节点 Redis 的配置。
代码语言:java复制package com.example.demo.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
@Configuration
public class RedissonConfig {
/**
* 对 Redisson 的使用都是通过 RedissonClient 对象
* @return
* @throws IOException
*/
@Bean(destroyMethod="shutdown") // 服务停止后调用 shutdown 方法。
public RedissonClient redisson() throws IOException {
// 创建配置
Config config = new Config();
config.useSingleServer()
.setAddress("redis://xx.xx.x.x:6379")
.setPassword("123456");
// 集群模式
// config.useClusterServers().addNodeAddress("127.0.0.1:7004", "127.0.0.1:7001");
return Redisson.create(config);
}
}
测试API
代码语言:java复制package com.example.demo.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/redis")
public class MyController {
@Autowired
private ReentrantLockService reentrantLockService;
@GetMapping("/locked-operation")
public String performLockedOperation() {
// 模拟多个线程同时调用可重入锁的操作
for (int i = 1; i <= 5; i ) {
final int threadNumber = i;
new Thread(() -> {
reentrantLockService.performLockedOperation();
}).start();
}
return "Locked operation initiated.";
}
}
测试类
代码语言:java复制package com.example.demo.controller;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class ReentrantLockService {
@Autowired
private RedissonClient redissonClient;
public void performLockedOperation() {
// 获取可重入锁
RLock lock = redissonClient.getLock("myReentrantLock");
String threadName = Thread.currentThread().getName();
try {
// 尝试加锁,最多等待10秒,锁的自动释放时间为30秒
boolean isLocked = lock.tryLock(10, 30, TimeUnit.SECONDS);
if (isLocked) {
// 执行需要加锁的操作
log.info(threadName " - 获取锁成功,执行加锁操作...");
// 模拟业务操作
Thread.sleep(5000);
log.info(threadName " - 加锁操作完成。");
} else {
log.info(threadName "在指定时间内无法获取锁。");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println(threadName "尝试获取锁时发生中断。");
} finally {
// 释放锁
if (lock.isHeldByCurrentThread()) {
log.info(threadName " - 释放锁");
lock.unlock();
}
}
}
}
输出日志
代码语言:javascript复制2023-11-17 15:56:13.935 INFO 12316 --- [ Thread-17] c.e.d.controller.ReentrantLockService : Thread-17 - 获取锁成功,执行加锁操作...
2023-11-17 15:56:18.945 INFO 12316 --- [ Thread-17] c.e.d.controller.ReentrantLockService : Thread-17 - 加锁操作完成。
2023-11-17 15:56:18.969 INFO 12316 --- [ Thread-17] c.e.d.controller.ReentrantLockService : Thread-17 - 释放锁
2023-11-17 15:56:19.020 INFO 12316 --- [ Thread-16] c.e.d.controller.ReentrantLockService : Thread-16 - 获取锁成功,执行加锁操作...
2023-11-17 15:56:23.901 INFO 12316 --- [ Thread-19] c.e.d.controller.ReentrantLockService : Thread-19在指定时间内无法获取锁。
2023-11-17 15:56:23.901 INFO 12316 --- [ Thread-20] c.e.d.controller.ReentrantLockService : Thread-20在指定时间内无法获取锁。
2023-11-17 15:56:23.909 INFO 12316 --- [ Thread-18] c.e.d.controller.ReentrantLockService : Thread-18在指定时间内无法获取锁。
2023-11-17 15:56:24.023 INFO 12316 --- [ Thread-16] c.e.d.controller.ReentrantLockService : Thread-16 - 加锁操作完成。
2023-11-17 15:56:24.046 INFO 12316 --- [ Thread-16] c.e.d.controller.ReentrantLockService : Thread-16 - 释放锁
分析日志
这段日志展示了一个使用Redisson实现的分布式锁的情景。让我详细解释一下日志和代码的原理:
获取锁(Thread-17):
Thread-17
成功获取了名为 "myReentrantLock" 的可重入锁。- 执行了一段需要锁保护的业务操作,模拟了一个长时间的操作,持有锁。
释放锁(Thread-17):
Thread-17
完成了业务操作,释放了锁。
获取锁(Thread-16):
Thread-16
在Thread-17
释放锁之后成功获取了相同的锁。- 执行了一段需要锁保护的业务操作,然后释放了锁。
获取锁失败(Thread-19, Thread-20, Thread-18):
Thread-19
,Thread-20
, 和Thread-18
在指定的等待时间内无法获取锁,因为此时Thread-16
持有锁。
总结原理:
- 通过
redissonClient.getLock("myReentrantLock")
创建了一个可重入锁对象。 lock.tryLock(10, 30, TimeUnit.SECONDS)
尝试获取锁,在10秒内等待,锁的自动释放时间为30秒。- 如果获取锁成功,执行需要加锁的操作,然后释放锁。
- 其他线程在获取锁时,如果超过指定时间未能成功获取,会得到相应的提示。
这段代码通过Redisson实现了一个可重入的分布式锁,确保在分布式环境下对共享资源的安全访问。成功获取锁的线程执行受保护的操作,其他线程则需要等待或处理获取锁失败的情况。这有助于协调分布式系统中的并发访问,防止竞争条件和数据不一致性。
阻塞与非阻塞
阻塞方式
- 在阻塞方式中,线程在尝试获取锁时,如果锁已被其他线程占用,那么当前线程会被阻塞,一直等到锁被释放后才能继续执行。在阻塞模式下,线程可能会等待相当长的时间,直到获取到锁。
ReentrantLock lock = new ReentrantLock();
// 阻塞方式获取锁
lock.lock();
try {
// 执行需要锁保护的代码
} finally {
lock.unlock();
}
非阻塞方式
- 在非阻塞方式中,线程尝试获取锁时,如果锁已被其他线程占用,当前线程不会被阻塞,而是立即返回一个结果,告知是否成功获取锁。非阻塞方式下,线程不会等待,而是可以继续执行其他操作。
ReentrantLock lock = new ReentrantLock();
// 非阻塞方式尝试获取锁
if (lock.tryLock()) {
try {
// 执行需要锁保护的代码
} finally {
lock.unlock();
}
} else {
// 未获取到锁的处理逻辑
}
看门狗Watchdog
Redisson 使用看门狗(Watchdog)机制来保持分布式锁的有效性。看门狗是一种定时任务,负责定期延长锁的过期时间,确保在业务执行时间较长或者发生异常情况时,锁不会过早释放。
下面是 Redisson 看门狗的简要原理:
- 锁的过期时间: 当获取分布式锁时,会设置锁的过期时间(通常是锁的租期)。这个过期时间是在 Redis 中设置的,表示锁在这段时间内有效。
- 看门狗的作用: Redisson 的看门狗定期(比如每隔一定时间)检查当前线程持有的锁是否过期。如果锁的过期时间快到了,看门狗会尝试续租,延长锁的过期时间。
- 续租操作: 续租操作是通过发送一个延长锁过期时间的命令到 Redis。如果当前线程在续租时发生了异常,比如网络异常,看门狗会尽力保证在后续的定时任务中继续尝试续租。
- 锁的释放: 如果看门狗发现锁已经过期且无法续租,它会尝试删除锁,释放资源。这是为了防止因为业务执行时间较长或者发生异常情况导致锁一直被占用而不释放。
- 线程关闭时的处理: Redisson 看门狗还处理了线程关闭的情况。如果获取锁的线程关闭了,看门狗会立即释放锁,以避免死锁情况。
通过看门狗机制,Redisson 能够确保在使用分布式锁的场景下,锁不会因为持有锁的线程异常退出或者执行时间过长而导致锁被过早释放。这提高了分布式锁的可靠性和稳定性。
源码解析
这段代码是 Redisson 中续租锁过期时间的方法。让我们逐步解析其中的关键部分:
renewExpiration 方法: 这个方法用于执行锁的过期时间续租操作。
代码语言:javascript复制private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 创建定时任务,定时执行续租操作
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 获取续租信息
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 异步执行续租操作
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
// 续租失败,记录错误日志,移除续租信息
log.error("Can't update lock " getRawName() " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// 续租成功,重新调度续租任务
renewExpiration();
} else {
// 续租失败,取消续租任务
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // 续租时间为租约时间的1/3
// 将定时任务绑定到续租信息中
ee.setTimeout(task);
}
internalLockLeaseTime 变量
定时任务创建: 使用 commandExecutor.getConnectionManager().newTimeout
创建一个定时任务,这个任务会在 internalLockLeaseTime / 3
毫秒后执行。
续租操作: 在定时任务执行时,异步执行 renewExpirationAsync
方法,该方法负责向 Redis 发送命令更新锁的过期时间。
回调处理: 在异步续租操作完成时,根据续租操作的结果,进行相应的处理。
- 如果续租成功,重新调度下一次续租任务。
- 如果续租失败,取消续租任务,并记录错误日志。
这个机制通过定时任务实现了定期的锁续租,确保分布式锁在持有期间不会因为过期而被自动释放。
我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!