1 ElasticJobExecutor
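In elastic-job, the actual job execution is carried out by ElasticJobExecutor. An instance is created whenever a JobScheduler instance is created; its constructor is shown below.
The elasticJob field holds the user-supplied instance that runs the business logic. The roles of the other fields are covered one by one in the analysis that follows.
private ElasticJobExecutor(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final JobItemExecutor jobItemExecutor) {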
this.elasticJob = elasticJob;
this.jobFacade = jobFacade;
this.jobItemExecutor = jobItemExecutor;
executorContext = new ExecutorContext(jobFacade.loadJobConfiguration(true));
itemErrorMessages = new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1);
}
Next, look at the scheduling entry point: the no-argument execute method of ElasticJobExecutor. Start with an overview of the steps it performs.
public void execute() {
JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true);
// 1 Check whether the job's thread pool and error handler need to be reloaded
executorContext.reloadIfNecessary(jobConfig);
JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
try {
// 2 Verify the environment the job runs in
jobFacade.checkJobExecutionEnvironment();
} catch (final JobExecutionEnvironmentException cause) {
jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
// 3 Fetch the sharding information
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
// 4 Publish the task staging event
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName()));
// 5 If any sharding item to be executed is still running, mark all of them as misfired and return immediately
if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
"Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobConfig.getJobName(),
shardingContexts.getShardingItemParameters().keySet()));
return;
}
try {
// 6 Run the custom Listener logic before the job executes
jobFacade.beforeJobExecuted(shardingContexts);
// CHECKSTYLE:OFF
} catch (final Throwable cause) {
// CHECKSTYLE:ON
jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
// 7 Execute the job
execute(jobConfig, shardingContexts, ExecutionSource.NORMAL_TRIGGER);
// 8 If misfired runs need to be executed, trigger them here
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
execute(jobConfig, shardingContexts, ExecutionSource.MISFIRE);
}
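// 9 After execution, check whether failover is needed (check whether /namespace/jobName/leader/failover/items
// has any children; if it does, the current node joins the failover leader election and, on winning,
// re-runs the work of the failed sharding item on the current node)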
jobFacade.failoverIfNecessary();
try {
// 10 Run the custom Listener logic after the job executes
jobFacade.afterJobExecuted(shardingContexts);
// CHECKSTYLE:OFF
} catch (final Throwable cause) {
// CHECKSTYLE:ON
jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
}
The steps are fairly clear. The sections below go through what each of them does.
Step 1: executorContext.reloadIfNecessary(jobConfig) checks whether any reloadable item held in the context needs to be reloaded.
public void reloadIfNecessary(final JobConfiguration jobConfiguration) {
reloadableItems.values().forEach(each -> each.reloadIfNecessary(jobConfiguration));
}
The classes in reloadableItems must implement the Reloadable interface; elastic-job uses the SPI mechanism to load the classes that implement it.
public ExecutorContext(final JobConfiguration jobConfig) {
ServiceLoader.load(Reloadable.class).forEach(each -> {
ElasticJobServiceLoader.newTypedServiceInstance(Reloadable.class, each.getType(), new Properties())
.ifPresent(reloadable -> reloadableItems.put(reloadable.getType(), reloadable));
});
initReloadable(jobConfig);
}
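Two classes implement the Reloadable interface.
ExecutorServiceReloadable: provides the thread pool that runs the job. Two sizing modes are built in, single-threaded and sized by the number of CPU cores, with the CPU-based mode as the default. Its reloadIfNecessary method therefore mainly loads the thread pool that matches the handler type set in the config.
public synchronized void reloadIfNecessary(final JobConfiguration jobConfig) {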
String newJobExecutorServiceHandlerType = Strings.isNullOrEmpty(jobConfig.getJobExecutorServiceHandlerType())
? JobExecutorServiceHandlerFactory.DEFAULT_HANDLER
: jobConfig.getJobExecutorServiceHandlerType();
if (newJobExecutorServiceHandlerType.equals(jobExecutorServiceHandlerType)) {
return;
}
log.debug("JobExecutorServiceHandler reload occurred in the job '{}'. Change from '{}' to '{}'.", jobConfig.getJobName(), jobExecutorServiceHandlerType, newJobExecutorServiceHandlerType);
reload(newJobExecutorServiceHandlerType, jobConfig.getJobName());
}
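JobErrorHandlerReloadable: provides the handler used when a job fails. elastic-job supports six error-handling strategies: DingTalk notification, email notification, ignore, log, WeChat notification, and throwing the exception directly. Its reloadIfNecessary method is as follows.
public synchronized void reloadIfNecessary(final JobConfiguration jobConfig) {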
String newJobErrorHandlerType = Strings.isNullOrEmpty(jobConfig.getJobErrorHandlerType()) ? JobErrorHandlerFactory.DEFAULT_HANDLER : jobConfig.getJobErrorHandlerType();
if (newJobErrorHandlerType.equals(jobErrorHandlerType) && props.equals(jobConfig.getProps())) {
return;
}
log.debug("JobErrorHandler reload occurred in the job '{}'. Change from '{}' to '{}'.", jobConfig.getJobName(), jobErrorHandlerType, newJobErrorHandlerType);
reload(newJobErrorHandlerType, jobConfig.getProps());
}
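Both reloadIfNecessary methods follow the same pattern: resolve the configured type (falling back to a default), return early if nothing changed, and rebuild otherwise. The class below is a minimal sketch of that pattern, not the ElasticJob implementation. The class name, the factory callback, and the "LOG" fallback value are assumptions made for illustration.

```java
import java.util.Objects;
import java.util.Properties;
import java.util.function.BiFunction;

// Hypothetical stand-in for a Reloadable item; not an ElasticJob class.
final class ReloadableHandlerSketch<T> {

    private final BiFunction<String, Properties, T> factory;
    private String currentType;
    private Properties currentProps;
    private T instance;
    private int reloadCount;

    ReloadableHandlerSketch(final BiFunction<String, Properties, T> factory) {
        this.factory = factory;
    }

    // Rebuilds the handler only when the resolved type or its properties changed.
    // Assumes props is non-null.
    synchronized void reloadIfNecessary(final String configuredType, final Properties props) {
        // "LOG" is an assumed default used only in this sketch.
        String newType = (null == configuredType || configuredType.isEmpty()) ? "LOG" : configuredType;
        if (newType.equals(currentType) && Objects.equals(props, currentProps)) {
            return;
        }
        currentType = newType;
        // Keep a copy so that later mutation of the caller's Properties is detected as a change.
        currentProps = (Properties) props.clone();
        instance = factory.apply(newType, props);
        reloadCount++;
    }

    synchronized T get() {
        return instance;
    }

    synchronized int getReloadCount() {
        return reloadCount;
    }
}
```

Calling reloadIfNecessary on every trigger is cheap in the common case, because an unchanged configuration exits at the equality check without touching the existing handler.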
Step 2: jobFacade.checkJobExecutionEnvironment() mainly checks whether the time difference between the current server and the registry center exceeds the configured limit.
public void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException {
configService.checkMaxTimeDiffSecondsTolerable();
}
public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
int maxTimeDiffSeconds = load(true).getMaxTimeDiffSeconds();
if (0 > maxTimeDiffSeconds) {
return;
}
long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
if (timeDiff > maxTimeDiffSeconds * 1000L) {
throw new JobExecutionEnvironmentException(
"Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
}
}
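The tolerance rule can be isolated into a pure function, which makes the boundary cases easy to see. This is a sketch with hypothetical names: a negative limit disables the check, and a difference exactly equal to the limit still passes.

```java
// Hypothetical helper that mirrors the comparison in checkMaxTimeDiffSecondsTolerable.
final class TimeDiffCheckSketch {

    private TimeDiffCheckSketch() {
    }

    // Returns true when the clock difference is within the limit or the check is disabled.
    static boolean isTolerable(final long localMillis, final long registryMillis, final int maxTimeDiffSeconds) {
        if (maxTimeDiffSeconds < 0) {
            // A negative limit means "do not check".
            return true;
        }
        long diffMillis = Math.abs(localMillis - registryMillis);
        // 1000L forces long arithmetic, as in the original code.
        return diffMillis <= maxTimeDiffSeconds * 1000L;
    }
}
```

Note that in execute() a failed check is only passed to the JobErrorHandler. Whether the job then continues depends on which error handler is configured.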
Step 3: jobFacade.getShardingContexts() fetches the sharding information.
public ShardingContexts getShardingContexts() {
boolean isFailover = configService.load(true).isFailover();
if (isFailover) {
List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
// If failover is enabled and some items have been failed over to the current node, call
// executionContextService.getJobShardingContext directly to get the items this node has to process
if (!failoverShardingItems.isEmpty()) {
return executionContextService.getJobShardingContext(failoverShardingItems);
}
}
// If failover is disabled, or no items were failed over to the current node, reshard before executing (if needed)
shardingService.shardingIfNecessary();
// Read from ZooKeeper the items the current node has to execute
List<Integer> shardingItems = shardingService.getLocalShardingItems();
if (isFailover) {
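// If failover is enabled, remove the items that have already been moved to other nodes
// For example, if this instance was assigned 0 and 1 but 0 has been moved to another node, 0 must be dropped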
shardingItems.removeAll(failoverService.getLocalTakeOffItems());
}
shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
return executionContextService.getJobShardingContext(shardingItems);
}
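Stripped of the ZooKeeper access, the item selection above reduces to a few list operations. The sketch below restates it as a pure function. All parameter names are hypothetical, and the resharding call is left out.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical restatement of the selection order used by getShardingContexts.
final class ShardingItemSelectionSketch {

    private ShardingItemSelectionSketch() {
    }

    static List<Integer> select(final boolean failoverEnabled, final List<Integer> localFailoverItems,
                                final List<Integer> localShardingItems, final Collection<Integer> takenOffItems,
                                final Collection<Integer> disabledItems) {
        // Items failed over to this node take priority over the regular assignment.
        if (failoverEnabled && !localFailoverItems.isEmpty()) {
            return new ArrayList<>(localFailoverItems);
        }
        List<Integer> result = new ArrayList<>(localShardingItems);
        if (failoverEnabled) {
            // Drop the items another node has already taken over.
            result.removeAll(takenOffItems);
        }
        // Disabled items are never executed.
        result.removeAll(disabledItems);
        return result;
    }
}
```

One consequence of this order is that a run which picks up failover items processes only those items. The node's regular items are handled by a separate trigger.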
The reshard flow deserves a closer look.
public void shardingIfNecessary() {
// Get the available instances
List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
// If sharding is not needed (determined by whether the /namespace/jobName/leader/sharding/necessary node exists)
// or there is no available instance, return immediately
if (!isNeedSharding() || availableJobInstances.isEmpty()) {
return;
}
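// Check whether the current node is the leader; only the leader may reshard
// If a leader already exists the check is immediate; otherwise the current node takes part in the election first and checks afterwards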
if (!leaderService.isLeaderUntilBlock()) {
// A non-leader node waits until sharding has completed and then returns
blockUntilShardingCompleted();
return;
}
// Before sharding, wait for the items that are currently running to finish
waitingOtherShardingItemCompleted();
JobConfiguration jobConfig = configService.load(false);
int shardingTotalCount = jobConfig.getShardingTotalCount();
log.debug("Job '{}' sharding begin.", jobName);
// Mark sharding as in progress
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
// Reset the sharding info: write the latest set of sharding items, but do not assign an instance to each item yet
resetShardingInfo(shardingTotalCount);
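// Get the job's sharding strategy; elastic-job ships with three strategies (average allocation, odd/even, round-robin), with average allocation as the default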
JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
// Assign an instance to each sharding item
jobNodeStorage.executeInTransaction(getShardingResultTransactionOperations(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
log.debug("Job '{}' sharding complete.", jobName);
}
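To make the default strategy concrete, here is a minimal sketch of an average allocation: every instance first receives the same number of consecutive items, and any remainder is handed out one item at a time starting from the first instance. It illustrates the idea only and is not the ElasticJob strategy class. Instances are represented by plain strings, which are assumed to be unique.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of an average-allocation sharding strategy.
final class AverageAllocationSketch {

    private AverageAllocationSketch() {
    }

    static Map<String, List<Integer>> shard(final List<String> instances, final int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (instances.isEmpty()) {
            return result;
        }
        int itemsPerInstance = shardingTotalCount / instances.size();
        int nextItem = 0;
        // First pass: give every instance an equal block of consecutive items.
        for (String each : instances) {
            List<Integer> items = new ArrayList<>();
            for (int i = 0; i < itemsPerInstance; i++) {
                items.add(nextItem++);
            }
            result.put(each, items);
        }
        // Second pass: spread the remaining items over the leading instances.
        for (String each : instances) {
            if (nextItem >= shardingTotalCount) {
                break;
            }
            result.get(each).add(nextItem++);
        }
        return result;
    }
}
```

With three instances a, b, c and eight items, the sketch yields a=[0, 1, 6], b=[2, 3, 7], c=[4, 5].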
Step 4: jobFacade.postJobStatusTraceEvent publishes an event through jobTracingEventBus.
public void postJobStatusTraceEvent(final String taskId, final State state, final String message) {
TaskContext taskContext = TaskContext.from(taskId);
jobTracingEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(),
taskContext.getSlaveId(), Source.LITE_EXECUTOR, taskContext.getType().name(), taskContext.getMetaInfo().getShardingItems().toString(), state, message));
if (!Strings.isNullOrEmpty(message)) {
log.trace(message);
}
}
Step 5: jobFacade.misfireIfRunning. This method is fairly simple and its purpose is already explained in the code comments above, so it is not repeated here.
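The interplay between the misfire flag set in step 5 and the replay loop in step 8 can be pictured with a small in-memory model. This is a sketch under simplifying assumptions: the flags live in local sets rather than in the registry center, all names are hypothetical, and the extra conditions the real check applies (misfire enabled in the config, no resharding pending) are left out.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory model of misfire flagging and replay.
final class MisfireSketch {

    private final Set<Integer> running = new HashSet<>();
    private final Set<Integer> misfired = new HashSet<>();
    private int executions;

    // If any item is still running, flag all items as misfired and report true.
    synchronized boolean misfireIfRunning(final Collection<Integer> items) {
        for (int each : items) {
            if (running.contains(each)) {
                misfired.addAll(items);
                return true;
            }
        }
        return false;
    }

    // One trigger: skip if busy, otherwise run and replay while a misfire flag is set.
    void trigger(final Collection<Integer> items, final Runnable body) {
        if (misfireIfRunning(items)) {
            return;
        }
        runOnce(items, body);
        while (hasMisfire(items)) {
            clearMisfire(items);
            runOnce(items, body);
        }
    }

    private void runOnce(final Collection<Integer> items, final Runnable body) {
        synchronized (this) {
            running.addAll(items);
        }
        try {
            body.run();
        } finally {
            synchronized (this) {
                running.removeAll(items);
                executions++;
            }
        }
    }

    private synchronized boolean hasMisfire(final Collection<Integer> items) {
        return items.stream().anyMatch(misfired::contains);
    }

    private synchronized void clearMisfire(final Collection<Integer> items) {
        misfired.removeAll(items);
    }

    synchronized int getExecutions() {
        return executions;
    }
}
```

In this model, a trigger that arrives while a run is in progress does no work itself. It only leaves a flag behind, and the run that is already in progress performs one extra execution once it finishes, however many triggers were missed.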
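Step 6: jobFacade.beforeJobExecuted. There is not much to say here either: it fetches the list of custom Listeners and runs them in order.
Step 7: execute runs the job; this also covers the misfire execution of step 8. After each run, if a trigger was missed in the meantime, no resharding is currently needed, and misfire execution is enabled, the job is run again.
private void execute(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
// No items to process, return immediately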
if (shardingContexts.getShardingItemParameters().isEmpty()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName()));
return;
}
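// If monitorExecution is enabled, create a running node for each item (/namespace/jobName/sharding/{item}/running)
// The node type depends on whether failover is enabled
// With failover enabled, a persistent node is created (the running node must survive an instance crash, because a Listener
// watches for instance-crash events and uses it to set the failover items)
// With failover disabled, an ephemeral node is created, so the running info is removed automatically when the instance crashes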
jobFacade.registerJobBegin(shardingContexts);
String taskId = shardingContexts.getTaskId();
// Publish the task running event
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
try {
// Run the task; the details are in the process code further down
process(jobConfig, shardingContexts, executionSource);
} finally {
// After execution, remove the running info and the failover info (if any)
jobFacade.registerJobCompleted(shardingContexts);
// Publish a different event depending on whether any error messages were recorded
if (itemErrorMessages.isEmpty()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
} else {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
itemErrorMessages.clear();
}
}
}
// With a single item, run it directly; with multiple items, return only after all of them have finished
private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
if (1 == items.size()) {
int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item);
// This process method delegates to the process method of JobItemExecutor
// For a SimpleJob, for example, it calls the execute method of the user-defined job instance
// Every other job type has its own JobItemExecutor
process(jobConfig, shardingContexts, item, jobExecutionEvent);
return;
}
CountDownLatch latch = new CountDownLatch(items.size());
for (int each : items) {
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
ExecutorService executorService = executorContext.get(ExecutorService.class);
if (executorService.isShutdown()) {
return;
}
executorService.submit(() -> {
try {
process(jobConfig, shardingContexts, each, jobExecutionEvent);
} finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
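The multi-item branch is a standard fan-out and wait: submit one task per item, count down in a finally block so that a failing item cannot block the caller, and wait on the latch. The following standalone sketch shows that pattern. The class name, the pool sizing, and the error map are assumptions for illustration, not ElasticJob code.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.IntConsumer;

// Hypothetical fan-out helper: runs every item in parallel and waits for all of them.
final class ParallelItemsSketch {

    private ParallelItemsSketch() {
    }

    // Returns the error message recorded for each item whose logic threw an exception.
    static Map<Integer, String> runAll(final List<Integer> items, final IntConsumer itemLogic) {
        Map<Integer, String> errors = new ConcurrentHashMap<>();
        int threads = Math.max(1, Math.min(items.size(), Runtime.getRuntime().availableProcessors()));
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch latch = new CountDownLatch(items.size());
        try {
            for (int each : items) {
                pool.submit(() -> {
                    try {
                        itemLogic.accept(each);
                    } catch (final RuntimeException ex) {
                        // Record the failure per item instead of letting it escape the worker thread.
                        errors.put(each, String.valueOf(ex.getMessage()));
                    } finally {
                        // Always count down, otherwise a failed item would block await() forever.
                        latch.countDown();
                    }
                });
            }
            latch.await();
        } catch (final InterruptedException ex) {
            // Restore the interrupt flag, as the original code does.
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return errors;
    }
}
```

Unlike this sketch, the code above reuses a long-lived pool obtained from executorContext rather than creating one per call, which is why it checks isShutdown() before submitting.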
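Step 9: jobFacade.failoverIfNecessary(). After the job has run, check whether failover is needed by looking at whether /namespace/jobName/leader/failover/items has any children. If it does, the current node joins the failover leader election and, if it wins, re-runs the work of the failed sharding item on the current node. This is done by calling failoverService.failoverIfNecessary:
public void failoverIfNecessary() {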
if (needFailover()) {
jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
}
}
Now look at FailoverLeaderExecutionCallback, the callback that runs once the election has been won.
class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
@Override
public void execute() {
if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
return;
}
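// Get the items that need failover, i.e. the children of /namespace/jobName/leader/failover/items
// Why take only one item? Several sharding items may well have failed at once; presumably only one is taken so that the current instance is not overloaded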
int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
// Record the failover info for crashedItem (i.e. move crashedItem's work to the current instance)
jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
jobNodeStorage.fillJobNode(FailoverNode.getExecutingFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
// TODO Instead of using triggerJob, use executor for unified scheduling
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (null != jobScheduleController) {
// Trigger the job
jobScheduleController.triggerJob();
}
}
}
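The claim-one-item behaviour of the callback can be modelled without a registry center. In the sketch below, a synchronized method stands in for the leader latch, a sorted set stands in for the children of the failover items node, and a map stands in for the per-item failover nodes. All of these names are hypothetical, and taking the smallest item is a choice of the sketch (the real code takes the first child key returned by the registry).

```java
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical in-memory model of claiming a single crashed item per election win.
final class FailoverClaimSketch {

    // Models the children of .../leader/failover/items.
    private final SortedSet<Integer> pendingItems = new TreeSet<>();
    // Models the per-item failover node holding the id of the instance that took over.
    private final TreeMap<Integer, String> failoverOwners = new TreeMap<>();

    synchronized void markCrashed(final int item) {
        pendingItems.add(item);
    }

    // The synchronized keyword plays the role of the leader latch: one claimer at a time.
    synchronized Optional<Integer> claimOne(final String instanceId) {
        if (pendingItems.isEmpty()) {
            return Optional.empty();
        }
        int crashedItem = pendingItems.first();
        failoverOwners.put(crashedItem, instanceId);
        pendingItems.remove(crashedItem);
        return Optional.of(crashedItem);
    }

    synchronized String ownerOf(final int item) {
        return failoverOwners.get(item);
    }
}
```

Because each win claims exactly one item, several crashed items end up spread across whichever instances finish their own work and call failoverIfNecessary next, rather than piling onto a single node.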
Step 10: jobFacade.afterJobExecuted(shardingContexts) runs the custom Listener logic that follows job execution.
This completes the walkthrough of the elastic-job execution flow.