1 JobTriggerPoolHelper
任务调度触发的入口在JobTriggerPoolHelper.trigger方法,调用了helper.addTrigger方法
代码语言:javascript复制private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
......
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
看下helper的addTrigger方法
代码语言:javascript复制public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList) {
// choose thread pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
// trigger
triggerPool_.execute(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
// do trigger
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// check timeout-count-map
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {
minTim = minTim_now;
jobTimeoutCountMap.clear();
}
// incr timeout-count-map
long cost = System.currentTimeMillis()-start;
if (cost > 500) { // ob-timeout threshold 500ms
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
timeoutCount.incrementAndGet();
}
}
}
}
});
}
第一步选择线程池:如果该任务在当前这一分钟内调度超时(单次调度耗时超过500ms)的次数大于10次,就选择slowTriggerPool,否则选择fastTriggerPool。第二步提交调度任务,由XxlJobTrigger.trigger方法执行具体的逻辑,执行完成之后统计超时的次数。
看看XxlJobTrigger.trigger方法的逻辑
代码语言:javascript复制public static void trigger(int jobId,
TriggerTypeEnum triggerType,
int failRetryCount,
String executorShardingParam,
String executorParam,
String addressList) {
// load data
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
return;
}
if (executorParam != null) {
jobInfo.setExecutorParam(executorParam);
}
int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
// cover addressList
if (addressList!=null && addressList.trim().length()>0) {
group.setAddressType(1);
group.setAddressList(addressList.trim());
}
// sharding param
int[] shardingParam = null;
if (executorShardingParam!=null){
String[] shardingArr = executorShardingParam.split("/");
if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
shardingParam = new int[2];
shardingParam[0] = Integer.valueOf(shardingArr[0]);
shardingParam[1] = Integer.valueOf(shardingArr[1]);
}
}
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
for (int i = 0; i < group.getRegistryList().size(); i ) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
} else {
if (shardingParam == null) {
shardingParam = new int[]{0, 1};
}
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
}
}
该方法逻辑比较简单,如果需要分片执行,则依次通过processTrigger方法执行,否则就封装一个单分片信息,还是调用processTrigger,所以下面看看processTrigger方法
代码语言:javascript复制private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
// param
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
// 1、save log-id
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
jobLog.setTriggerTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2、init trigger-param
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(total);
// 3、init address
String address = null;
ReturnT<String> routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
} else {
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
} else {
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
}
// 4、trigger remote executor
ReturnT<String> triggerResult = null;
if (address != null) {
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
}
// 5、collection trigger info
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
.append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
if (shardingParam != null) {
triggerMsgSb.append("(" shardingParam ")");
}
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
triggerMsgSb.append("<br><br><span style="color:#00c0ef;" > >>>>>>>>>>>" I18nUtil.getString("jobconf_trigger_run") "<<<<<<<<<<< </span><br>")
.append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg() "<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
// 6、save log trigger-info
jobLog.setExecutorAddress(address);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setExecutorShardingParam(shardingParam);
jobLog.setExecutorFailRetryCount(finalFailRetryCount);
//jobLog.setTriggerTime();
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
第一步生成一条调度日志保存在xxl_job_log表中
第二步封装触发的参数
第三步获取执行器的地址,如果是分片,那么根据index到执行器列表中选择对应的执行器,如果index无效,则选择第一个执行器,如果不是分片,那么根据配置的路由策略选择一个执行器,xxl-job支持的路由策略如下
代码语言:javascript复制public enum ExecutorRouteStrategyEnum {
// Pick the first executor
FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
// Pick the last executor
LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
// Round robin: each job keeps a counter that is incremented by 1 on every call and taken modulo the executor list
ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
// Pick one at random
RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
// Consistent hashing
CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
// Least frequently used first: keeps a counter per executor for each job and picks the executor with the smallest count
LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
// Least recently used first: the executor unused for the longest time wins; implemented internally with a LinkedHashMap
LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
// Health-checks the executors in order and returns the first healthy one
FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
// Checks the executors in order for idleness; an executor is idle when the job's execution thread is
// neither processing a task nor holding tasks in its queue
BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
// Used for sharding; no routing is needed
SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
}
第四步调用runExecutor方法执行
代码语言:javascript复制public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") ":");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}
代码语言:javascript复制public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl "run", accessToken, timeout, triggerParam, String.class);
}
可以看出调度中心通过Http请求的方式向执行器发送了一个执行的请求
第五步记录调度的具体信息,并更新调度日志对应的字段信息,以上调度中心的逻辑就完成了。
之前的文章中提到执行器内部会启动一个NettyHttpServer用来处理调度中心的请求,那么直接看netty对应的业务处理EmbedHttpServerHandler.channelRead0方法即可
代码语言:javascript复制protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
......
// NOTE(review): httpMethod, uri, requestData, accessTokenReq and keepAlive are not declared in the
// visible code; presumably they are extracted from msg in the elided part above - confirm.
// invoke: submit the request handling to bizThreadPool
bizThreadPool.execute(new Runnable() {
@Override
public void run() {
// do invoke: dispatch the request to process()
Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
// to json: serialize the returned object
String responseJson = GsonTool.toJson(responseObj);
// write response
writeResponse(ctx, keepAlive, responseJson);
}
});
}
通过bizThreadPool执行具体的业务逻辑,看下process方法
代码语言:javascript复制private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
......
// services mapping
try {
switch (uri) {
case "/beat":
return executorBiz.beat();
case "/idleBeat":
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
case "/run":
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
case "/kill":
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
case "/log":
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
default:
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" uri ") not found.");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" ThrowableUtil.toString(e));
}
}
最终调用executorBiz对应的方法执行,这里看看执行业务逻辑的run方法,主要看看GlueType为BEAN模式的处理逻辑,其他类型类似
代码语言:javascript复制public ReturnT<String> run(TriggerParam triggerParam) {
// 根据JobId获取对应的任务处理线程和handler,调度中心一个Job对应的执行器一个@XxlJob注解配置的方法
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if (GlueTypeEnum.BEAN == glueTypeEnum) {
// 根据传入的名称获取对应的Handler,名称为@XxlJob注解中配置的value属性
// 执行器注册时会在内部维护一个Map,Key为@XxlJob注解配置的value属性,value为对应的method
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
if (jobThread!=null && jobHandler != newJobHandler) {
// 如果任务线程不为空,且最新的handler和任务线程中的hander不一致,说明handler发生了变化
// 那么将老的任务线程和老的handler置为空
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
if (jobHandler == null) {
// 如果jobHandler 为空,则设置为最新的handler
jobHandler = newJobHandler;
// 如果JobHandler获取失败,直接返回错误
if (jobHandler == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" triggerParam.getExecutorHandler() "] not found.");
}
}
} else if (......) {
......
} else {
......
}
if (jobThread != null) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// 根据任务处理策略为discard,那么当任务线程正在处理或者任务队列不为空时,直接丢弃任务,返回失败
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:" ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
// 根据任务处理策略为cover,那么当任务线程正在处理或者任务队列不为空时,丢弃现有的所有任务
// 将jobTread置为空
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
// just queue trigger
}
}
// jobThread为空,则注册一个新的任务线程到内部的一个Map结构中,并且将老的任务线程停掉
if (jobThread == null) {
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
// 将此次任务添加到任务线程的任务队列中,添加任务时需要根据JobLogId判重,避免重复调度
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}
再来看看任务线程的处理逻辑
代码语言:javascript复制public void run() {
// 在@XxlJob中如果配置了初始化方法,在任务线程启动时就会先执行init方法
try {
handler.init();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
// execute
while(!toStop){
// 设置任务线程未处理状态,闲置次数 1
running = false;
idleTimes ;
TriggerParam triggerParam = null;
try {
// 从对列中获取任务,3s超时
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerParam!=null) {
// 拿到任务之后,设置任务线程运行中,清除闲置次数,并且从triggerLogIdSet中删除LogId
// triggerLogIdSet的作用是为了防止重复调度
running = true;
idleTimes = 0;
triggerLogIdSet.remove(triggerParam.getLogId());
// 生成本次调度任务执行的日志文件名称,后续根据LogId就能获取到对应的日志
// 调度中心查看日志使用
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
XxlJobContext xxlJobContext = new XxlJobContext(
triggerParam.getJobId(),
triggerParam.getExecutorParams(),
logFileName,
triggerParam.getBroadcastIndex(),
triggerParam.getBroadcastTotal());
// 设置上下文
XxlJobContext.setXxlJobContext(xxlJobContext);
// execute
XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" xxlJobContext.getJobParam());
if (triggerParam.getExecutorTimeout() > 0) {
// 如果设置了超时时间,则新启动一个线程执行任务并且同步获取结果
Thread futureThread = null;
try {
FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
// 因为是新启的线程,所以这里要重新设置一次上下文
XxlJobContext.setXxlJobContext(xxlJobContext);
handler.execute();
return true;
}
});
futureThread = new Thread(futureTask);
futureThread.start();
Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
} catch (TimeoutException e) {
XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
XxlJobHelper.log(e);
// handle result
XxlJobHelper.handleTimeout("job execute timeout ");
} finally {
futureThread.interrupt();
}
} else {
// 如果没有设置超时时间,直接在当前线程执行即可
handler.execute();
}
// 处理任务执行结果信息,并且将其设置到上下文中
if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
XxlJobHelper.handleFail("job handle result lost.");
} else {
String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
?tempHandleMsg.substring(0, 50000).concat("...")
:tempHandleMsg;
XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
}
XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
XxlJobContext.getXxlJobContext().getHandleCode()
", handleMsg = "
XxlJobContext.getXxlJobContext().getHandleMsg()
);
} else {
// 如果闲置次数超过30次,且任务队列中没有任务,删除任务线程,防止占用系统资源
if (idleTimes > 30) {
if(triggerQueue.size() == 0) {
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
}
}
}
} catch (Throwable e) {
// 处理任务执行异常,记录异常日志,并且将错误信息设置到上下文中
if (toStop) {
XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" stopReason);
}
// handle result
StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
String errorMsg = stringWriter.toString();
XxlJobHelper.handleFail(errorMsg);
XxlJobHelper.log("<br>----------- JobThread Exception:" errorMsg "<br>----------- xxl-job job execute end(error) -----------");
} finally {
if(triggerParam != null) {
// 将任务执行结果添加到回调队列中,所有任务共用一个队列
if (!toStop) {
// commonm
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.getXxlJobContext().getHandleCode(),
XxlJobContext.getXxlJobContext().getHandleMsg() )
);
} else {
// is killed
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.HANDLE_CODE_FAIL,
stopReason " [job running, killed]" )
);
}
}
}
}
// 能执行到这一步,说明任务线程被停了,那么需要将没有执行的任务也通过回调的方式通知调度中心
while(triggerQueue !=null && triggerQueue.size()>0){
TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam!=null) {
// is killed
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.HANDLE_CODE_FAIL,
stopReason " [job not executed, in the job queue, killed.]")
);
}
}
// 最终调用handler的destroy方法,该方法同样也是在@XxlJob注解中配置
try {
handler.destroy();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
最后由TriggerCallbackThread不断地从回调任务队列中获取任务,调用调度中心的接口回传结果,最终由调度中心的JobCompleteHelper中的callbackThreadPool处理回调结果,更新xxl_job_log表对应的字段信息
代码语言:javascript复制public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
callbackThreadPool.execute(new Runnable() {
@Override
public void run() {
for (HandleCallbackParam handleCallbackParam: callbackParamList) {
ReturnT<String> callbackResult = callback(handleCallbackParam);
logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",
(callbackResult.getCode()== ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult);
}
}
});
return ReturnT.SUCCESS;
}
至此整个调度的流程结束