Spring Scheduled Task Framework Explained (3): Source Code Analysis

2020-09-03 10:32:04

This article walks through the source code of Spring's scheduled task framework.

As described in the previous article, scheduling is enabled with the @EnableScheduling annotation, so we start from @EnableScheduling:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {

}

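As you can see, the main job of @EnableScheduling is to import the SchedulingConfiguration class through @Import. @Import is the import annotation provided by the Spring framework; it can explicitly import an ImportSelector, an ImportBeanDefinitionRegistrar, a @Configuration class, or other components. For usage details see https://blog.csdn.net/u010502101/article/details/78760032; they are not repeated here. Now let's look at the SchedulingConfiguration class: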

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
    // Registers a ScheduledAnnotationBeanPostProcessor instance in the IoC container
    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }

}

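SchedulingConfiguration is the configuration class for Spring scheduled tasks. Its main job is to register a ScheduledAnnotationBeanPostProcessor instance in the container. ScheduledAnnotationBeanPostProcessor is a typical application of Spring's post-processor mechanism (for BeanPostProcessor see https://blog.csdn.net/elim168/article/details/76146351): it both parses and registers scheduled tasks. The rest of this article focuses on ScheduledAnnotationBeanPostProcessor: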

ScheduledAnnotationBeanPostProcessor has two important fields:

// Registry for scheduled tasks; maintains all scheduled task instances
private final ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar();

// Map of scheduled tasks maintained separately by ScheduledAnnotationBeanPostProcessor
private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);

ScheduledAnnotationBeanPostProcessor registers scheduled tasks in its post-processing method postProcessAfterInitialization():

@Override
public Object postProcessAfterInitialization(final Object bean, String beanName) {
    // Scan the bean for methods annotated with @Scheduled
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    if (!this.nonAnnotatedClasses.contains(targetClass)) {
        Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                    Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                            method, Scheduled.class, Schedules.class);
                    return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                });
        if (annotatedMethods.isEmpty()) {
            // Remember classes without @Scheduled methods so they are not scanned again
            this.nonAnnotatedClasses.add(targetClass);
            if (logger.isTraceEnabled()) {
                logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass());
            }
        }
        else {
            // Non-empty set of methods
            annotatedMethods.forEach((method, scheduledMethods) ->
                    scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
            if (logger.isDebugEnabled()) {
                logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                        "': " + annotatedMethods);
            }
        }
    }
    return bean;
}

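This method scans the bean for methods annotated with @Scheduled and calls processScheduled() once for each annotation it finds. Classes with no @Scheduled methods are cached in nonAnnotatedClasses so they are not scanned again.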
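The scan-then-process flow described above can be illustrated with plain JDK reflection. This is only a minimal sketch under stated assumptions, not Spring's implementation: the `@Every` annotation, `ReportJob`, and `AnnotationScanSketch` are hypothetical names invented for the example, and the sketch ignores proxies, repeatable annotations, and inherited methods, all of which the real post-processor handles.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @Scheduled; this is not a Spring annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Every {
    long millis();
}

// Hypothetical bean with one annotated method and one plain method.
class ReportJob {
    @Every(millis = 5000)
    public void flush() { }

    public void helper() { }
}

class AnnotationScanSketch {
    // Collects "methodName@interval" for every method that carries @Every,
    // mirroring the "select annotated methods, then process each" flow.
    static List<String> scan(Object bean) {
        List<String> found = new ArrayList<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Every every = method.getAnnotation(Every.class);
            if (every != null) {
                found.add(method.getName() + "@" + every.millis());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(scan(new ReportJob())); // prints [flush@5000]
    }
}
```

In this sketch the "processing" step only records a description of the method; in the real post-processor that step is the call to processScheduled().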

Now for the key method, processScheduled:

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
        try {
            Assert.isTrue(method.getParameterCount() == 0,
                    "Only no-arg methods may be annotated with @Scheduled");

            Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
            Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
            boolean processedSchedule = false;
            String errorMessage =
                    "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";

            Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

            // Determine initial delay
            long initialDelay = scheduled.initialDelay();
            String initialDelayString = scheduled.initialDelayString();
            if (StringUtils.hasText(initialDelayString)) {
                Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
                if (this.embeddedValueResolver != null) {
                    initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
                }
                if (StringUtils.hasLength(initialDelayString)) {
                    try {
                        initialDelay = parseDelayAsLong(initialDelayString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                    }
                }
            }

            // Check cron expression
            String cron = scheduled.cron();
            if (StringUtils.hasText(cron)) {
                String zone = scheduled.zone();
                if (this.embeddedValueResolver != null) {
                    cron = this.embeddedValueResolver.resolveStringValue(cron);
                    zone = this.embeddedValueResolver.resolveStringValue(zone);
                }
                if (StringUtils.hasLength(cron)) {
                    Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                    processedSchedule = true;
                    TimeZone timeZone;
                    if (StringUtils.hasText(zone)) {
                        timeZone = StringUtils.parseTimeZoneString(zone);
                    }
                    else {
                        timeZone = TimeZone.getDefault();
                    }
                    tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                }
            }

            // At this point we don't need to differentiate between initial delay set or not anymore
            if (initialDelay < 0) {
                initialDelay = 0;
            }

            // Check fixed delay
            long fixedDelay = scheduled.fixedDelay();
            if (fixedDelay >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
            }
            String fixedDelayString = scheduled.fixedDelayString();
            if (StringUtils.hasText(fixedDelayString)) {
                if (this.embeddedValueResolver != null) {
                    fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
                }
                if (StringUtils.hasLength(fixedDelayString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedDelay = parseDelayAsLong(fixedDelayString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                    }
                    tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
                }
            }

            // Check fixed rate
            long fixedRate = scheduled.fixedRate();
            if (fixedRate >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
            }
            String fixedRateString = scheduled.fixedRateString();
            if (StringUtils.hasText(fixedRateString)) {
                if (this.embeddedValueResolver != null) {
                    fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
                }
                if (StringUtils.hasLength(fixedRateString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedRate = parseDelayAsLong(fixedRateString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                    }
                    tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
                }
            }

            // Check whether we had any attribute set
            Assert.isTrue(processedSchedule, errorMessage);

            // Finally register the scheduled tasks
            synchronized (this.scheduledTasks) {
                Set<ScheduledTask> registeredTasks = this.scheduledTasks.get(bean);
                if (registeredTasks == null) {
                    registeredTasks = new LinkedHashSet<>(4);
                    this.scheduledTasks.put(bean, registeredTasks);
                }
                registeredTasks.addAll(tasks);
            }
        }
        catch (IllegalArgumentException ex) {
            throw new IllegalStateException(
                    "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
        }
    }

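processScheduled takes the @Scheduled annotation instance, the annotated method, and the bean that declares the method. It does three things:

  1. Parses the attributes of the @Scheduled annotation
  2. Hands the task to the registrar to be scheduled for execution
  3. Registers the resulting scheduled task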

Let's look at each step in detail.

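First comes some validation: the method must take no arguments, and exactly one of the cron, fixedDelay, and fixedRate attributes (or their String variants) must be configured on @Scheduled: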

// Method validation
Assert.isTrue(method.getParameterCount() == 0,
        "Only no-arg methods may be annotated with @Scheduled");
Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
boolean processedSchedule = false;
String errorMessage =
        "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";

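@Scheduled supports three kinds of scheduled task, corresponding to the cron, fixedDelay, and fixedRate attributes. The code below parses each kind of attribute and schedules the task: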

// Check cron expression
// cron-style scheduled task
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
    String zone = scheduled.zone();
    if (this.embeddedValueResolver != null) {
        cron = this.embeddedValueResolver.resolveStringValue(cron);
        zone = this.embeddedValueResolver.resolveStringValue(zone);
    }
    if (StringUtils.hasLength(cron)) {
        Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
        processedSchedule = true;
        TimeZone timeZone;
        if (StringUtils.hasText(zone)) {
            timeZone = StringUtils.parseTimeZoneString(zone);
        }
        else {
            timeZone = TimeZone.getDefault();
        }
        tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
    }
}

// At this point we don't need to differentiate between initial delay set or not anymore
if (initialDelay < 0) {
    initialDelay = 0;
}

// Check fixed delay
// fixed-delay scheduled task
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
    Assert.isTrue(!processedSchedule, errorMessage);
    processedSchedule = true;
    tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
    if (this.embeddedValueResolver != null) {
        fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
    }
    if (StringUtils.hasLength(fixedDelayString)) {
        Assert.isTrue(!processedSchedule, errorMessage);
        processedSchedule = true;
        try {
            fixedDelay = parseDelayAsLong(fixedDelayString);
        }
        catch (RuntimeException ex) {
            throw new IllegalArgumentException(
                    "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
        }
        tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
    }
}

// Check fixed rate
// fixed-rate scheduled task
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
    Assert.isTrue(!processedSchedule, errorMessage);
    processedSchedule = true;
    tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
    if (this.embeddedValueResolver != null) {
        fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
    }
    if (StringUtils.hasLength(fixedRateString)) {
        Assert.isTrue(!processedSchedule, errorMessage);
        processedSchedule = true;
        try {
            fixedRate = parseDelayAsLong(fixedRateString);
        }
        catch (RuntimeException ex) {
            throw new IllegalArgumentException(
                    "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
        }
        tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
    }
}
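The processedSchedule flag together with the repeated Assert.isTrue(!processedSchedule, errorMessage) checks is what enforces "exactly one trigger attribute". A minimal stand-alone sketch of that idiom follows. It is an illustration, not Spring code: `ExactlyOneSketch`, `resolve`, and `require` are hypothetical names, the String variants are left out, and the sketch assumes, as the checks above suggest, that an empty cron string and a negative number mean "not set".

```java
class ExactlyOneSketch {
    static final String ERROR =
            "Exactly one of 'cron', 'fixedDelay', or 'fixedRate' is required";

    // Returns the name of the single configured trigger attribute.
    // Assumption for this sketch: "" and negative values mean "not set".
    static String resolve(String cron, long fixedDelay, long fixedRate) {
        String chosen = null;
        if (cron != null && !cron.trim().isEmpty()) {
            chosen = "cron";
        }
        if (fixedDelay >= 0) {
            require(chosen == null);   // reject a second trigger
            chosen = "fixedDelay";
        }
        if (fixedRate >= 0) {
            require(chosen == null);   // reject a second trigger
            chosen = "fixedRate";
        }
        require(chosen != null);       // reject "no trigger at all"
        return chosen;
    }

    private static void require(boolean condition) {
        if (!condition) {
            throw new IllegalArgumentException(ERROR);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("", 1000, -1)); // prints fixedDelay
    }
}
```

Checking after each attribute, rather than counting at the end, lets the failure surface at the first conflicting attribute, which is the same shape as the checks in processScheduled.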

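Take the commonly used fixed-delay task as an example: the fixedDelay attribute is parsed and validated first, then wrapped in a FixedDelayTask and handed to the registrar to be registered and scheduled. Here is ScheduledTaskRegistrar's scheduleFixedDelayTask method: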

@Nullable
public ScheduledTask scheduleFixedDelayTask(FixedDelayTask task) {
    // Determine whether this is a newly registered task and keep a handle to it
    ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
    boolean newTask = false;
    if (scheduledTask == null) {
        scheduledTask = new ScheduledTask(task);
        newTask = true;
    }

    // Hand the task to the internally delegated thread pool for execution
    if (this.taskScheduler != null) {
        if (task.getInitialDelay() > 0) {
            Date startTime = new Date(System.currentTimeMillis() + task.getInitialDelay());
            scheduledTask.future =
                    this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), startTime, task.getInterval());
        }
        else {
            scheduledTask.future =
                    this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), task.getInterval());
        }
    }
    else {
        addFixedDelayTask(task);
        this.unresolvedTasks.put(task, scheduledTask);
    }
    return (newTask ? scheduledTask : null);
}

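This method determines whether the task is newly registered and keeps a handle to it, then hands the task to the internally delegated thread pool for execution. If no taskScheduler has been set yet, the task is stored and recorded in unresolvedTasks instead. Finally, the ScheduledTask is returned only if it is newly registered.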
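The "schedule now if a scheduler exists, otherwise park the task" branch can be sketched with a JDK ScheduledExecutorService. This is a simplified illustration under assumptions: `DeferredRegistrarSketch` and its methods are hypothetical, and the step that later flushes the parked tasks (`setScheduler` here) is a guess at the general pattern, since the excerpt above does not show how Spring picks unresolved tasks up again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class DeferredRegistrarSketch {
    private final List<Runnable> pending = new ArrayList<>();
    private final List<ScheduledFuture<?>> futures = new ArrayList<>();
    private final long delayMillis;
    private ScheduledExecutorService scheduler; // null until one is supplied

    DeferredRegistrarSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Schedules immediately when a scheduler exists; otherwise parks the task.
    // Returns true only if the task was scheduled right away.
    synchronized boolean register(Runnable task) {
        if (scheduler != null) {
            futures.add(schedule(task));
            return true;
        }
        pending.add(task);
        return false;
    }

    // Assumed flush step: once a scheduler arrives, schedule everything parked.
    synchronized void setScheduler(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
        for (Runnable task : pending) {
            futures.add(schedule(task));
        }
        pending.clear();
    }

    private ScheduledFuture<?> schedule(Runnable task) {
        return scheduler.scheduleWithFixedDelay(task, 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    synchronized int pendingCount() { return pending.size(); }

    synchronized int scheduledCount() { return futures.size(); }

    public static void main(String[] args) {
        DeferredRegistrarSketch registrar = new DeferredRegistrarSketch(50);
        registrar.register(() -> { });                 // parked: no scheduler yet
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        registrar.setScheduler(pool);                  // parked task gets scheduled
        System.out.println(registrar.pendingCount() + " pending, "
                + registrar.scheduledCount() + " scheduled");
        pool.shutdownNow();
    }
}
```

Keeping the future returned by the scheduler is what later allows a task to be cancelled, which is presumably why ScheduledTask stores it in its future field.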

ScheduledTaskRegistrar keeps the different kinds of task in separate collections:

@Nullable
private List<CronTask> cronTasks;

@Nullable
private List<IntervalTask> fixedRateTasks;

@Nullable
private List<IntervalTask> fixedDelayTasks;

The other kinds of scheduled task follow similar logic and are not repeated here.

Finally, all ScheduledTask instances are registered in scheduledTasks:

// Finally register the scheduled tasks
synchronized (this.scheduledTasks) {
    Set<ScheduledTask> registeredTasks = this.scheduledTasks.get(bean);
    if (registeredTasks == null) {
        registeredTasks = new LinkedHashSet<>(4);
        this.scheduledTasks.put(bean, registeredTasks);
    }
    registeredTasks.addAll(tasks);
}

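scheduledTasks is a Map that holds all ScheduledTask instances: the key is a bean with @Scheduled methods, and the value is the set of ScheduledTask instances belonging to that bean. Registration is wrapped in synchronized for thread safety.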

private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);
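Note that the map is an IdentityHashMap, which compares keys with == rather than equals(). A plausible reading, offered here as an assumption rather than something the source states, is that each bean instance should get its own entry even if the bean class overrides equals() and hashCode(). The sketch below shows the effect with a hypothetical `IdentityRegistrySketch` that stores task names instead of ScheduledTask objects.

```java
import java.util.IdentityHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class IdentityRegistrySketch {
    // Keys are compared by reference (==), not by equals().
    private final Map<Object, Set<String>> tasksByBean = new IdentityHashMap<>(16);

    // Same get-or-create-then-add pattern as the synchronized block above.
    void register(Object bean, String taskName) {
        synchronized (tasksByBean) {
            tasksByBean.computeIfAbsent(bean, key -> new LinkedHashSet<>(4)).add(taskName);
        }
    }

    int beanCount() {
        synchronized (tasksByBean) {
            return tasksByBean.size();
        }
    }

    int taskCount(Object bean) {
        synchronized (tasksByBean) {
            Set<String> tasks = tasksByBean.get(bean);
            return (tasks != null ? tasks.size() : 0);
        }
    }

    public static void main(String[] args) {
        IdentityRegistrySketch registry = new IdentityRegistrySketch();
        String first = new String("job");   // equal by equals(), but a distinct instance
        String second = new String("job");
        registry.register(first, "a");
        registry.register(first, "b");
        registry.register(second, "c");
        System.out.println(registry.beanCount()); // prints 2; a HashMap would hold 1 key
    }
}
```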

Summary

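The core of Spring's scheduled task framework is a post-processor that scans for @Scheduled annotations and registers the scheduled tasks, then delegates the actual scheduling to a JDK thread pool. The logic is not complicated, yet it handles scheduled tasks well: every kind of scheduled task can be configured with annotations alone, which makes the framework very convenient to use.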
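For readers who want to see the JDK side of that delegation in isolation, here is a minimal sketch using ScheduledExecutorService directly. It is not Spring code; `JdkSchedulingSketch` and `runRepeatedly` are hypothetical names. It contrasts the two interval styles: scheduleWithFixedDelay measures the delay from the end of one run to the start of the next, while scheduleAtFixedRate measures the period between consecutive start times.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class JdkSchedulingSketch {
    // Schedules a counting task and waits until it has run `runs` times.
    // Returns false if that does not happen within five seconds.
    static boolean runRepeatedly(boolean fixedRate, int runs, long periodMillis) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(runs);
        try {
            if (fixedRate) {
                // Period is measured between the start times of consecutive runs.
                pool.scheduleAtFixedRate(remaining::countDown, 0, periodMillis, TimeUnit.MILLISECONDS);
            }
            else {
                // Delay is measured from the end of one run to the start of the next.
                pool.scheduleWithFixedDelay(remaining::countDown, 0, periodMillis, TimeUnit.MILLISECONDS);
            }
            return remaining.await(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
        finally {
            pool.shutdownNow(); // stop the repeating task and release the thread
        }
    }

    public static void main(String[] args) {
        System.out.println(runRepeatedly(false, 3, 20)); // fixed delay
        System.out.println(runRepeatedly(true, 3, 20));  // fixed rate
    }
}
```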
