【小家Spring】Spring任务调度核心接口(类)之---TaskScheduler(任务调度器)、Trigger(触发器)、ScheduledTask(调度任务)详解

2019-09-03 15:25:22 浏览数 (1)

前言

先推荐阅读此篇:

【小家java】Java定时任务ScheduledThreadPoolExecutor详解以及与Timer、TimerTask的区别(执行指定次数停止任务)

某些时候我们可能需要在某些固定的时间或者是间隔一定的时间连续执行一些任务,如每天凌晨自动跑一些批次/心跳检测等。Spring通过使用TaskScheduler来完成这些功能。

任务调度框架设计到几个核心的接口,下面做如下介绍。

任务调度和JDK的定时器、线程池有关,推荐先阅读上面的{相关阅读}

TriggerContext

该接口表示触发的上下文。它能够获取上次任务原本的计划时间/实际的执行时间以及实际的完成时间

代码语言:javascript复制
//@since 3.0 我们发现每个方法都有可能返回null(比如首次执行)
public interface TriggerContext {
	// 上次预计的执行时间
	@Nullable
	Date lastScheduledExecutionTime();
	// 上次真正执行时间
	@Nullable
	Date lastActualExecutionTime();
	// 上次完成的时间
	@Nullable
	Date lastCompletionTime();

}

它的唯一实现类:SimpleTriggerContext,源码就不用看了,没有任何逻辑,只有一些赋值操作。

Trigger

TaskScheduler中将会使用到Trigger对象,所以先对它进行分析

Trigger接口用于计算任务的下次执行时间。它的接口定义如下:

代码语言:javascript复制
public interface Trigger {
	//获取下次执行时间
	@Nullable
	Date nextExecutionTime(TriggerContext triggerContext);
}

它有如上的两个实现类。

CronTrigger

顾名思义,它通过Cron表达式来生成调度计划。比如:

代码语言:javascript复制
scheduler.schedule(task, new CronTrigger("0 15 9-17 * * MON-FRI"));

以上表达式表示在工作日的9-17点之间,每隔15分钟执行一次;

Spring对cron表达式的支持,是由CronSequenceGenerator来实现的,不依赖于别的框架。下面给出一个Demo感受下:

代码语言:javascript复制
    public static void main(String[] args) {
        CronSequenceGenerator generator = new CronSequenceGenerator("0 15 * * * MON-FRI");

        Date next = generator.next(new Date());
        System.out.println(next); //Mon Apr 22 17:15:00 CST 2019
        System.out.println(generator.next(next)); //Mon Apr 22 18:15:00 CST 2019

    }

这个类若我们自己需要解析Cron表达式,也是可以拿出来使用的。

CronTrigger的原理也比较简单,主要是实现了nextExecutionTime方法:

代码语言:javascript复制
	@Override
	public Date nextExecutionTime(TriggerContext triggerContext) {
		Date date = triggerContext.lastCompletionTime();
		// 这里面有个处理:如果data为null,相当于任务已经完成了
		if (date != null) {
			// 拿到上一次预定执行的时间
			Date scheduled = triggerContext.lastScheduledExecutionTime();
			// 如果预定执行的时间为null(比如第一次)或者上一次还在data之后,那就取当前时间嘛
			if (scheduled != null && date.before(scheduled)) {
				// Previous task apparently executed too early...
				// Let's simply use the last calculated execution time then,
				// in order to prevent accidental re-fires in the same second.
				date = scheduled;
			}
		}
		// 如果任务还没有完成,那就以当前时间去计算下一个时间
		else {
			date = new Date();
		}
		return this.sequenceGenerator.next(date);
	}
PeriodicTrigger

用于定期执行的Trigger;它有两种模式:

  • fixedRate:两次任务开始时间之间间隔指定时长
  • fixedDelay: 上一次任务的结束时间与下一次任务开始时间``间隔指定时长

可见这两种情况的区别就在于,在决定下一次的执行计划时是否要考虑上次任务在什么时间执行完成。 默认情况下PeriodicTrigger使用了fixedDelay模式。

  • period: long类型,表示间隔时长,注意在fixedRate与fixedDelay两种模式下的不同含义
  • timeUnit: TimeUnit类型,表示间隔时长的单位,如毫秒等;默认是毫秒
  • initialDelay: long类型,表示启动任务后间隔多长时间开始执行第一次任务
  • fixedRate: boolean类型,表示是否是fixedRate,为True时是fixedRate,否则是fixedDelay,默认为False

TaskScheduler

Spring任务调度器的核心接口,定义了执行定时任务的主要方法,主要根据任务的不同触发方式调用不同的执行逻辑,其实现类都是对JDK原生的定时器或线程池组件进行包装,并扩展额外的功能

TaskScheduler用于对Runnable的任务进行调度,它包含有多种触发规则。

代码语言:javascript复制
public interface TaskScheduler {

	// 提交任务调度请求 
	// Runnable task:待执行得任务
	// Trigger trigger:使用Trigger指定任务调度规则
	@Nullable
	ScheduledFuture<?> schedule(Runnable task, Trigger trigger);

	// @since 5.0  这里使用的Instant 类,其实最终也是转换成了Date
	default ScheduledFuture<?> schedule(Runnable task, Instant startTime) {
		return schedule(task, Date.from(startTime));
	}
	//  提交任务调度请求   startTime表示它的执行时间
	//  注意任务只执行一次,使用startTime指定其启动时间  
	ScheduledFuture<?> schedule(Runnable task, Date startTime);

	// @since 5.0
	default ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {
		return scheduleAtFixedRate(task, Date.from(startTime), period.toMillis());
	}
	// 使用fixedRate的方式提交任务调度请求    任务首次启动时间由传入参数指定 
	// task 待执行的任务  startTime 任务启动时间    period 两次任务启动时间之间的间隔时间,默认单位是毫秒
	ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);

	// @since 5.0
	default ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Duration period) {
		return scheduleAtFixedRate(task, period.toMillis());
	}
	// 使用fixedRate的方式提交任务调度请求 任务首次启动时间未设置,任务池将会尽可能早的启动任务
	// task 待执行任务 
	// period 两次任务启动时间之间的间隔时间,默认单位是毫秒
	ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);

	// @since 5.0
	default ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) {
		return scheduleWithFixedDelay(task, Date.from(startTime), delay.toMillis());
	}
	//  使用fixedDelay的方式提交任务调度请求  任务首次启动时间由传入参数指定 
	// delay 上一次任务结束时间与下一次任务开始时间的间隔时间,单位默认是毫秒 
	ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
	// @since 5.0
	default ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay) {
		return scheduleWithFixedDelay(task, delay.toMillis());
	}
	// 使用fixedDelay的方式提交任务调度请求 任务首次启动时间未设置,任务池将会尽可能早的启动任务 
	// delay 上一次任务结束时间与下一次任务开始时间的间隔时间,单位默认是毫秒 
	ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay);

}

它有如下实现类:

备注:TaskScheduler的另一实现类TimerManagerTaskScheduler在Spring5.0之后就被直接移除了,因此本处不再讲述

ThreadPoolTaskScheduler

包装Java Concurrent中的ScheduledThreadPoolExecutor类,大多数场景下都使用它来进行任务调度

除实现了TaskScheduler接口中的方法外,它还包含了一些对ScheduledThreadPoolExecutor进行操作的接口

代码语言:javascript复制
public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
		implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {
	...
	// 默认的size 是1
	private volatile int poolSize = 1;
	private volatile boolean removeOnCancelPolicy = false;
	@Nullable
	private volatile ErrorHandler errorHandler;
	// 内部持有一个JUC的ScheduledExecutorService 的引用
	@Nullable
	private ScheduledExecutorService scheduledExecutor;
	...
	
	// 初始化线程池的执行器~~~~ 该方法的父类是ExecutorConfigurationSupport
	// 它定义了一些线程池的默认配置~~~~~
	@Override
	protected ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
		// 我们发现,如果set PoolSize,那么它的size就是1
		this.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);

		if (this.removeOnCancelPolicy) {
			if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
				((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
			} else {
				logger.info("Could not apply remove-on-cancel policy - not a Java 7  ScheduledThreadPoolExecutor");
			}
		}

		return this.scheduledExecutor;
	}
	// 就是new一个ScheduledThreadPoolExecutor  来作为最终执行任务的执行器
	protected ScheduledExecutorService createExecutor(
			int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

		return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);
	}
	...
	
	//获取当前活动的线程数 委托给ScheduledThreadPoolExecutor来做得
	public int getActiveCount() {
		if (this.scheduledExecutor == null) {
			// Not initialized yet: assume no active threads.
			return 0;
		}
		return getScheduledThreadPoolExecutor().getActiveCount();
	}

	// 显然最终就是交给ScheduledThreadPoolExecutor去执行了~~~
	// 提交执行一次的任务
	// submitsubmitListenable方法表示:提交执行一次的任务,并且返回一个Future对象供判断任务状态使用
	@Override
	public void execute(Runnable task) {
		Executor executor = getScheduledExecutor();
		try {
			executor.execute(errorHandlingTask(task, false));
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException("Executor ["   executor   "] did not accept task: "   task, ex);
		}
	}
	...

}

使用它前必须得先调用initialize()【初始化方法】,有shutDown()方法,执行完后可以关闭线程

Demo:

代码语言:javascript复制
    public static void main(String[] args) {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(2);
        taskScheduler.initialize(); // 务必调用此方法来启动

        // 执行任务
        // 执行一次
        taskScheduler.execute(() -> System.out.println(Thread.currentThread().getName()   "  我只会被执行一次~~~"));
        // 周期性执行
        taskScheduler.schedule(() -> System.out.println(Thread.currentThread().getName()   " 我会被多次执行~~~"), new CronTrigger("0/2 * * * * ?"));

        // 此处:若你有周期性的任务,这里不要shutdown()
        //taskScheduler.shutdown();
    }

输出:

代码语言:javascript复制
ThreadPoolTaskScheduler-1  我只会被执行一次~~~
ThreadPoolTaskScheduler-1 我会被多次执行~~~
ThreadPoolTaskScheduler-2 我会被多次执行~~~
ThreadPoolTaskScheduler-2 我会被多次执行~~~
ThreadPoolTaskScheduler-2 我会被多次执行~~~
ThreadPoolTaskScheduler-1 我会被多次执行~~~
ThreadPoolTaskScheduler-2 我会被多次执行~~~
ThreadPoolTaskScheduler-2 我会被多次执行~~~

发现每次都可能被不同的线程去执行(当然我们这里只放了两个~~~)

ConcurrentTaskScheduler

以单个线程方式执行定时任务,适用于简单场景;(以当前线程执行任务。如果任务简单,可以直接使用这个类来执行。快捷方便。)

PS:这是单线程运行

Demo:

代码语言:javascript复制
    public static void main(String[] args) {
        ConcurrentTaskScheduler taskScheduler = new ConcurrentTaskScheduler();

        // 执行任务
        // 执行一次
        taskScheduler.execute(() -> System.out.println(Thread.currentThread().getName()   "  我只会被执行一次~~~"));
        // 周期性执行
        taskScheduler.schedule(() -> System.out.println(Thread.currentThread().getName()   " 我会被多次执行~~~"), new CronTrigger("0/2 * * * * ?"));

        // 此处:若你有周期性的任务,这里不要shutdown()
        //taskScheduler.shutdown();
    }

输出:

代码语言:javascript复制
pool-2-thread-1  我只会被执行一次~~~
pool-3-thread-1 我会被多次执行~~~
pool-3-thread-1 我会被多次执行~~~
pool-3-thread-1 我会被多次执行~~~
pool-3-thread-1 我会被多次执行~~~
pool-3-thread-1 我会被多次执行~~~

执行的线程都是一样的。都是Executors.newSingleThreadScheduledExecutor()这个执行的~~~

DefaultManagedTaskScheduler

它继承自ConcurrentTaskScheduler,在ConcurrentTaskScheduler基础上增加了JNDI的支持。它@since 4.0,这里不多说明

ScheduledTask

定时任务类,内部包装了一个Runnable。

代码语言:javascript复制
// @since 4.3  发现这个类出现得还是比较晚得
public final class ScheduledTask {
	// 任务,其实就是很简单的包装了 Runnable。
	// 常见的子类有 TriggerTask、CronTask(主要是支持的CronTrigger、cron表达式)、
	// FixedDelayTask、FixedRateTask、IntervalTask(前两者得父类)
	private final Task task;
	@Nullable
	volatile ScheduledFuture<?> future;

	ScheduledTask(Task task) {
		this.task = task;
	}
	 //@since 5.0.2
	public Task getTask() {
		return this.task;
	}
	// 取消任务
	public void cancel() {
		ScheduledFuture<?> future = this.future;
		if (future != null) {
			future.cancel(true);
		}
	}
	@Override
	public String toString() {
		return this.task.toString();
	}
}
总结

这篇文章主要是对Spring得调取系统进行一些重点接口、类进行盘点。其实底层大都是依赖JDK的实现的。

从很多方面可以看出,Spring对JDK底层可谓非常非常的熟悉,才能运用得这么自如。在后面讲解SpringBoot、SpringCloud时依然能体现到这一点。那就是Spring很重重复造轮子,除非它真心觉得你的实现不好~

0 人点赞