分布式锁介绍及解决方案
什么是分布式锁?
在分布式系统中,多个节点可能同时操作同一资源,此时需要使用分布式锁来保护共享资源的访问。分布式锁要求在多个节点上都能起到保护作用,并且能够保证在高并发情况下的正确性和效率。
常用的分布式锁实现方式
ZooKeeper 实现分布式锁
ZooKeeper 是一个开源的分布式协调服务框架,其提供了分布式锁的实现。ZooKeeper 的锁机制主要通过节点创建和Watch机制实现。
ZooKeeper 分布式锁实现步骤
- 创建目录 /locks
- 节点想获取锁就在 /locks 下面创建一个 EPHEMERAL_SEQUENTIAL 节点
- 获取 /locks 下面所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前节点顺序最小,获取到锁
- 使用 Watcher 监听比自己小的兄弟节点的删除事件,一旦兄弟节点被删除(即释放锁),则当前节点重新判断自己是否顺序最小,如果是则获取锁
ZooKeeper 分布式锁示例代码
代码语言:javascript复制public class ZookeeperLock {
private static final String ZK_ADDRESS = "localhost:2181";
private static final int SESSION_TIMEOUT = 3000;
private String lockPath;
private ZooKeeper zooKeeper;
public ZookeeperLock(String lockPath) {
this.lockPath = "/locks/" lockPath;
try {
zooKeeper = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, null);
Stat stat = zooKeeper.exists("/locks", false);
if (stat == null) {
zooKeeper.create("/locks", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
stat = zooKeeper.exists(lockPath, false);
if (stat == null) {
zooKeeper.create(this.lockPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void lock() throws Exception {
// 创建临时节点
String seqNodeName = zooKeeper.create(lockPath "/", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 尝试获取锁
attemptLock(seqNodeName);
}
private boolean attemptLock(String seqNodeName) throws KeeperException, InterruptedException {
List<String> children = zooKeeper.getChildren("/locks", false);
Collections.sort(children);
int index = children.indexOf(seqNodeName.substring("/locks/".length()));
switch (index) {
case -1:
throw new KeeperException.NoNodeException("path not found");
case 0:
return true;
default:
// 获取前一个节点的路径和 watcher
String preWatcherPath = "/locks/" children.get(index - 1);
final CountDownLatch latch = new CountDownLatch(1);
Stat stat = zooKeeper.exists(preWatcherPath, event -> {
if (event.getType() == Event.EventType.NodeDeleted) {
latch.countDown();
}
});
// 前一个锁已经释放
if (stat == null) {
return attemptLock(seqNodeName);
}
// 等待前一个节点的删除事件
latch.await();
return true;
}
}
public void unlock() {
try {
zooKeeper.delete(lockPath, -1);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
} finally {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Redis 实现分布式锁
Redis 是一种基于内存的高性能键值数据库,其提供了一种实现分布式锁的机制。
Redis 分布式锁实现步骤
- 使用 Redis 的 SETNX (set if not exists) 命令来尝试获取锁,如果返回值为 1 则说明获取锁成功,否则说明锁已经被其他节点占用
- 为了避免死锁情况,需要为每个锁设置过期时间(expire),确保即使持有锁的节点出现故障或崩溃,也不会一直持有锁而导致系统无法继续运行。
Redis 分布式锁示例代码
代码语言:javascript复制public class RedisLock {
private static final int RETRY_TIMES = 10;
private static final long DEFAULT_EXPIRE_TIME = 3000; // 默认过期时间 3s
private static final String LOCK_PREFIX = "lock:";
private RedisTemplate<String, String> redisTemplate;
public RedisLock(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
public boolean lock(String lockKey) {
return lock(lockKey, DEFAULT_EXPIRE_TIME);
}
public boolean lock(String lockKey, long expireTime) {
String key = LOCK_PREFIX lockKey;
for (int i = 0; i < RETRY_TIMES; i ) {
// SETNX 命令尝试获取锁
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, "");
if (result != null && result) {
// 设置锁过期时间
redisTemplate.expire(key, expireTime, TimeUnit.MILLISECONDS);
return true;
}
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
return false;
}
}
return false;
}
public void unlock(String lockKey) {
String key = LOCK_PREFIX lockKey;
redisTemplate.delete(key);
}
}
分布式锁的应用场景
分布式锁可以应用于需要保证数据一致性、防止重复操作或单一任务等场景,如限流、秒杀、抢购等高并发场景。
参考资料
- 深入浅出分布式锁
- 使用 Redis 实现分布式锁