XXL-JOB系列四之调度全流程

2024-07-01 17:56:35 浏览数 (1)

1 JobTriggerPoolHelper

任务调度触发的入口在JobTriggerPoolHelper.trigger方法,调用了helper.addTrigger方法

代码语言:javascript复制
private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
......
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
    helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);

看下helper的addTrigger方法

代码语言:javascript复制
public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {
    // choose thread pool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }
    // trigger
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            try {
                // do trigger
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                // check timeout-count-map
                long minTim_now = System.currentTimeMillis()/60000;
                if (minTim != minTim_now) {
                    minTim = minTim_now;
                    jobTimeoutCountMap.clear();
                }
                // incr timeout-count-map
                long cost = System.currentTimeMillis()-start;
                if (cost > 500) {       // ob-timeout threshold 500ms
                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                    if (timeoutCount != null) {
                        timeoutCount.incrementAndGet();
                    }
                }
            }
        }
    });
}

第一步选择一个线程池执行任务,如果任务在最近一分钟内调度超时的次数大于10次,就选择slowTriggerPool,否则选择fastTriggerPool,第二步提交调度任务,由XxlJobTrigger.trigger方法执行具体的逻辑,执行完成之后统计超时的次数

看看XxlJobTrigger.trigger方法的逻辑

代码语言:javascript复制
public static void trigger(int jobId,
                           TriggerTypeEnum triggerType,
                           int failRetryCount,
                           String executorShardingParam,
                           String executorParam,
                           String addressList) {

    // load data
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

    // cover addressList
    if (addressList!=null && addressList.trim().length()>0) {
        group.setAddressType(1);
        group.setAddressList(addressList.trim());
    }

    // sharding param
    int[] shardingParam = null;
    if (executorShardingParam!=null){
        String[] shardingArr = executorShardingParam.split("/");
        if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(shardingArr[0]);
            shardingParam[1] = Integer.valueOf(shardingArr[1]);
        }
    }
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
            && shardingParam==null) {
        for (int i = 0; i < group.getRegistryList().size(); i  ) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else {
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }

}

该方法逻辑比较简单,如果需要分片执行,则依次通过processTrigger方法执行,否则就封装一个单分片信息,还是调用processTrigger,所以下面看看processTrigger方法

代码语言:javascript复制
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

    // param
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

    // 1、save log-id
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

    // 2、init trigger-param
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);

    // 3、init address
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

    // 4、trigger remote executor
    ReturnT<String> triggerResult = null;
    if (address != null) {
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

    // 5、collection trigger info
    StringBuffer triggerMsgSb = new StringBuffer();
    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
            .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
    if (shardingParam != null) {
        triggerMsgSb.append("(" shardingParam ")");
    }
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

    triggerMsgSb.append("<br><br><span style="color:#00c0ef;" > >>>>>>>>>>>"  I18nUtil.getString("jobconf_trigger_run")  "<<<<<<<<<<< </span><br>")
            .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg() "<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

    // 6、save log trigger-info
    jobLog.setExecutorAddress(address);
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    jobLog.setExecutorShardingParam(shardingParam);
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    jobLog.setTriggerCode(triggerResult.getCode());
    jobLog.setTriggerMsg(triggerMsgSb.toString());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}

第一步生成一条调度日志保存在xxl_job_log表中

第二步封装触发的参数

第三步获取执行器的地址,如果是分片,那么根据index到执行器列表中选择对应的执行器,如果index无效,则选择第一个执行器,如果不是分片,那么根据配置的路由策略选择一个执行器,xxl-job支持的路由策略如下

代码语言:javascript复制
public enum ExecutorRouteStrategyEnum {
    // 选择第一个执行器
    FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
    // 选最后一个执行器
    LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
    // 轮询,每个job维护了一个计数器,每次加1再对执行器列表取余
    ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
    // 随机选择一个
    RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
    // 一致性Hash
    CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
    // 最近使用次数最少的优先,内部对每个Job对应的执行器维护一个计数器,选择计数器最小值对应执行器
    LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
    // 最近未使用时间最长的执行器优先,内部采用LinkedHashMap实现
    LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
    // 依次对执行器进行进行健康检查,返回第一个健康的执行器
    FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
    // 依次查看执行器是否空闲,根据执行器对应的Job的执行线程是否正在处理任务或者执行线程的任务队列中
    // 是否有任务来判断执行器是否空闲
    BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
    // 分片时使用,无须路由
    SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
}

第四步调用runExecutor方法执行

代码语言:javascript复制
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run")   ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}
代码语言:javascript复制
public ReturnT<String> run(TriggerParam triggerParam) {
    return XxlJobRemotingUtil.postBody(addressUrl   "run", accessToken, timeout, triggerParam, String.class);
}

可以看出调度中心通过Http请求的方式向执行器发送了一个执行的请求

第五步记录调度的具体信息,并更新调度日志对应的字段信息,以上调度中心的逻辑就完成了。

之前的文章中提到执行器内部会启动一个NettyHttpServer用来处理调中心的请求,那么直接看netty对应的业务处理EmbedHttpServerHandler.channelRead方法即可

代码语言:javascript复制
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    ......
    // invoke
    bizThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // do invoke
            Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
            // to json
            String responseJson = GsonTool.toJson(responseObj);
            // write response
            writeResponse(ctx, keepAlive, responseJson);
        }
    });
}

通过bizThreadPool执行具体的业务逻辑,看下process方法

代码语言:javascript复制
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
    ......
    // services mapping
    try {
        switch (uri) {
            case "/beat":
                return executorBiz.beat();
            case "/idleBeat":
                IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleBeatParam);
            case "/run":
                TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                return executorBiz.run(triggerParam);
            case "/kill":
                KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(killParam);
            case "/log":
                LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(logParam);
            default:
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("   uri   ") not found.");
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:"   ThrowableUtil.toString(e));
    }
}

最终调用executorBiz对应的方法执行,这里看看执行业务逻辑的run方法,主要看看GlueType为BEAN模式的处理逻辑,其他类型类似

代码语言:javascript复制
public ReturnT<String> run(TriggerParam triggerParam) {
    // 根据JobId获取对应的任务处理线程和handler,调度中心一个Job对应的执行器一个@XxlJob注解配置的方法
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // 根据传入的名称获取对应的Handler,名称为@XxlJob注解中配置的value属性
        // 执行器注册时会在内部维护一个Map,Key为@XxlJob注解配置的value属性,value为对应的method
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
        if (jobThread!=null && jobHandler != newJobHandler) {
            // 如果任务线程不为空,且最新的handler和任务线程中的hander不一致,说明handler发生了变化
            // 那么将老的任务线程和老的handler置为空
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
            jobThread = null;
            jobHandler = null;
        }
        if (jobHandler == null) {
            // 如果jobHandler 为空,则设置为最新的handler
            jobHandler = newJobHandler;
            // 如果JobHandler获取失败,直接返回错误
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler ["   triggerParam.getExecutorHandler()   "] not found.");
            }
        }

    } else if (......) {
        ......
    } else {
        ......
    }
    
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // 根据任务处理策略为discard,那么当任务线程正在处理或者任务队列不为空时,直接丢弃任务,返回失败
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:" ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // 根据任务处理策略为cover,那么当任务线程正在处理或者任务队列不为空时,丢弃现有的所有任务
            // 将jobTread置为空
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:"   ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }

    // jobThread为空,则注册一个新的任务线程到内部的一个Map结构中,并且将老的任务线程停掉
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // 将此次任务添加到任务线程的任务队列中,添加任务时需要根据JobLogId判重,避免重复调度
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

再来看看任务线程的处理逻辑

代码语言:javascript复制
public void run() {
        // 在@XxlJob中如果配置了初始化方法,在任务线程启动时就会先执行init方法
        try {
          handler.init();
       } catch (Throwable e) {
           logger.error(e.getMessage(), e);
       }
       // execute
       while(!toStop){
          // 设置任务线程未处理状态,闲置次数 1
          running = false;
          idleTimes  ;
            TriggerParam triggerParam = null;
            try {
             // 从对列中获取任务,3s超时
             triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
             if (triggerParam!=null) {
                // 拿到任务之后,设置任务线程运行中,清除闲置次数,并且从triggerLogIdSet中删除LogId
                // triggerLogIdSet的作用是为了防止重复调度
                running = true;
                idleTimes = 0;
                triggerLogIdSet.remove(triggerParam.getLogId());

                // 生成本次调度任务执行的日志文件名称,后续根据LogId就能获取到对应的日志
                // 调度中心查看日志使用
                String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                XxlJobContext xxlJobContext = new XxlJobContext(
                      triggerParam.getJobId(),
                      triggerParam.getExecutorParams(),
                      logFileName,
                      triggerParam.getBroadcastIndex(),
                      triggerParam.getBroadcastTotal());

                // 设置上下文
                XxlJobContext.setXxlJobContext(xxlJobContext);

                // execute
                XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:"   xxlJobContext.getJobParam());

                if (triggerParam.getExecutorTimeout() > 0) {
                   // 如果设置了超时时间,则新启动一个线程执行任务并且同步获取结果
                   Thread futureThread = null;
                   try {
                      FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                         @Override
                         public Boolean call() throws Exception {
                            // 因为是新启的线程,所以这里要重新设置一次上下文
                            XxlJobContext.setXxlJobContext(xxlJobContext);
                            handler.execute();
                            return true;
                         }
                      });
                      futureThread = new Thread(futureTask);
                      futureThread.start();

                      Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                   } catch (TimeoutException e) {

                      XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                      XxlJobHelper.log(e);

                      // handle result
                      XxlJobHelper.handleTimeout("job execute timeout ");
                   } finally {
                      futureThread.interrupt();
                   }
                } else {
                   // 如果没有设置超时时间,直接在当前线程执行即可
                   handler.execute();
                }

                // 处理任务执行结果信息,并且将其设置到上下文中
                if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                   XxlJobHelper.handleFail("job handle result lost.");
                } else {
                   String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                   tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
                         ?tempHandleMsg.substring(0, 50000).concat("...")
                         :tempHandleMsg;
                   XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                }
                XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                        XxlJobContext.getXxlJobContext().getHandleCode()
                        ", handleMsg = "
                        XxlJobContext.getXxlJobContext().getHandleMsg()
                );

             } else {
                // 如果闲置次数超过30次,且任务队列中没有任务,删除任务线程,防止占用系统资源
                if (idleTimes > 30) {
                   if(triggerQueue.size() == 0) {
                      XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                   }
                }
             }
          } catch (Throwable e) {
             // 处理任务执行异常,记录异常日志,并且将错误信息设置到上下文中
             if (toStop) {
                XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:"   stopReason);
             }

             // handle result
             StringWriter stringWriter = new StringWriter();
             e.printStackTrace(new PrintWriter(stringWriter));
             String errorMsg = stringWriter.toString();

             XxlJobHelper.handleFail(errorMsg);

             XxlJobHelper.log("<br>----------- JobThread Exception:"   errorMsg   "<br>----------- xxl-job job execute end(error) -----------");
          } finally {
                if(triggerParam != null) {
                    // 将任务执行结果添加到回调队列中,所有任务共用一个队列
                    if (!toStop) {
                        // commonm
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                               triggerParam.getLogId(),
                         triggerParam.getLogDateTime(),
                         XxlJobContext.getXxlJobContext().getHandleCode(),
                         XxlJobContext.getXxlJobContext().getHandleMsg() )
                   );
                    } else {
                        // is killed
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                               triggerParam.getLogId(),
                         triggerParam.getLogDateTime(),
                         XxlJobContext.HANDLE_CODE_FAIL,
                         stopReason   " [job running, killed]" )
                   );
                    }
                }
            }
        }

       // 能执行到这一步,说明任务线程被停了,那么需要将没有执行的任务也通过回调的方式通知调度中心
       while(triggerQueue !=null && triggerQueue.size()>0){
          TriggerParam triggerParam = triggerQueue.poll();
          if (triggerParam!=null) {
             // is killed
             TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                   triggerParam.getLogId(),
                   triggerParam.getLogDateTime(),
                   XxlJobContext.HANDLE_CODE_FAIL,
                   stopReason   " [job not executed, in the job queue, killed.]")
             );
          }
       }

       // 最终调用handler的destroy方法,该方法同样也是在@XxlJob注解中配置
       try {
          handler.destroy();
       } catch (Throwable e) {
          logger.error(e.getMessage(), e);
       }
       logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());

最后由TriggerCallbackThread不断的从回调任务队列中获取任务,调用调度中心的接口回传结果,最终由调度中心的JobCompleteHelper中的callbackThreadPool处理回调结果,更新xxl_job_log表对应的字段信息

代码语言:javascript复制
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
		callbackThreadPool.execute(new Runnable() {
			@Override
			public void run() {
				for (HandleCallbackParam handleCallbackParam: callbackParamList) {
					ReturnT<String> callbackResult = callback(handleCallbackParam);
					logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",
							(callbackResult.getCode()== ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult);
				}
			}
		});
		return ReturnT.SUCCESS;
}

至此整个调度的流程结束

0 人点赞