最初看过Zookeeper实现分布式锁的设计思想,感觉类似于之前写的Redis链接 ,都是通过只有一个客户端创建某个结点,只要结点存在其他的结点无法创建,实现一种资源的占用,后来又想想其实锁不就是这样嘛,也不是只是Zookeeper和Redis,就算是单进程各个线程之间的锁其实也是一种对象资源管控罢了。 后面接触多了,发现其实Zookeeper和Redis的分布式锁还是不一样滴~
一,先看一下有些部分类似于Redis的Zookeeper式分布式锁
如图所示,我们三个客户端去监听同一节点,那么只有一个客户端比如A客户端能够创建成功,那么其余结点由于监听了节点事件,当A使用完毕更改了结点状态,其余节点也就知道可以开始再次抢占资源了。
羊群效应:但是这样做有个缺点,由于众多结点监听该同一事件,那么每次发生节点变化虽然只有一个结点能够抢占资源,但是却惊动了整个群体,造成很多事件的变更,显然如果对于很多结点的分布式应用,这样并不划算。
二,利用有序节点实现分布式锁
如图所示,每个客户端都去锁结点下创建一个属于自己的有序临时结点,临时结点会监听一个比自己小的结点,如果自己是当前最小的,那么就代表你是可以被正常使用的(抢占资源)。
这里以Java提供的Lock为基础进行扩展实现,并且以继承的方式加watcher机制。Zk操作使用ZK提供的原生Java API。
分布式锁代码
代码语言:javascript复制public class DistributedLock implements Lock, Watcher {
private ZooKeeper zk=null;//创建zk客户端
private String ROOT_LOCK="/locks";//定义一个根节点
private String WAIT_LOCK;//等待前一个锁
private String CURRENT_LOCK;//表示当前锁
private CountDownLatch countDownLatch;
public DistributedLock( ) {
try {
this.zk = new ZooKeeper("Ip:port", 4000, this);//这里watcher放在本类中实现了
//判断根节点是否存在
Stat stat=zk.exists(ROOT_LOCK,false);
if (stat==null){
zk.create(ROOT_LOCK,"0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public boolean tryLock() {//当结点调用尝试获得锁方法时
try {
//创建临时有序结点并复制给当前锁
CURRENT_LOCK=zk.create(ROOT_LOCK "/","0".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);//创建临时有序结点
System.out.println(Thread.currentThread().getName() "创建临时结点--->" CURRENT_LOCK "--->尝试获得锁");
//获取孩子结点(锁结点)
List<String> childrens = zk.getChildren(ROOT_LOCK, false);
//创建孩子集合并添加到有序Set中
SortedSet<String> nodes=new TreeSet<>();
for (String children:childrens){
nodes.add(ROOT_LOCK "/" children);
}
String firstNode = nodes.first();
//尝试获得比当前结点更小的结果集,以此来获取到最后一个最大的结点也就是本结点前一个结点
//注意,这里不可以用nodes.last(),因为这样获取到的是全部结点最后一个结点,是大于本结点那个
SortedSet<String> lessThenMe = ((TreeSet<String>)nodes).headSet(CURRENT_LOCK);
if (CURRENT_LOCK.equals(firstNode)){//如果当前结点是集合中最小的结点则其获得锁
return true;
}
if (!lessThenMe.isEmpty()){//如果存在比当前结点更小的结点
WAIT_LOCK=lessThenMe.last();//获得该有序结合的最后一个结点,即前一个结点,设置给WAIT_LOCL
System.out.println("lessThenMe最后一个 : " lessThenMe.last());
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public void lock() {
if (this.tryLock()){//如果获得锁成功
System.out.println(Thread.currentThread().getName() "获得锁成功");
return;
}
//如果没有成功获得锁,那么等待前一个结点释放锁
waitForLock(WAIT_LOCK);
}
private boolean waitForLock(String preNode){
try {
//监听上一个比自己小的结点
Stat stat=zk.exists(preNode,true);
if (stat!=null){
System.out.println(Thread.currentThread().getName() "等待--->" ROOT_LOCK "/" preNode "释放锁");
countDownLatch=new CountDownLatch(1);
countDownLatch.await();
System.out.println(Thread.currentThread().getName() "--->获得锁");
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
System.out.println(Thread.currentThread().getName() "释放锁" CURRENT_LOCK);
try {
zk.delete(CURRENT_LOCK,-1);//version=-1不管如何都会删除
CURRENT_LOCK=null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void process(WatchedEvent watchedEvent) {
if (this.countDownLatch!=null){
this.countDownLatch.countDown();
}
}
}
测试类
代码语言:javascript复制public class ZkDistributedLockTest {
public static void main(String[] args) throws IOException {
CountDownLatch countDownLatch=new CountDownLatch(10);
for(int i=0;i<10;i ){
new Thread(()->{
try {
countDownLatch.await();
DistributedLock distributedLock=new DistributedLock();
distributedLock.lock();//当前线程尝试获得锁
} catch (InterruptedException e) {
e.printStackTrace();
}
},"线程" i).start();
countDownLatch.countDown();
}
System.in.read();
}
}
说一下思想:
1.这里设定了一个倒计时CountDownLatch1用于控制和阻塞并发线程;
2.我们让每个线程创建分布式锁工具类**DistributedLock,每个线程会连接到ZK服务器成为一个客户端,只有一个创建跟结点
3.接下来每个线程都去尝试调用我们的lock()方法,我们的lock会去调用tryLock()方法
4.首先,我们让每个线程在tryLock()里去创建一个临时顺序结点,跟线程名无关,先到者排序在前
5.然后,我们会查出此时分布式锁根节点下的所有顺序结点,如果当前结点是最小的结点那么它将会获得锁,如果当前结点不是最小的结点,那么找到前一个结点并监听,并且利用countdownlatch设置倒计时为1,并阻塞当前线程2
6.当前一个结点出现监听事件(删除,更改),我们释放countdownlatch,释放本线程
如此,我们可以以分布式锁的形式实现Zk结点的顺序作用。
上述提供的是一种自己实现watcher和lock利用原生zk API简单实现分布式锁,实际上curator提供了许多高度封装好的分布式锁简化了步骤,而且提供了更多细节操作,比如读写超市断开连接的选举等~
eg
参考文献:
1 CountDownLauch解析.https://cloud.tencent.com/developer/article/2002445
2利用SortedSet实现结点排序,SortedSet.headSet(string key),可以取出小于key的结点,SortedSet.first获取第一个结点,SortedSet.last获取最后一个结点具体的https://blog.csdn.net/xjk201/article/details/81586209
哈哈哈,写论文入魔了,搞个参考文献皮一下~