Elastic-Job Series, Part 2: The Full Scheduling Flow

2024-07-09 15:16:48

1 ElasticJobExecutor

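The actual execution of an elastic-job job is carried out by ElasticJobExecutor, an instance of which is created when a JobScheduler instance is created. Its constructor is as follows: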

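The elasticJob field is the user's instance that runs the business logic; the roles of the other fields are covered one by one in the analysis that follows.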

private ElasticJobExecutor(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final JobItemExecutor jobItemExecutor) {
    this.elasticJob = elasticJob;
    this.jobFacade = jobFacade;
    this.jobItemExecutor = jobItemExecutor;
    executorContext = new ExecutorContext(jobFacade.loadJobConfiguration(true));
    itemErrorMessages = new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1);
}

Next, let's look at the entry point of scheduled execution, the no-argument execute method of ElasticJobExecutor, starting with an overview of its steps:

public void execute() {
    JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true);
    // 1 Reload the job thread pool and the job error handler if their configuration has changed
    executorContext.reloadIfNecessary(jobConfig);
    JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
    try {
        // 2 Check the job execution environment
        jobFacade.checkJobExecutionEnvironment();
    } catch (final JobExecutionEnvironmentException cause) {
        jobErrorHandler.handleException(jobConfig.getJobName(), cause);
    }
    // 3 Get the sharding information
    ShardingContexts shardingContexts = jobFacade.getShardingContexts();
    // 4 Publish the TASK_STAGING status event
    jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName()));
    // 5 If any of the items to run is still running, mark all of them as misfired and return immediately
    if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobConfig.getJobName(),
                shardingContexts.getShardingItemParameters().keySet()));
        return;
    }
    try {
        // 6 Run the before-execution logic of the custom listeners
        jobFacade.beforeJobExecuted(shardingContexts);
        // CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        // CHECKSTYLE:ON
        jobErrorHandler.handleException(jobConfig.getJobName(), cause);
    }
    // 7 Execute the job
    execute(jobConfig, shardingContexts, ExecutionSource.NORMAL_TRIGGER);
    // 8 If misfired executions need to run, trigger them here
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(jobConfig, shardingContexts, ExecutionSource.MISFIRE);
    }
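    // 9 After execution, check whether failover is needed (check whether /namespace/jobName/leader/failover/items
    // has any children; if so, this node joins the failover leader election and, if it wins, re-executes
    // the work of the crashed shard on this node)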
    jobFacade.failoverIfNecessary();
    try {
        // 10 Run the after-execution logic of the custom listeners
        jobFacade.afterJobExecuted(shardingContexts);
        // CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        // CHECKSTYLE:ON
        jobErrorHandler.handleException(jobConfig.getJobName(), cause);
    }
}
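To see the control flow at a glance, here is a toy model of the step order in execute(). It is a sketch, not ElasticJob code: ExecuteFlowSketch, its run method and the pendingMisfires counter are hypothetical, and each step only records its name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the step order in ElasticJobExecutor.execute(); every step just records its name.
class ExecuteFlowSketch {

    final List<String> trace = new ArrayList<>();

    // previousStillRunning stands in for misfireIfRunning(...) returning true;
    // pendingMisfires stands in for how many times isExecuteMisfired(...) returns true.
    void run(boolean previousStillRunning, int pendingMisfires) {
        trace.add("1 reloadIfNecessary");
        trace.add("2 checkJobExecutionEnvironment");
        trace.add("3 getShardingContexts");
        trace.add("4 post TASK_STAGING");
        if (previousStillRunning) {
            trace.add("5 misfire, return");
            return;
        }
        trace.add("6 beforeJobExecuted");
        trace.add("7 execute NORMAL_TRIGGER");
        for (int i = 0; i < pendingMisfires; i++) {
            trace.add("8 clearMisfire + execute MISFIRE");
        }
        trace.add("9 failoverIfNecessary");
        trace.add("10 afterJobExecuted");
    }
}
```

One detail worth noticing in the real method: a failed environment check (step 2) or a failing listener (steps 6 and 10) is only passed to the error handler, so the flow continues unless the configured handler itself throws.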

The steps are fairly clear. Let's go through what each one does.

Step 1: executorContext.reloadIfNecessary(jobConfig) checks whether any of the reloadable items in the context need to be reloaded.

public void reloadIfNecessary(final JobConfiguration jobConfiguration) {
    reloadableItems.values().forEach(each -> each.reloadIfNecessary(jobConfiguration));
}

Every class in reloadableItems must implement the Reloadable interface; elastic-job discovers these implementations through the Java SPI mechanism:

public ExecutorContext(final JobConfiguration jobConfig) {
    ServiceLoader.load(Reloadable.class).forEach(each -> {
        ElasticJobServiceLoader.newTypedServiceInstance(Reloadable.class, each.getType(), new Properties())
                .ifPresent(reloadable -> reloadableItems.put(reloadable.getType(), reloadable));
    });
    initReloadable(jobConfig);
}
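The real ExecutorContext discovers Reloadable implementations with java.util.ServiceLoader, which needs a META-INF/services registration file. To stay self-contained, the sketch below registers items by hand; ReloadableItem, ContextSketch and GreetingItem are hypothetical names. It illustrates the two ideas in the code above: items are keyed by the type they provide, and reloadIfNecessary fans the new configuration out to every item.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for Reloadable: each item is keyed by the type of object it provides.
interface ReloadableItem<T> {
    Class<T> getType();

    T getInstance();

    void reloadIfNecessary(String configValue);
}

// Hypothetical sketch of ExecutorContext: a map from provided type to reloadable item.
class ContextSketch {
    private final Map<Class<?>, ReloadableItem<?>> items = new LinkedHashMap<>();

    void register(ReloadableItem<?> item) {
        items.put(item.getType(), item);
    }

    // Pass the new configuration to every item; each decides for itself whether to reload.
    void reloadIfNecessary(String configValue) {
        items.values().forEach(each -> each.reloadIfNecessary(configValue));
    }

    // Typed lookup, in the spirit of executorContext.get(JobErrorHandler.class).
    <T> T get(Class<T> type) {
        ReloadableItem<?> item = items.get(type);
        return item == null ? null : type.cast(item.getInstance());
    }
}

// Example item that provides a String and counts how often it really reloaded.
class GreetingItem implements ReloadableItem<String> {
    private String value = "";
    int reloads;

    public Class<String> getType() {
        return String.class;
    }

    public String getInstance() {
        return value;
    }

    public void reloadIfNecessary(String configValue) {
        if (!configValue.equals(value)) {
            value = configValue;
            reloads++;
        }
    }
}
```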

Two classes implement the Reloadable interface:

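ExecutorServiceReloadable: provides the thread pool that runs the job items. Two thread-count strategies are built in: a single thread, or a pool sized by the number of CPU cores (the default). Its reloadIfNecessary method therefore loads the thread pool matching the handler type in the configuration, and only when that type has changed: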

public synchronized void reloadIfNecessary(final JobConfiguration jobConfig) {
    String newJobExecutorServiceHandlerType = Strings.isNullOrEmpty(jobConfig.getJobExecutorServiceHandlerType())
            ? JobExecutorServiceHandlerFactory.DEFAULT_HANDLER
            : jobConfig.getJobExecutorServiceHandlerType();
    if (newJobExecutorServiceHandlerType.equals(jobExecutorServiceHandlerType)) {
        return;
    }
    log.debug("JobExecutorServiceHandler reload occurred in the job '{}'. Change from '{}' to '{}'.", jobConfig.getJobName(), jobExecutorServiceHandlerType, newJobExecutorServiceHandlerType);
    reload(newJobExecutorServiceHandlerType, jobConfig.getJobName());
}
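A minimal sketch of the reload-on-change idea, assuming two handler type names, "CPU" (the default) and "SINGLE_THREAD", to match the two strategies described above. The class name and the pool size are assumptions for illustration; the point is that the pool is rebuilt, and in this sketch the old one shut down, only when the type actually changes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: rebuild the pool only when the configured handler type changes.
class ExecutorReloadSketch {
    static final String DEFAULT_TYPE = "CPU"; // assumed default type name

    private String type;
    private ExecutorService pool;

    synchronized void reloadIfNecessary(String configuredType) {
        String newType = configuredType == null || configuredType.isEmpty() ? DEFAULT_TYPE : configuredType;
        if (newType.equals(type)) {
            return; // same type: keep the existing pool
        }
        if (pool != null) {
            pool.shutdown(); // tasks already submitted to the old pool still finish
        }
        // Assumed sizing: one thread, or one thread per available core.
        pool = "SINGLE_THREAD".equals(newType)
                ? Executors.newSingleThreadExecutor()
                : Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        type = newType;
    }

    synchronized ExecutorService get() {
        return pool;
    }

    synchronized String type() {
        return type;
    }
}
```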

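JobErrorHandlerReloadable: provides the job error handler. elastic-job supports six error-handling strategies: DingTalk notification, email notification, ignore, log, WeChat notification, and rethrowing the exception. Its reloadIfNecessary method reloads the handler when either the handler type or the job properties change: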

public synchronized void reloadIfNecessary(final JobConfiguration jobConfig) {
    String newJobErrorHandlerType = Strings.isNullOrEmpty(jobConfig.getJobErrorHandlerType()) ? JobErrorHandlerFactory.DEFAULT_HANDLER : jobConfig.getJobErrorHandlerType();
    if (newJobErrorHandlerType.equals(jobErrorHandlerType) && props.equals(jobConfig.getProps())) {
        return;
    }
    log.debug("JobErrorHandler reload occurred in the job '{}'. Change from '{}' to '{}'.", jobConfig.getJobName(), jobErrorHandlerType, newJobErrorHandlerType);
    reload(newJobErrorHandlerType, jobConfig.getProps());
}
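The error-handler reload differs in one detail: the handler is rebuilt when either the type or the job properties change, presumably because notification handlers such as email or DingTalk read their settings from those properties. The sketch below models that rule with three hypothetical behaviours (log, throw, ignore); the type strings, the LOG default and the class names are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical handler interface, standing in for JobErrorHandler.
interface ErrorHandlerSketch {
    void handleException(String jobName, Throwable cause);
}

// Hypothetical sketch of the reload rule: rebuild when the type OR the properties change.
class ErrorHandlerReloadSketch {
    static final String DEFAULT_TYPE = "LOG"; // assumed default for this sketch

    final List<String> logged = new ArrayList<>();
    int reloadCount;
    private String type;
    private Properties props = new Properties();
    private ErrorHandlerSketch handler;

    synchronized void reloadIfNecessary(String configuredType, Properties newProps) {
        String newType = configuredType == null || configuredType.isEmpty() ? DEFAULT_TYPE : configuredType;
        Properties safeProps = newProps == null ? new Properties() : newProps;
        if (newType.equals(type) && props.equals(safeProps)) {
            return; // nothing changed: keep the current handler
        }
        type = newType;
        props = (Properties) safeProps.clone(); // copy, so later edits to the caller's object are detected
        handler = create(newType);
        reloadCount++;
    }

    private ErrorHandlerSketch create(String handlerType) {
        switch (handlerType) {
            case "THROW":
                return (jobName, cause) -> {
                    throw new IllegalStateException("Job '" + jobName + "' failed", cause);
                };
            case "IGNORE":
                return (jobName, cause) -> { };
            default: // "LOG", and in this sketch any unknown type
                return (jobName, cause) -> logged.add(jobName + ": " + cause.getMessage());
        }
    }

    synchronized ErrorHandlerSketch get() {
        return handler;
    }
}
```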

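Step 2: jobFacade.checkJobExecutionEnvironment() mainly checks whether the clock difference between the current server and the registry center exceeds the configured limit (a negative maxTimeDiffSeconds disables the check):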

public void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException {
    configService.checkMaxTimeDiffSecondsTolerable();
}

public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
    int maxTimeDiffSeconds = load(true).getMaxTimeDiffSeconds();
    if (0 > maxTimeDiffSeconds) {
        return;
    }
    long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
    if (timeDiff > maxTimeDiffSeconds * 1000L) {
        throw new JobExecutionEnvironmentException(
                "Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
    }
}
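The check reduces to a small pure function. Here it is with both clocks passed in as parameters (TimeDiffCheck and exceeds are hypothetical names) so it can be exercised without a registry center. Worked example: with maxTimeDiffSeconds = 5, a 7-second difference exceeds 5 × 1000 = 5000 ms, so the job server is rejected; a difference of exactly 5000 ms is allowed because the comparison is strict.

```java
// Hypothetical standalone version of checkMaxTimeDiffSecondsTolerable(), with both clocks passed in.
class TimeDiffCheck {

    /** Returns true when the clocks differ by more than maxTimeDiffSeconds; a negative limit disables the check. */
    static boolean exceeds(long localMillis, long registryMillis, int maxTimeDiffSeconds) {
        if (maxTimeDiffSeconds < 0) {
            return false; // check disabled
        }
        long timeDiff = Math.abs(localMillis - registryMillis);
        return timeDiff > maxTimeDiffSeconds * 1000L; // 1000L avoids int overflow for large limits
    }
}
```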

Step 3: jobFacade.getShardingContexts() gets the sharding information:

public ShardingContexts getShardingContexts() {
    boolean isFailover = configService.load(true).isFailover();
    if (isFailover) {
        List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
        // If failover is enabled and items have been failed over to this node, call
        // executionContextService.getJobShardingContext directly to build the context for those items
        if (!failoverShardingItems.isEmpty()) {
            return executionContextService.getJobShardingContext(failoverShardingItems);
        }
    }
    // If failover is disabled or nothing was failed over to this node, reshard first (if necessary)
    shardingService.shardingIfNecessary();
    // Read the items assigned to this node from ZooKeeper
    List<Integer> shardingItems = shardingService.getLocalShardingItems();
    if (isFailover) {
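        // With failover enabled, remove the items that have been taken over by other nodes.
        // For example, if this instance should run items 0 and 1 but item 0 has moved to another node, drop 0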
        shardingItems.removeAll(failoverService.getLocalTakeOffItems());
    }
    // Remove items that have been disabled
    shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
    return executionContextService.getJobShardingContext(shardingItems);
}
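The item-selection rules above can be restated as a pure function over plain lists, which makes the precedence explicit: failover items win outright; otherwise the instance runs its own items, minus those taken over elsewhere (only when failover is on) and minus disabled items. LocalItemsSketch and its parameters are hypothetical; the real method reads these lists from ZooKeeper.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pure-function restatement of getShardingContexts(): which items does this instance run?
class LocalItemsSketch {

    static List<Integer> itemsToRun(boolean failoverEnabled, List<Integer> localFailoverItems,
                                    List<Integer> localShardingItems, List<Integer> takenOffItems,
                                    List<Integer> disabledItems) {
        if (failoverEnabled && !localFailoverItems.isEmpty()) {
            return new ArrayList<>(localFailoverItems); // run only the items failed over to this node
        }
        List<Integer> result = new ArrayList<>(localShardingItems);
        if (failoverEnabled) {
            result.removeAll(takenOffItems); // items already taken over by other nodes
        }
        result.removeAll(disabledItems); // disabled items are never run
        return result;
    }
}
```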

Let's focus on the resharding flow:

public void shardingIfNecessary() {
    // Get the available job instances
    List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
    // Return directly if no resharding is needed (i.e. /namespace/jobName/leader/sharding/necessary does not exist)
    // or there is no available instance
    if (!isNeedSharding() || availableJobInstances.isEmpty()) {
        return;
    }
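    // Only the leader may reshard. If a leader already exists, simply check whether it is this node;
    // otherwise this node first takes part in the election and checks once the election is over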
    if (!leaderService.isLeaderUntilBlock()) {
        // A non-leader node waits until sharding has completed, then returns
        blockUntilShardingCompleted();
        return;
    }
    // Before sharding, wait for the currently running items to finish
    waitingOtherShardingItemCompleted();
    JobConfiguration jobConfig = configService.load(false);
    int shardingTotalCount = jobConfig.getShardingTotalCount();
    log.debug("Job '{}' sharding begin.", jobName);
    // Mark sharding as in progress
    jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
    // Reset the sharding info: write the latest item nodes, without assigning instances to them yet
    resetShardingInfo(shardingTotalCount);
    // Get the job's sharding strategy. Three are built in (average allocation, odd-even, round-robin); average allocation is the default
    JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
    // Assign instances to the items, writing the result in a single transaction
    jobNodeStorage.executeInTransaction(getShardingResultTransactionOperations(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
    log.debug("Job '{}' sharding complete.", jobName);
}
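For the default average-allocation strategy, the rule documented for ElasticJob is: each instance first gets a contiguous block of shardingTotalCount / instanceCount items, and the leftover items are handed out one each to the first instances. Below is an illustrative re-implementation over instance ID strings, not the library class itself. With three instances and 8 items it yields [0, 1, 6], [2, 3, 7] and [4, 5]; with 10 items the first instance gets [0, 1, 2, 9].

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative re-implementation of average-allocation sharding over instance IDs.
class AverageAllocationSketch {

    static Map<String, List<Integer>> sharding(List<String> instances, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (instances.isEmpty()) {
            return result;
        }
        int perInstance = shardingTotalCount / instances.size();
        // 1. Give every instance a contiguous block of perInstance items.
        for (int i = 0; i < instances.size(); i++) {
            List<Integer> items = new ArrayList<>();
            for (int item = i * perInstance; item < (i + 1) * perInstance; item++) {
                items.add(item);
            }
            result.put(instances.get(i), items);
        }
        // 2. Hand the leftover items out one each to the first instances.
        int remainder = shardingTotalCount % instances.size();
        int firstLeftover = perInstance * instances.size();
        for (int i = 0; i < remainder; i++) {
            result.get(instances.get(i)).add(firstLeftover + i);
        }
        return result;
    }
}
```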

Step 4: jobFacade.postJobStatusTraceEvent publishes a status trace event through jobTracingEventBus:

public void postJobStatusTraceEvent(final String taskId, final State state, final String message) {
    TaskContext taskContext = TaskContext.from(taskId);
    jobTracingEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(),
            taskContext.getSlaveId(), Source.LITE_EXECUTOR, taskContext.getType().name(), taskContext.getMetaInfo().getShardingItems().toString(), state, message));
    if (!Strings.isNullOrEmpty(message)) {
        log.trace(message);
    }
}
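Conceptually, the event bus decouples the executor from tracing: the executor posts an event, and whichever subscribers are registered (for example, one that persists trace records) consume it. A minimal synchronous stand-in is sketched below; TinyEventBus and StatusEvent are hypothetical, and the real JobTracingEventBus may dispatch asynchronously.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical event type carrying the fields this sketch needs.
final class StatusEvent {
    final String jobName;
    final String state;
    final String message;

    StatusEvent(String jobName, String state, String message) {
        this.jobName = jobName;
        this.state = state;
        this.message = message;
    }
}

// Hypothetical minimal publish/subscribe bus; not ElasticJob's JobTracingEventBus.
class TinyEventBus {
    private final List<Consumer<StatusEvent>> subscribers = new CopyOnWriteArrayList<>();

    void register(Consumer<StatusEvent> subscriber) {
        subscribers.add(subscriber);
    }

    // Delivers synchronously on the caller's thread.
    void post(StatusEvent event) {
        subscribers.forEach(each -> each.accept(event));
    }
}
```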

Step 5: jobFacade.misfireIfRunning is simple, and its purpose is already explained in the code comment above, so it is skipped here.
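The misfire bookkeeping behind steps 5 and 8 is easy to model. The sketch keeps the running and misfire flags in memory (ElasticJob keeps them as registry-center nodes); MisfireSketch is hypothetical. misfireIfRunning flags every requested item as soon as any of them is still running, and the re-run check requires misfire execution to be enabled, no pending resharding, and at least one flagged item.

```java
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of the misfire flags.
class MisfireSketch {
    final Set<Integer> running = ConcurrentHashMap.newKeySet();
    final Set<Integer> misfired = ConcurrentHashMap.newKeySet();

    // Step 5: if any requested item is still running, flag all of them and tell the caller to skip this run.
    boolean misfireIfRunning(Collection<Integer> items) {
        if (items.stream().noneMatch(running::contains)) {
            return false;
        }
        misfired.addAll(items);
        return true;
    }

    // Step 8 condition: misfire enabled, no resharding pending, and at least one item flagged.
    boolean isExecuteMisfired(boolean misfireEnabled, boolean needSharding, Collection<Integer> items) {
        return misfireEnabled && !needSharding && items.stream().anyMatch(misfired::contains);
    }

    void clearMisfire(Collection<Integer> items) {
        misfired.removeAll(items);
    }
}
```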

Step 6: jobFacade.beforeJobExecuted needs little explanation: it gets the list of custom listeners and calls each of them in turn.

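Step 7: execute runs the job; step 8's misfire execution goes through the same method. After each run, if there are misfired items, no resharding is needed, and misfire execution is enabled, the job is run again: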

private void execute(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
    // No items: return immediately
    if (shardingContexts.getShardingItemParameters().isEmpty()) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName()));
        return;
    }
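    // If monitorExecution is enabled, create a running node for each item (/namespace/jobName/sharding/{item}/running).
    // The node type depends on whether failover is enabled:
    // with failover on, a persistent node is created (it must survive an instance crash, because a listener
    // watches for crashed instances and uses these nodes to mark their items for failover);
    // with failover off, an ephemeral node is created, so the running info disappears automatically when the instance dies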
    jobFacade.registerJobBegin(shardingContexts);
    String taskId = shardingContexts.getTaskId();
    // Publish the TASK_RUNNING event
    jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
    try {
        // Execute the items; process is shown below
        process(jobConfig, shardingContexts, executionSource);
    } finally {
        // After execution, remove the running nodes and the failover nodes (if any)
        jobFacade.registerJobCompleted(shardingContexts);
        // Publish a different event depending on whether any item reported an error
        if (itemErrorMessages.isEmpty()) {
            jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
        } else {
            jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
            itemErrorMessages.clear();
        }
    }
}
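The essential shape of this method is "register, run, always unregister, then report". The sketch below reproduces that shape with in-memory stand-ins (a set for the running nodes, a map for itemErrorMessages); ExecuteOnceSketch and the IntConsumer job are hypothetical. Because unregistering sits in finally, the running markers are removed even if an exception escapes the items.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntConsumer;

// Hypothetical model of execute(jobConfig, shardingContexts, source).
class ExecuteOnceSketch {
    final Set<Integer> running = ConcurrentHashMap.newKeySet();
    final Map<Integer, String> itemErrorMessages = new ConcurrentHashMap<>();

    String execute(Set<Integer> items, IntConsumer itemJob) {
        if (items.isEmpty()) {
            return "TASK_FINISHED"; // nothing to run on this instance
        }
        running.addAll(items); // like registerJobBegin
        try {
            for (int item : items) {
                try {
                    itemJob.accept(item);
                } catch (RuntimeException ex) {
                    itemErrorMessages.put(item, String.valueOf(ex.getMessage())); // record, keep going
                }
            }
        } finally {
            running.removeAll(items); // like registerJobCompleted
        }
        if (itemErrorMessages.isEmpty()) {
            return "TASK_FINISHED";
        }
        itemErrorMessages.clear(); // reset for the next run, as the original does
        return "TASK_ERROR";
    }
}
```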
// A single item is executed directly; with multiple items, every item must finish before the method returns
private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
    Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
    if (1 == items.size()) {
        int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
        JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item);
        // This process method delegates to JobItemExecutor.process.
        // For a SimpleJob, for example, it calls the execute method of the user-defined job instance;
        // every other job type has its own JobItemExecutor
        process(jobConfig, shardingContexts, item, jobExecutionEvent);
        return;
    }
    CountDownLatch latch = new CountDownLatch(items.size());
    for (int each : items) {
        JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
        ExecutorService executorService = executorContext.get(ExecutorService.class);
        if (executorService.isShutdown()) {
            return;
        }
        executorService.submit(() -> {
            try {
                process(jobConfig, shardingContexts, each, jobExecutionEvent);
            } finally {
                latch.countDown();
            }
        });
    }
    try {
        latch.await();
    } catch (final InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}
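The multi-item branch is the classic fan-out/fan-in pattern built on a CountDownLatch. Here it is in standalone form with the thread pool and the per-item work passed in (FanOutSketch and runAll are hypothetical names). As in the original, countDown sits in finally so one failing item cannot leave the caller waiting forever, and an interrupt during await restores the thread's interrupt flag.

```java
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.function.IntConsumer;

// Hypothetical standalone version of process(): run one item inline, or fan several out and wait for all.
class FanOutSketch {

    static void runAll(Collection<Integer> items, ExecutorService pool, IntConsumer itemJob) {
        if (items.size() == 1) {
            itemJob.accept(items.iterator().next()); // single item: run on the calling thread
            return;
        }
        CountDownLatch latch = new CountDownLatch(items.size());
        for (int each : items) {
            pool.submit(() -> {
                try {
                    itemJob.accept(each);
                } finally {
                    latch.countDown(); // count down even if the item throws
                }
            });
        }
        try {
            latch.await(); // block until every submitted item has finished
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}
```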

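Step 9: jobFacade.failoverIfNecessary() checks, after the job finishes, whether failover is needed.

It checks whether /namespace/jobName/leader/failover/items has any children. If it does, the current node joins the failover leader election, and if it wins, it re-executes the work of a crashed shard on this node. This is done by calling failoverService.failoverIfNecessary: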

public void failoverIfNecessary() {
    if (needFailover()) {
        jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
    }
}

Here is FailoverLeaderExecutionCallback, the callback that runs after this node wins the election:

class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
    
    @Override
    public void execute() {
        if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
            return;
        }
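        // Get the item to fail over from the children of /namespace/jobName/leader/failover/items.
        // Why take only one item? There can certainly be several crashed items; presumably taking one at a time keeps this instance from being overloaded?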
        int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
        log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
        // Record the failover info for crashedItem (i.e. move crashedItem's work onto the current instance)
        jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
        jobNodeStorage.fillJobNode(FailoverNode.getExecutingFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
        jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
        // TODO Instead of using triggerJob, use executor for unified scheduling
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        if (null != jobScheduleController) {
            // Trigger the job
            jobScheduleController.triggerJob();
        }
    }
}
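The callback's bookkeeping can be modelled in memory. In the sketch, a synchronized method stands in for the distributed failover latch, a sorted set for the children of leader/failover/items, and a map for the per-item failover owner; all names are hypothetical. Each call claims exactly one crashed item, matching the get(0) above, although the sketch takes the smallest item whereas the real code takes whichever child the registry lists first.

```java
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical in-memory model of FailoverLeaderExecutionCallback.
class FailoverSketch {
    final SortedSet<Integer> crashedItems = new TreeSet<>();     // like leader/failover/items
    final Map<Integer, String> failoverOwners = new TreeMap<>(); // like sharding/{item}/failover

    // synchronized stands in for the failover latch: one winner at a time.
    synchronized Optional<Integer> claimOne(String instanceId) {
        if (crashedItems.isEmpty()) {
            return Optional.empty(); // needFailover() would be false
        }
        int crashedItem = crashedItems.first();
        failoverOwners.put(crashedItem, instanceId); // move the item's work to this instance
        crashedItems.remove(crashedItem);            // no longer pending
        return Optional.of(crashedItem);             // the caller would now trigger the job
    }
}
```

Since the claiming instance triggers the job and failoverIfNecessary runs again at the end of that execution, the remaining crashed items can be claimed on later runs, by this instance or by others.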

Step 10: jobFacade.afterJobExecuted(shardingContexts) runs the custom listeners' post-execution logic.

That completes the whole elastic-job execution flow.
