使用ScheduledExecutorService执行周期性任务需要注意的地方

2022-03-28 20:13:32 浏览数 (1)

使用spring框架开发程序时基本上很少有人会使用ScheduledExecutorService来执行周期性任务,但是ScheduledExecutorService在某些场景下可能会用到,使用JDK自带的这个周期性调度器时一定要确保任务内部不能抛出运行时异常,否则后续任务将不会执行,至于原因,接下来将从源码角度分析下:

使用Excutors工厂类创建的ScheduledExecutorService,其实现类为ScheduledThreadPoolExecutor,

从上面代码可以看到,ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以执行任务调度还是使用的ThreadPoolExecutor类的runWorker方法,该方法不断从任务队列中取任务并执行,ScheduledExecutorService scheduleAtFixedRate方法内部创建了经过包装的任务ScheduledFutureTask,任务调度方法:

代码语言:javascript复制
    final void runWorker(Worker w) {
    	/**
		     *   RUNNING:    接收新任务并且处理任务队列中任务
		     *   SHUTDOWN:   不接收新任务但处理任务队列中任务
		     *   STOP:       不接收新任务且不处理任务队列中任务,中断所有任务,ShutdownNow方法调用时更新池状态为STOP
		     *   TIDYING:    所有任务已经结束,workerCount=0,线程状态转换为TIDYING,接下来执行terminated()方法
		     *   TERMINATED: terminated()方法已结束
    	 */
    	
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        	//getTask方法可能会阻塞或者等待超时等待任务
            while (task != null || (task = getTask()) != null) {
            	//每个工作线程一把锁
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                /**
				    private static final int RUNNING    = -1 << COUNT_BITS;  //0xe0000000
				    private static final int SHUTDOWN   =  0 << COUNT_BITS;
				    private static final int STOP       =  1 << COUNT_BITS;  //0x20000000
				    private static final int TIDYING    =  2 << COUNT_BITS;  //0x40000000
				    private static final int TERMINATED =  3 << COUNT_BITS;  //0x60000000
				    
				     * 线程池状态切换
				     * 
				     * RUNNING -> SHUTDOWN
				     *    shutdown方法
				     * (RUNNING or SHUTDOWN) -> STOP
				     *    调用shutdownNow
				     * SHUTDOWN -> TIDYING
				     *    When both queue and pool are empty
				     * STOP -> TIDYING
				     *    When pool is empty
				     * TIDYING -> TERMINATED
				     *    When the terminated() hook method has completed
				     *    				    
				    1. 如果线程池已经处于STOP状态并且当前线程没有被中断,中断线程 
				    2. 如果线程池还处于RUNNING或SHUTDOWN状态,并且当前线程已经被中断了,重新检查一下线程池状态,如果处于STOP状态并且没有被中断,那么中断线程
                 */
                //STOP只在shutDownNow中出现
                if ((runStateAtLeast(ctl.get(), STOP) ||(
                		 Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) 
                		&& !wt.isInterrupted())
                    wt.interrupt();
                
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    	//此处直接调用Runnable的run方法:在当前线程执行run方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks  ;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask返回的就是ScheduledFutureTask,所以task.run()根据多态性执行的就是ScheduledFutureTask的run方法:

代码语言:javascript复制
        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)  //不是周期性任务
                ScheduledFutureTask.super.run();   //调用FutureTask类run方法
            else if (ScheduledFutureTask.super.runAndReset()) {//周期性任务,没有返回结果,调用FutureTask类runAndReset方法
            	//设置下次执行的时间
                setNextRunTime();
                //重新将该任务放入任务队列
                reExecutePeriodic(outerTask);
            }
        }

ScheduledFutureTask将实际任务委托给父类完成,从上面代码可以看出,只有当ScheduledFutureTask.super.runAndReset()返回true时,才会设置任务下次执行时间并重新把该任务放入任务等待队列中,

周期性任务调用的是FutureTask.runAndReset()方法,下面就是揭开问题面纱的部分:

代码语言:javascript复制
   protected boolean runAndReset() {
       if (state != NEW ||
           !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                        null, Thread.currentThread()))
           return false;
       boolean ran = false;
       int s = state;
       try {
           Callable<V> c = callable;
           if (c != null && s == NEW) {
               try {
                   c.call(); //如果此处抛出异常,那么ran将不会被设置为true,该方法返回false
                   ran = true;
               } catch (Throwable ex) {
                   setException(ex);
               }
           }
       } finally {
           // runner must be non-null until state is settled to
           // prevent concurrent calls to run()
           runner = null;
           // state must be re-read after nulling runner to prevent
           // leaked interrupts
           s = state;
           if (s >= INTERRUPTING)
               handlePossibleCancellationInterrupt(s);
       }
       return ran && s == NEW;
   }

收工~~~~~

0 人点赞