本篇主要结合源码分析Spring定时任务框架。
如前文所述,可通过@EnableScheduling注解开启定时任务调度,所以我们从@EnableScheduling注解开始:
代码语言:javascript复制@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}
可以看到,@EnableScheduling主要功能就是通过@Import导入了SchedulingConfiguration配置类,@Import是Spring框架提供的导入注解,可以显式地导入ImportSelecto、ImportBeanDefinitionRegistrar、Configuration配置类或者其他组件,具体使用可以参考https://blog.csdn.net/u010502101/article/details/78760032,这里不再赘述。现在我们来看SchedulingConfiguration配置类:
代码语言:javascript复制@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
//向IOC容器中注册一个ScheduledAnnotationBeanPostProcessor实例
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
SchedulingConfiguration类就是Spring定时任务的配置类,它的主要功能就是向容器中注入了一个ScheduledAnnotationBeanPostProcessor后处理器实例。ScheduledAnnotationBeanPostProcessor是Spring后处理器的一个典型应用场景(Spring的BeanPostProcessor机制可参考https://blog.csdn.net/elim168/article/details/76146351),定时任务的解析和注册都由该后处理器完成。下面我们重点分析ScheduledAnnotationBeanPostProcessor:
ScheduledAnnotationBeanPostProcessor有两个重要的成员变量:
代码语言:javascript复制//定时任务的注册中心,维护所有定时任务实例
private final ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar();
//ScheduledAnnotationBeanPostProcessor单独维护的定时任务Map
private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);
ScheduledAnnotationBeanPostProcessor注册定时任务的处理在后处理方法postProcessAfterInitialization()中:
代码语言:javascript复制@Override
public Object postProcessAfterInitialization(final Object bean, String beanName) {
//扫描bean上标记了@Scheduled注解的方法
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass)) {
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " bean.getClass());
}
}
else {
// Non-empty set of methods
annotatedMethods.forEach((method, scheduledMethods) ->
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isDebugEnabled()) {
logger.debug(annotatedMethods.size() " @Scheduled methods processed on bean '" beanName
"': " annotatedMethods);
}
}
}
return bean;
}
该方法的作用就是扫描bean上标记了@Scheduled注解的方法,然后执行processScheduled()
重点来看processScheduled方法:
代码语言:javascript复制protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
Assert.isTrue(method.getParameterCount() == 0,
"Only no-arg methods may be annotated with @Scheduled");
Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
// Determine initial delay
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
if (this.embeddedValueResolver != null) {
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = parseDelayAsLong(initialDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid initialDelayString value "" initialDelayString "" - cannot parse into long");
}
}
}
// Check cron expression
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
timeZone = TimeZone.getDefault();
}
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
// At this point we don't need to differentiate between initial delay set or not anymore
if (initialDelay < 0) {
initialDelay = 0;
}
// Check fixed delay
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedDelay = parseDelayAsLong(fixedDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value "" fixedDelayString "" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
// Check fixed rate
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedRate = parseDelayAsLong(fixedRateString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value "" fixedRateString "" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
// Check whether we had any attribute set
Assert.isTrue(processedSchedule, errorMessage);
// Finally register the scheduled tasks
synchronized (this.scheduledTasks) {
Set<ScheduledTask> registeredTasks = this.scheduledTasks.get(bean);
if (registeredTasks == null) {
registeredTasks = new LinkedHashSet<>(4);
this.scheduledTasks.put(bean, registeredTasks);
}
registeredTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" method.getName() "': " ex.getMessage());
}
}
processScheduled方法的参数是@Scheduled注解对象,标记了@Scheduled的方法实体和方法所在的bean,它主要完成了三项工作:
- @Scheduled注解的属性解析
- 定时任务的执行
- 定时任务的注册
我们来仔细看下具体的处理:
首先是对方法的一些校验,要求方法必须无参,且@Scheduled注解的属性必须配置了cron、fixedDelay和fixedRate其中的一个
代码语言:javascript复制//方法校验
Assert.isTrue(method.getParameterCount() == 0,
"Only no-arg methods may be annotated with @Scheduled");
Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
@Scheduled可配置三种类型的定时任务,分别对应了cron、fixedDelay和fixedRate属性,下面的代码是对三种类型的属性进行解析并且触发定时任务
代码语言:javascript复制// Check cron expression
//crontab型定时任务
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
timeZone = TimeZone.getDefault();
}
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
// At this point we don't need to differentiate between initial delay set or not anymore
if (initialDelay < 0) {
initialDelay = 0;
}
// Check fixed delay
//fixed delay型定时任务
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedDelay = parseDelayAsLong(fixedDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value "" fixedDelayString "" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
// Check fixed rate
//fixed rate型定时任务
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedRate = parseDelayAsLong(fixedRateString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value "" fixedRateString "" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
以常用的fixed delay型定时任务为例:首先对fixedDelay属性进行解析和校验,然后将其封装成一个FixedDelayTask任务交给registrar注册中心去注册和执行,下面看下ScheduledTaskRegistrar的scheduleFixedDelayTask方法:
代码语言:javascript复制@Nullable
public ScheduledTask scheduleFixedDelayTask(FixedDelayTask task) {
//判断是否是新注册的任务,并将任务保存起来
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask(task);
newTask = true;
}
//将任务交给内部代理的线程池去执行
if (this.taskScheduler != null) {
if (task.getInitialDelay() > 0) {
Date startTime = new Date(System.currentTimeMillis() task.getInitialDelay());
scheduledTask.future =
this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), startTime, task.getInterval());
}
else {
scheduledTask.future =
this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), task.getInterval());
}
}
else {
addFixedDelayTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}
该方法主要就是判断下是否是新注册的任务,并将任务保存起来,然后将定时任务交给内部代理的线程池去执行,最后如果是新注册的任务就将其返回
ScheduledTaskRegistrar使用了不同的集合保存不同类型的任务
代码语言:javascript复制@Nullable
private List<CronTask> cronTasks;
@Nullable
private List<IntervalTask> fixedRateTasks;
@Nullable
private List<IntervalTask> fixedDelayTasks;
其他类型的定时任务逻辑类似,这里不再重复叙述。
最后将所有的ScheduledTask注册到scheduledTasks中:
代码语言:javascript复制// Finally register the scheduled tasks
synchronized (this.scheduledTasks) {
Set<ScheduledTask> registeredTasks = this.scheduledTasks.get(bean);
if (registeredTasks == null) {
registeredTasks = new LinkedHashSet<>(4);
this.scheduledTasks.put(bean, registeredTasks);
}
registeredTasks.addAll(tasks);
}
scheduledTasks是一个Map,保存了所有的ScheduledTask实例,key是标记有@Scheduled注解的bean,value是该bean下所有的ScheduledTask集合。注册时使用了synchronized保证了并发安全
代码语言:javascript复制private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);
总结
Spring定时任务框架的核心就是使用后处理器扫描所有@Scheduled注解并注册定时任务,然后通过代理的JDK 线程池执行任务调度,其逻辑并不复杂,但是很好地实现了定时任务的处理,仅通过注解就可以配置各种类型的定时任务,使用起来十分方便。