使用zookeeper原生api实现分布式锁
zookeeper实现分布式锁
一种是所有节点都监听最小节点 当最小节点删除后 其他节点创建临时节点 谁创建成功 就意味获取到锁 如果客户端太多 服务端删除一个节点 服务端短时间向其他所有客户端发送大量通知 这就是羊群效应
另一种思路就是创建临时有序节点 当前节点监听比他小的节点中的最大节点
当 这个节点删除后触发监控事件 当前节点就是最小节点了 即获取到了锁 避免了羊群效应
DistributeLock类
代码语言:javascript复制package lock;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @author: xiepanpan
* @Date: 2020/6/2
* @Description: zookeeper实现分布式锁
*/
public class DistributeLock implements Lock, Watcher {
private ZooKeeper zooKeeper = null;
//定义根节点
private String ROOT_LOCK ="/locks";
//等待前一个锁
private String WAIT_LOCK;
//表示当前的锁
private String CURRENT_LOCK;
private CountDownLatch countDownLatch;
public DistributeLock() {
Watcher watcher;
try {
zooKeeper = new ZooKeeper("192.168.217.130:2181", 4000, this);
//判断根节点是存在
Stat stat = zooKeeper.exists(ROOT_LOCK, false);
if (stat==null) {
//创建根节点 持久节点
zooKeeper.create(ROOT_LOCK,"0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
public void lock() {
if (this.tryLock()) {
System.out.println(Thread.currentThread().getName() "->" CURRENT_LOCK "获得锁成功");
return;
}
try {
waitForLock(WAIT_LOCK);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private boolean waitForLock(String prev) throws KeeperException, InterruptedException {
//监听当前节点的上一个节点
Stat stat = zooKeeper.exists(prev, true);
if (stat!=null) {
System.out.println(Thread.currentThread().getName() "等待锁" ROOT_LOCK "/" prev "释放");
countDownLatch = new CountDownLatch(1);
countDownLatch.await();
System.out.println(Thread.currentThread().getName() "获得锁");
}
return true;
}
public void lockInterruptibly() throws InterruptedException {
}
public boolean tryLock() {
try {
//创建临时有序节点
CURRENT_LOCK = zooKeeper.create(ROOT_LOCK "/","0".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() "->" CURRENT_LOCK "尝试竞争锁");
List<String> childrenList = zooKeeper.getChildren(ROOT_LOCK, false);
//定义一个 有序集合进行排序
SortedSet<String> sortedSet = new TreeSet<String>();
for (String child: childrenList) {
sortedSet.add(ROOT_LOCK "/" child);
}
String firstNode = sortedSet.first();
//获取比当前节点小的所有节点
SortedSet<String> lessThenMe = sortedSet.headSet(CURRENT_LOCK);
if (CURRENT_LOCK.equals(firstNode)) {
//当前节点和子节点最小的节点进行比较 如果相等 表示获得锁成功
return true;
}
//存在比当前节点小的节点
if(!lessThenMe.isEmpty()) {
//取比当前节点小的最大节点 进行监听
WAIT_LOCK = lessThenMe.last();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
public void unlock() {
System.out.println(Thread.currentThread().getName() "释放锁" CURRENT_LOCK);
try {
//删除节点
zooKeeper.delete(CURRENT_LOCK,-1);
CURRENT_LOCK = null;
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
public Condition newCondition() {
return null;
}
/**
* 处理监听
* @param event
*/
public void process(WatchedEvent event) {
if (this.countDownLatch!=null) {
this.countDownLatch.countDown();
}
}
}
测试类
代码语言:javascript复制package lock;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
* @author: xiepanpan
* @Date: 2020/6/2
* @Description: 测试zookeeper的分布式锁
*/
public class App {
public static void main(String[] args) throws IOException {
final CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i ) {
new Thread(()->{
try {
countDownLatch.await();
DistributeLock distributeLock = new DistributeLock();
//获得锁
distributeLock.lock();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Thread" i).start();;
countDownLatch.countDown();
}
System.in.read();
}
}
效果:
写着玩