一、分布式锁概述
1.1、分布式锁作用
1)在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行
2)具备高可用、高性能的获取锁与释放锁
3)具备锁失效机制,防止死锁
4)具备非阻塞锁(没有获取到锁将直接返回获取锁失败)或堵塞锁特性(根据业务需求考虑)
1.2、分布式锁应用场景
1)库存扣减与增加
分布式锁保证库存扣减不会超卖,库存增加不会造成库存数据不准确
2)积分抵现
防止积分扣减出现溢出的情况
3)会员礼品核销
防止礼品核销多次
1.3、实现方式
1)使用Redis,基于setnx命令或其他。
2)使用ZooKeeper,基于临时有序节点。
3)使用MySQL,基于唯一索引
二、基于Zookeeper实现分布式锁
2.1、Zookeeper特性介绍
1)有序节点
假如当前有一个父节点为/lock,我们可以在这个父节点下面创建子节点;zookeeper提供了一个可选的有序特性,例如我们可以创建子节点“/lock/node-”并且指明有序,那么zookeeper在生成子节点时会根据当前的子节点数量自动添加整数序号,也就是说如果是第一个创建的子节点,那么生成的子节点为/lock/node-0000000000,下一个节点则为/lock/node-0000000001,依次类推。
2)临时节点
客户端可以建立一个临时节点,在会话结束或者会话超时后,zookeeper会自动删除该节点。
3)事件监听
在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,zookeeper会通知客户端。当前zookeeper有如下四种事件:节点创建、节点删除、节点数据修改、子节点变更。
2.2、Zookeeper分布式锁实现(方式一)
2.2.1、实现原理
1)客户端连接zookeeper,并在父节点(/lock)下创建临时的且有序的子节点,第一个客户端对应的子节点为/lock/lock-1,第二个为/lock/lock-2,以此类推。 2)客户端获取/lock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁,否则监听/lock的子节点变更消息,获得子节点变更通知后重复此步骤直至获得锁; 3)执行业务代码; 4)完成业务流程后,删除对应的子节点释放锁。
2.2.2、实现代码
1.基于curator的zookeeper分布式锁实现
代码语言:javascript复制public static void main(String[] args) throws Exception {
//创建zookeeper的客户端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("10.21.41.181:2181,10.21.42.47:2181,10.21.49.252:2181", retryPolicy);
client.start();
//创建分布式锁, 锁空间的根节点路径为/curator/lock
InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
mutex.acquire();
//获得了锁, 进行业务流程
System.out.println("Enter mutex");
//完成业务流程, 释放锁
mutex.release();
//关闭客户端
client.close();
}
2.实现方式二
1)定义变量
代码语言:javascript复制/**
* Zookeeper客户端
*/
private ZooKeeper zookeeper;
/**
* 锁的唯一标识
*/
private String lockId;
/**
* 与Zookeeper建立会话的信号量
*/
private CountDownLatch connectedLatch;
/**
* 创建分布式锁的过程中,开始和等待请求创建分布式锁的信号标志
*/
private CountDownLatch creatingLatch;
/**
* 分布式锁路径前缀
*/
private String locksRootPath = "/locks";
/**
* 排在当前节点前面一位的节点的路径
*/
private String waitNodeLockPath;
/**
* 为了获得锁,本次创建的节点的路径
*/
private String currentNodeLockPath;
2)构造函数
代码语言:javascript复制public ZookeeperTempOrderLock(String lockId) {
this.lockId = lockId;
try {
// 会话超时时间
int sessionTimeout = 30000;
//
zookeeper = new ZooKeeper("192.168.0.93:2181", sessionTimeout, this);
connectedLatch.await();
} catch (IOException ioe) {
log.error("与Zookeeper建立连接时出现异常", ioe);
} catch (InterruptedException ite) {
log.error("等待与Zookeeper会话建立完成时出现异常", ite);
}
}
3)实现Zookeeper的watcher
代码语言:javascript复制@Override
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
connectedLatch.countDown();
}
if (creatingLatch != null) {
creatingLatch.countDown();
}
}
4)获取分布式锁
代码语言:javascript复制/**
* 获取锁
*/
public void acquireDistributedLock() {
try {
while(!tryLock()) {
// 等待前一项服务释放锁的等待时间 不能超过一次Zookeeper会话的时间
long waitForPreviousLockRelease = 30000;
waitForLock(waitNodeLockPath, waitForPreviousLockRelease);
}
} catch (InterruptedException | KeeperException e) {
log.error("等待上锁的过程中出现异常", e);
}
}
public boolean tryLock() {
try {
// 创建顺序临时节点
currentNodeLockPath = zookeeper.create(locksRootPath "/" lockId,
"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 查看刚刚创建的节点是不是最小节点
// 比如针对于这个同名节点,之前有其它服务曾申请创建过,因此Zookeeper中临时顺序节点形如:
// /locks/10000000000, /locks/10000000001, /locks/10000000002
List<String> nodePaths = zookeeper.getChildren(locksRootPath, false);
Collections.sort(nodePaths);
if(currentNodeLockPath.equals(locksRootPath "/" nodePaths.get(0))) {
// 如果是最小节点,则代表获取到锁
return true;
}
// 如果不是最小节点,则找到比自己小1的节点 (紧挨着自己)
int previousLockNodeIndex = -1;
for (int i = 0; i < nodePaths.size(); i ) {
if(currentNodeLockPath.equals(locksRootPath "/" nodePaths.get(i))) {
previousLockNodeIndex = i-1;
break;
}
}
this.waitNodeLockPath = nodePaths.get(previousLockNodeIndex);
} catch (KeeperException | InterruptedException e) {
log.error("创建临时顺序节点失败", e);
}
return false;
}
6)等待其他服务释放锁
代码语言:javascript复制/**
* 等待其他服务释放锁
* 实际上就是在等待前一个临时节点被删除
*
* @param nodePath 希望被删除的节点的相对路径
* @param waitTime 等待时长 单位:毫秒
*/
private boolean waitForLock(String nodePath, long waitTime) throws KeeperException, InterruptedException {
Stat stat = zookeeper.exists(locksRootPath "/" nodePath, true);
if (stat != null) {
this.creatingLatch = new CountDownLatch(1);
this.creatingLatch.await(waitTime, TimeUnit.MILLISECONDS);
this.creatingLatch = null;
}
return true;
}
7)释放分布式锁
代码语言:javascript复制/**
* 释放锁
* 实际上就是删除当前创建的临时节点
*/
public void releaseLock() {
log.info("准备删除的节点路径: " currentNodeLockPath);
try {
zookeeper.delete(currentNodeLockPath, -1);
currentNodeLockPath = null;
zookeeper.close();
} catch (Exception e) {
log.error("删除节点失败", e);
}
}
2.3、Zookeeper分布式锁实现(方式二)
2.3.1、实现原理
假设有两个服务A、B希望获得同一把锁,执行过程大致如下:
1)服务A向zookeeper申请获得锁,该请求将尝试在zookeeper内创建一个临时节点(ephemeral znode),如果没有同名的临时节点存在,则znode创建成功,标志着服务A成功的获得了锁。
2) 服务B向zookeeper申请获得锁,同样尝试在zookeeper内创建一个临时节点(名称必须与服务A的相同),由于同名znode已经存在,因此请求被拒绝。接着,服务B会在zk中注册一个监听器,用于监听临时节点被删除的事件。
3) 若服务A主动向zk发起请求释放锁,或者服务A宕机、断开与zk的网络连接,zk会将服务A(创建者)创建的临时节点删除。而删除事件也将立刻被监听器捕获到,并反馈给服务B。最后,服务B再次向zookeeper申请获得锁。
2.3.2、实现代码
基于临时节点实现Zookeeper分布式锁
多个服务如果想竞争同一把锁,那就向Zookeeper发起创建临时节点的请求,若能成功创建则获得锁,否则借助监听器,当监听到锁被其它服务释放(临时节点被删除),则自己再请求创建临时节点,反复这几个步骤直到成功创建临时节点或者与zookeeper建立的会话超时。
步骤:
1)定义变量
代码语言:javascript复制 /**
* 与Zookeeper成功建立连接的信号标志
*/
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
/**
* 创建分布式锁的过程中,开始和等待请求创建分布式锁的信号标志
*/
private CountDownLatch creatingSemaphore;
/**
* Zookeeper客户端
*/
private ZooKeeper zookeeper;
/**
* 分布式锁的过期时间 单位:毫秒
*/
private static final Long DISTRIBUTED_KEY_OVERDUE_TIME = 30000L;
2)构造函数
代码语言:javascript复制public ZookeeperLock() {
try {
this.zookeeper = new ZooKeeper("192.168.0.93:2181", 5000, new ZookeeperWatcher());
try {
connectedSemaphore.await();
} catch (InterruptedException ite) {
log.error("等待Zookeeper成功建立连接的过程中,线程抛出异常", ite);
}
log.info("与Zookeeper成功建立连接");
} catch (Exception e) {
log.error("与Zookeeper建立连接时出现异常", e);
}
}
3)获取分布式锁
实际上就是在尝试创建临时节点znode create(final String path, byte data[], List acl,CreateMode createMod) path: 从根节点"/"到当前节点的全路径 data: 当前节点存储的数据 (由于这里只是借助临时节点的创建来实现分布式锁,因此无需存储数据) acl: Access Control list 访问控制列表 主要涵盖权限模式(Scheme)、授权对象(ID)、授予的权限(Permission)这三个方面 OPEN_ACL_UNSAFE 完全开放的访问控制 对当前节点进行操作时,无需考虑ACL权限控制 createMode: 节点创建的模式 EPHEMERAL(临时节点) 当创建节点的客户端与zk断开连接后,临时节点将被删除 EPHEMERAL_SEQUENTIAL(临时顺序节点) PERSISTENT(持久节点) PERSISTENT_SEQUENTIAL(持久顺序节点)
代码语言:javascript复制public boolean acquireDistributeLock(Long lockId) {
String path = "/product-lock-" lockId;
try {
zookeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
log.info("ThreadId=" Thread.currentThread().getId() "创建临时节点成功");
return true;
} catch (Exception e) {
// 若临时节点已存在,则会抛出异常: NodeExistsException
while (true) {
// 相当于给znode注册了一个监听器,查看监听器是否存在
try {
Stat stat = zookeeper.exists(path, true);
if (stat != null) {
this.creatingSemaphore = new CountDownLatch(1);
this.creatingSemaphore.await(DISTRIBUTED_KEY_OVERDUE_TIME, TimeUnit.MILLISECONDS);
this.creatingSemaphore = null;
}
zookeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
return true;
} catch (Exception ex) {
log.error("ThreadId=" Thread.currentThread().getId() ",查看临时节点时出现异常", ex);
}
}
}
}
4)释放分布式锁
代码语言:javascript复制public void releaseDistributedLock(Long lockId) {
String path = "/product-lock-" lockId;
try {
// 第二个参数version是数据版本 每次znode内数据发生变化,都会使version自增,但由于分布式锁创建的临时znode没有存数据,因此version=-1
zookeeper.delete(path, -1);
log.info("成功释放分布式锁, lockId=" lockId ", ThreadId=" Thread.currentThread().getId());
} catch (Exception e) {
log.error("释放分布式锁失败,lockId=" lockId, e);
}
}
5)建立Zookeeper的watcher
不论是zk客户端与服务器连接成功,还是删除节点,watcher监听到的事件都是SyncConnected
代码语言:javascript复制private class ZookeeperWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
log.info("接收到事件: " event.getState() ", ThreadId=" Thread.currentThread().getId());
if (Event.KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
if (creatingSemaphore != null) {
creatingSemaphore.countDown();
}
}
}
6)main方式运用
创建了两个线程,其中第一个线程先执行,且持有锁5秒钟才释放锁,第二个线程后执行,当且仅当第一个线程释放锁(删除临时节点)后,第二个线程才能成功获取锁。
代码语言:javascript复制public static void main(String[] args) throws InterruptedException{
long lockId = 20200730;
new Thread(() ->{
ZookeeperLock zookeeperLock = new ZookeeperLock();
System.out.println("ThreadId1=" Thread.currentThread().getId());
System.out.println("ThreadId=" Thread.currentThread().getId() "获取到分布式锁: " zookeeperLock.acquireDistributeLock(lockId));
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
log.error("ThreadId=" Thread.currentThread().getId() "暂停时出现异常", e);
}
zookeeperLock.releaseDistributedLock(lockId);
}).start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
ZookeeperLock zookeeperLock = new ZookeeperLock();
System.out.println("ThreadId2=" Thread.currentThread().getId());
System.out.println("ThreadId=" Thread.currentThread().getId() "获取到分布式锁: " zookeeperLock.acquireDistributeLock(lockId));
}).start();
}
三、基于Redis实现分布式锁
3.1、普通常见实现方式
3.1.1、实现代码
代码语言:javascript复制public String deductStock() {
String lockKey = "product_001";
try {
/*Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "aaa"); //jedis.setnx
stringRedisTemplate.expire(lockKey, 30, TimeUnit.SECONDS); //设置超时*/
//为解决原子性问题将设置锁和设置超时时间合并
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "aaa", 10, TimeUnit.SECONDS);
//未设置成功,当前key已经存在了,直接返回错误
if (!result) {
return "error_code";
}
//业务逻辑实现,扣减库存
....
} catch (Exception e) {
e.printStackTrace();
}finally {
stringRedisTemplate.delete(lockKey);
}
return "end";
}
3.2.2、问题分析
上述代码可以看到,当前锁的失效时间为10s,如果当前扣减库存的业务逻辑执行需要15s时,高并发时会出现问题:
- 线程1http://jintianxuesha.com/?id=3834,首先执行到10s后,锁(product_001)失效
- 线程2,在第10s后同样进入当前方法,此时加上锁(product_001)
- 当执行到15s时,线程1删除线程2加的锁(product_001)
- 线程3,可以加锁 .... 如此循环,实际锁已经没有意义
3.2.3、解决方案
定义一个子线程,定时去查看是否存在主线程的持有当前锁,如果存在则为其延长过期时间。
3.2、基于Redission实现方式
3.2.1、Redission简介
Jedis是Redis的Java实现的客户端,其API提供了比较全面的Redis命令的支持。Redission也是Redis的客户端,相比于Jedis功能简单。Jedis简单使用阻塞的I/O和redis交互,www.jintianxuesha.com
Redission通过Netty支持非阻塞I/O。
Redission封装了锁的实现,其继承了java.util.concurrent.locks.Lock的接口,让我们像操作我们的本地Lock一样去操作Redission的Lock。