为什么要用Zookeeper做分布式锁?
在说Zookeeper做分布式锁之前,我们知道Redis也可以做分布式锁。那我为什么要用Zookeeper做分布式锁呢?
上图为数据库,Redis,Zookeeper实现分布式锁的技术对比。不用说,数据库实现分布式锁的性能肯定很低,Redis虽然性能很高,但是最终一致性上却是输于Zookeeper。Zookeeper在分布式集群上有天然优势。在生产环境中,中间件一般以集群形式部署,那么这里涉及到主从同步问题,Redis主从集群中,如果Master节点在向Slave节点发送RDB文件同步时,有可能Master节点挂掉,文件里面的数据有可能会有一秒的丢失。然而Redis有一套同步确认的策略。用Redis上锁实际是Redis里面setnx 数据,一般需要主节点同步数据给从节点成功了,才能认为成功。当然为了提升效率,有一个RedLock模型,该模型认为只要同步节点成功数超过半数就认为上锁成功。然而这个RedLock模型算法维护起来相当麻烦。但是Zookeeper天然支持这个模型。
还有一个用Zookeeper的原因在于其有一个watcher机制,线程抢不到锁时阻塞,当持有锁的线程释放锁时,这个watcher机制会主动通知其他线程唤醒去抢锁。而Redis的锁,线程抢锁的过程是一个自旋setnx K-V的过程,这种抢锁没有Zookeeper的抢锁方式优雅。所以我喜欢用Zookeeper实现分布式锁。
Zookeeper特性
1.节点树数据结构,znode是一个跟Unix文件系统路径相似的节点,可以往这个节点存储或获取数据; 2.通过客户端可对znode进行增删改查的操作,还可以注册watcher监控znode的变化。
通过上面两个特性可以实现分布式锁。
上图为Zookeeper实现分布式锁的流程图。下面为代码的实现。
代码实现
创建一个ZkDistributeLock
类实现Lock接口
@Slf4j
public class ZkDistributeLock implements Lock {
@Autowired
private ZookeeperConfig config;
private String lockPath;
private ZkClient client;
private String currentPath;
private String beforePath;
...
}
构造函数里面创建锁的根节点。
代码语言:javascript复制public ZkDistributeLock(String lockPath) {
super();
this.lockPath = lockPath;
ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
this.client = zookeeperConfig.getConnectionWithoutSpring();
this.client.setZkSerializer(new MyZkSerializer());
if (!this.client.exists(lockPath)) {
try {
this.client.createPersistent(lockPath);
} catch (ZkNodeExistsException e) {
e.getStackTrace();
}
}
}
lock()
方法编写
@Override
public void lock() {
if (!tryLock()) {
//没获得锁,阻塞自己
waitForLock();
//再次尝试
lock();
}
}
tryLock()
方法编写
@Override
public boolean tryLock() { //不会阻塞
//创建节点
if (this.currentPath == null) {
currentPath = this.client.createEphemeralSequential(lockPath "/", "lock");
}
List<String> children = this.client.getChildren(lockPath);
Collections.sort(children);
if (currentPath.equals(lockPath "/" children.get(0))) {
return true;
} else {
int currentIndex = children.indexOf(currentPath.substring(lockPath.length() 1));
beforePath = lockPath "/" children.get(currentIndex - 1);
}
log.info("锁节点创建成功:{}", lockPath);
return false;
unlock()
方法编写
@Override
public void unlock() {
client.delete(this.currentPath);
}
waitForLock()
方法编写
private void waitForLock() {
CountDownLatch count = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
System.out.println(String.format("收到节点[%s]被删除了",s));
count.countDown();
}
};
client.subscribeDataChanges(this.beforePath, listener);
//自己阻塞自己
if (this.client.exists(this.beforePath)) {
try {
count.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//取消注册
client.unsubscribeDataChanges(this.beforePath, listener);
}
测试
代码语言:javascript复制//创建订单时加锁
@Override
public void createOrder() {
String lockPath = "/distribute_lock";
String orderCode = null;
//zk 分布式锁
Lock lock = new ZkDistributeLock(lockPath);
// zkDistributeLock.setLockPath(lockPath);
lock.lock();
try {
orderCode = ocg.getOrderCode();
} finally {
lock.unlock();
}
log.info("当前线程:{},生成订单编号:{}",Thread.currentThread().getName() , orderCode);
//其他逻辑
}
@Test
public void testDisLock() {
//并发数
int currency = 20;
//循环屏障
CyclicBarrier cyclicBarrier = new CyclicBarrier(currency);
for (int i = 0; i < currency; i ) {
new Thread(() -> {
// OrderServiceImplWithDisLock orderService = new OrderServiceImplWithDisLock();
System.out.println(Thread.currentThread().getName() "====start====");
//等待一起出发
try {
//CyclicBarrier共享锁模拟并发
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
orderService.createOrder();
}).start();
}
}
运行test方法,zookeeper上面会生成锁节点。
线程抢到锁,会显示创建锁节点成功
并且执行锁里面的代码
抢不到锁,会阻塞,等待被释放的锁
释放锁时,节点删除,wacther机制通知
抢到锁的线程,执行当前线程里面的任务
github代码:https://github.com/lvshen9/demo/blob/lvshen-dev/src/main/java/com/lvshen/demo/distributelock/ZkDistributeLock.java