仿写@ScheduleLock 定时任务

2021-05-11 10:34:51 浏览数 (1)

最近公司在搞分布式的定时任务, 怎么满足分布式的定时任务锁。 我看了大量的开源的代码。 https://github.com/lukas-krecan/ShedLock 感觉老外写的非常的不错。

其实底层也就是分布式锁 aop 的切片来实现的。那既然别人也能实现。 那我们也可以的。 这里分布式锁我们使用zookeeper 来实现。具体的客户端我使用zookeeper 的curator 来实现。 官网地址 http://curator.apache.org/getting-started.html , 写的比较详细,清晰。 分布式锁也有好几种形式。 下面我们就来写代码实现一下。

  1. 首先安装zookeeper

2. 我们看到了是一个空的客户端, 现在已经连接上了。

3, 新建一个maven Springboot工程 导入curator 的客户端pom

4. 编写客户端的配置类

代码语言:javascript复制
@Component
public class ZkClientConfig {

     // 从配置文件拿取zk 的连接
     @Value("${zk.url}")
     private String zkConnection;

     private static CuratorFramework client;

     @PostConstruct
     public void startClient(){
         // 100 毫秒从新连接三次。
         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
         client = CuratorFrameworkFactory.newClient(zkConnection, retryPolicy);
         client.start();
     }

}

要是看见出现这种,就说明连接成功了。

在编写可重入锁的service

代码语言:javascript复制
package com.example.demo.service;

import com.example.demo.config.ZkClientConfig;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

import java.util.concurrent.TimeUnit;

@Service
public class ZkClientService {


    private static  InterProcessMutex lock;

    /**
     * 尝试获取锁
     * @param time
     * @param timeUnit
     * @return
     */
    public boolean lock(long time , TimeUnit timeUnit,String lockPath){
          try {
              lock = new InterProcessMutex(ZkClientConfig.client, lockPath);

           return    lock.acquire(time,timeUnit);
          }catch (Exception e){
              return  false;
          }
    }

    /**
     *  创建父节点和子节点
     * @param lockPath
     * @throws Exception
     */
    public void create(String lockPath) throws Exception {
        ZkClientConfig.client.create().forPath(lockPath,"test".getBytes());

    }

   /**/ 遍历路径下面的节点
    public int SizeForPath(String path) throws Exception {
       return ZkClientConfig.client.getChildren().forPath(path).size();
    }

    /**/ 判断路径是否存在
    public Boolean isExist(String path) throws Exception {
        return !ObjectUtils.isEmpty(ZkClientConfig.client.checkExists().forPath(path));
    }



    /**
     * 尝试获取锁
     * @return
     */
    public void  lockNotTime(String lockPath){
        try {
            lock = new InterProcessMutex(ZkClientConfig.client, lockPath);
            lock.acquire();
        }catch (Exception e){
            e.printStackTrace();
        }
    }


    /**
     *  释放锁
     */
    public  void releaseLock(){
        try {
            lock.release();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

启动项目,调用初始化lock 的方法 看一下zk 的界面

但是这个节点释放锁就会消失, 关闭zk 也会消失。说明是临时的。 根据后缀来看应该是顺序的节点。

好了, 下面我们就来自定义注解了。

代码语言:javascript复制
@Documented
@Retention(RUNTIME)
@Target(METHOD)
public @interface DCScheduleLock {

    /**
     *  定时任务的名称,其实也就是zk的节点,具有唯一性,不能重复。
     * @return
     */
    String name() ;

    /**
     *  锁的时长
     * @return
     */
    long time();

    /**
     * 锁时长单位
     * @return
     */
    TimeUnit unit();

}

在写一下AOP切片

代码语言:javascript复制
@Component
@Aspect
public class SchedulelockAop {

    // path 路径前缀
    private final static String PATH_PREFIX = "/";

    // 切片的注解
    @Pointcut("@annotation(com.example.demo.annotation.DCScheduleLock)")
    public void dcScheduleLock() {

    }

    @Autowired
    private ZkClientService zkClientService;

   //阻塞多长时间
    private static ThreadLocal<Long> threadLocalLong = new ThreadLocal<>();

    // 判断单个还是多个实列, 单个实例不走延迟
    private static ThreadLocal<Boolean> single = new ThreadLocal<>();



    @Around( value = "dcScheduleLock()")
    public Object lockAround(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature)joinPoint.getSignature();
        Method method = signature.getMethod();
        DCScheduleLock dcScheduleLock = AnnotationUtils.findAnnotation(method, DCScheduleLock.class);
        Scheduled scheduled = AnnotationUtils.findAnnotation(method, Scheduled.class);
        if(!ObjectUtils.isEmpty(scheduled)) {
           // 判断是否存在根路径,不存在就创建
            if(!zkClientService.isExist(PATH_PREFIX method dcScheduleLock.name())){
                zkClientService.create(PATH_PREFIX method dcScheduleLock.name());
            }
            //  判断当前根路径,也就是子路径的个数
            int size =  zkClientService.SizeForPath(PATH_PREFIX method dcScheduleLock.name());
            // 判断是单个实列还是多个
            if(size<2){
                if(!zkClientService.isExist(PATH_PREFIX method dcScheduleLock.name() PATH_PREFIX String.valueOf(Thread.currentThread().getId()))){
                    zkClientService.create(PATH_PREFIX method dcScheduleLock.name() PATH_PREFIX String.valueOf(Thread.currentThread().getId()));
                }
                single.set(true);
            }else {
                single.set(false);
            }
            System.out.println("实例的个数: " size);
            Object obj = null;
            System.out.println(" 线程id "  Thread.currentThread().getId());
            threadLocalLong.set(timeToLong(dcScheduleLock.unit(),dcScheduleLock.time(),scheduled.cron()));
            if(zkClientService.lock(dcScheduleLock.time(),dcScheduleLock.unit() ,PATH_PREFIX  dcScheduleLock.name())){
                 // 抢到锁就执行方法,没有就不做任何处理。
                 obj = joinPoint.proceed();
            }
            return  obj;
             }else {
            System.out.println( Thread.currentThread().getName()  "线程没有获取锁 ");
            return joinPoint.proceed();
        }
    }
  
    // 后置处理器,获取阻塞的时间才能释放锁
    @After(value = "dcScheduleLock()")
    public void after(JoinPoint joinPoint) throws InterruptedException {
        // 单室例节点不阻塞
        if(!single.get()){
            Thread.sleep(threadLocalLong.get());
            threadLocalLong.remove();
        }
       //切记一定要释放锁。 
        zkClientService.releaseLock();
        System.out.println( Thread.currentThread().getName()  "线程释放锁成功 ");

    }


    /**
     *  这里先写死就是五秒钟, 其实是拿到cron  和 所得时间比较, 取最大的时间
     * @param timeUnit
     * @param time
     * @return
     */
    public  long timeToLong(TimeUnit timeUnit,long time ,String cron){
        return 1000*5;
    }
}

在写一下测试代码:

代码语言:javascript复制
  @Scheduled(cron = "0/5 * * * * ?")
  @DCScheduleLock(name = "test",time =2,unit = TimeUnit.SECONDS)
  public void   test(){
    System.out.println(" current  thread --->>>>"  new Date());
  }

单个机器测试:

没问题,两台机器测试我们看看啊。

这里的实列也是用节点来标识的,路径是schedulLock 的name 和方法名称。

看一下完美解决了, 一共两个实列。

后面还没写完,实列不应该是持久化的节点,也应该是临时的。

最后看一下zk

里面存放的使我们两个实列。

具体流程图

0 人点赞