Java Review - 并发编程_ScheduledThreadPoolExecutor原理&源码剖析

2021-12-30 19:49:59 浏览数 (3)

文章目录

  • 概述
  • 类结构
  • 核心方法&源码解析
    • schedule(Runnable command, long delay,TimeUnit unit)
    • scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
    • scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
  • 小结

概述

Java Review - 并发编程_ThreadPoolExecutor原理&源码剖析 我们复习了Java中线程池ThreadPoolExecutor的原理,ThreadPoolExecutor只是Executors工具类的一部分功能。

下面来介绍另外一部分功能,也就是ScheduledThreadPoolExecutor的实现,这是一个可以在指定一定延迟时间后或者定时进行任务调度执行的线程池。

类结构

  • Executors其实是个工具类,它提供了好多静态方法,可根据用户的选择返回不同的线程池实例。
  • ScheduledThreadPoolExecutor继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口。
  • 线程池队列是DelayedWorkQueue,其和DelayedQueue类似,是一个延迟队列
  • ScheduledFutureTask是具有返回值的任务,继承自FutureTask。FutureTask的内部有一个变量state用来表示任务的状态,一开始状态为NEW,所有状态为
代码语言:javascript复制
    private static final int NEW          = 0; // 初始状态
    private static final int COMPLETING   = 1; // 执行中
    private static final int NORMAL       = 2; // 正常运行结束状态
    private static final int EXCEPTIONAL  = 3; // 运行中异常
    private static final int CANCELLED    = 4; // 任务被取消
    private static final int INTERRUPTING = 5; // 任务正在被中断
    private static final int INTERRUPTED  = 6; // 任务已经被中断

可能的任务状态转换路径为

代码语言:javascript复制
NEN-> COMPLETING-> NORMAL//初始状态->执行中ー>正常结東
NEN-> COMPILETING-> EXCEPTIONAL//初始状态->执行中ー>执行异常
NEN-> CANCELLED//初始状态一>任务取消
NEN-> INTERRUPTING-> INTERRUPTED//初始状态->被中断中->被中断
  • ScheduledFutureTask内部还有一个变量period用来表示任务的类型,任务类型如下 period=0,说明当前任务是一次性的,执行完毕后就退出了。 period为负数,说明当前任务为fixed-delay任务,是固定延迟的定时可重复执行任务。 period为正数,说明当前任务为fixed-rate任务,是固定频率的定时可重复执行任务
  • ScheduledThreadPoolExecutor的一个构造函数如下,由该构造函数可知线程池队列是DelayedWorkQueue。
代码语言:javascript复制
    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
	
	// 使用改造后的DelayQueue
    public ScheduledThreadPoolExecutor(int corePoolSize) {
    	// 调用父类ThreadPoolExecutor的构造函数
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
代码语言:javascript复制
   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

核心方法&源码解析

schedule(Runnable command, long delay,TimeUnit unit)

该方法的作用是提交一个延迟执行的任务,任务从提交时间算起延迟单位为unit的delay时间后开始执行。提交的任务不是周期性任务,任务只会执行一次.

代码语言:javascript复制
    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        //  1 参数校验                               
        if (command == null || unit == null)
            throw new NullPointerException();
        //  2 任务转换 
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        // 3 添加任务到延迟队列                                  
        delayedExecute(t);
        return t;
    }

上面我们分析了如何向延迟队列添加任务,下面我们来看线程池里面的线程如何获取并执行任务。

前面说ThreadPoolExecutor时我们说过,具体执行任务的线程是Worker线程,Worker线程调用具体任务的run方法来执行。由于这里的任务是ScheduledFutureTask,所以我们下面看看ScheduledFutureTask的run方法

代码语言:javascript复制
    /**
         * Overrides FutureTask version so as to reset/requeue if periodic.
         */
        public void run() {
        	// 8 是否只执行一次 
            boolean periodic = isPeriodic();
            // 9 取消任务
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            // 10 只执行一次,调用schedule方法
            else if (!periodic)
                ScheduledFutureTask.super.run();
            // 11 定时执行    
            else if (ScheduledFutureTask.super.runAndReset()) {
            	// 11.1 设置time=time period
                setNextRunTime();
                // 11.2 重新加入该任务到delay队列
                reExecutePeriodic(outerTask);
            }
        }

在什么时候多个线程会同时执行CAS将当前任务的状态从NEW转换到COMPLETING?其实当同一个command被多次提交到线程池时就会存在这样的情况,因为同一个任务共享一个状态值state。

如果任务执行失败,则执行代码(13.1)。setException的代码如下,可见与set函数类似。

代码语言:javascript复制
   protected void setException(Throwable t) {
   		// 如果当前任务的状态为NEW,则设置为COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            // 设置当前任务的状态为EXCEPTIONAL,也就是任务非正常结束
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

到这里代码(10)的逻辑执行完毕,一次性任务也就执行完毕了

scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)

该方法的作用是,当任务执行完毕后,让其延迟固定时间后再次运行(fixed-delay任务)

  • initialDelay表示提交任务后延迟多少时间开始执行任务command
  • delay表示当任务执行完毕后延长多少时间后再次运行command任务
  • unit是initialDelay和delay的时间单位

任务会一直重复运行直到任务运行中抛出了异常,被取消了,或者关闭了线程池。

代码语言:javascript复制
 /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
       // 14 参数校验                                              
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        // 15 任务转换   ,注意这里的 poeriod = -dealy < 0   【 unit.toNanos(-delay)】
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        // 16 添加任务到队列
        delayedExecute(t);
        return t;
    }
  • 代码(14)进行参数校验,校验失败则抛出异常
  • 代码(15)将command任务转换为ScheduledFutureTask。这里需要注意的是,传递给ScheduledFutureTask的period变量的值为-delay,period<0说明该任务为可重复执行的任务。
  • 然后代码(16)添加任务到延迟队列后返回。

将任务添加到延迟队列后线程池线程会从队列里面获取任务,然后调用ScheduledFutureTask的run方法执行。由于这里period<0,所以isPeriodic返回true,所以执行代码(11)。runAndReset的代码如下。

代码语言:javascript复制
    /**
     * Executes the computation without setting its result, and then
     * resets this future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     *
     * @return {@code true} if successfully run and reset
     */
    protected boolean runAndReset() {
    	// 17 
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        // 18     
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

该代码和FutureTask的run方法类似,只是任务正常执行完毕后不会设置任务的状态,这样做是为了让任务成为可重复执行的任务。

这里多了代码(19),这段代码判断如果当前任务正常执行完毕并且任务状态为NEW则返回true,否则返回false。 如果返回了true则执行代码(11.1)的setNextRunTime方法设置该任务下一次的执行时间。

代码语言:javascript复制
  /**
         * Sets the next time to run for a periodic task.
         */
        private void setNextRunTime() {
            long p = period;
            if (p > 0) // ffixed-rate类型任务
                time  = p;
            else // fixed-delay 类型任务 
                time = triggerTime(-p);
        }

这里p<0说明当前任务为fixed-delay类型任务。然后设置time为当前时间加上-p的时间,也就是延迟-p时间后再次执行。

fixed-delay类型的任务的执行原理为: 当添加一个任务到延迟队列后,等待initialDelay时间,任务就会过期,过期的任务就会被从队列移除,并执行。执行完毕后,会重新设置任务的延迟时间,然后再把任务放入延迟队列,循环往复。需要注意的是,如果一个任务在执行中抛出了异常,那么这个任务就结束了,但是不影响其他任务的执行。

scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

该方法相对起始时间点以固定频率调用指定的任务(fixed-rate任务)。当把任务提交到线程池并延迟initialDelay时间(时间单位为unit)后开始执行任务command 。然后从initialDelay period时间点再次执行,而后在 initialDelay 2 * period时间点再次执行,循环往复,直到抛出异常或者调用了任务的cancel方法取消了任务,或者关闭了线程池。

scheduleAtFixedRate的原理与scheduleWithFixedDelay类似,下面我们看下它们之间的不同点。

首先调用scheduleAtFixedRate的代码如下

代码语言:javascript复制
    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
		// 装饰任务,注意这里的period=period>0 不是负的
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

在如上代码中,在将fixed-rate类型的任务command转换为ScheduledFutureTask时设置period=period,不再是-period。

所以当前任务执行完毕后,调用setNextRunTime设置任务下次执行的时间时执行的是time = p而不再是time = triggerTime(-p)。

总结:相对于fixed-delay任务来说,fixed-rate方式执行规则为,时间为initdelday n*period时启动任务,但是如果当前任务还没有执行完,下一次要执行任务的时间到了,则不会并发执行,下次要执行的任务会延迟执行,要等到当前任务执行完毕后再执行。

小结

ScheduledThreadPoolExecutor的实现原理,其内部使用DelayQueue来存放具体任务。任务分为三种,其中一次性执行任务执行完毕就结束了,fixed-delay任务保证同一个任务在多次执行之间间隔固定时间,fixed-rate任务保证按照固定的频率执行。任务类型使用period的值来区分。

0 人点赞