1 XxlJobScheduler
After the scheduling center (xxl-job-admin) starts, it calls XxlJobScheduler.init to initialize itself. The code is as follows:
public class XxlJobScheduler {
    ......
    public void init() throws Exception {
        // Initialize i18n: Simplified Chinese, Traditional Chinese, English
        initI18n();
        // Initialize the trigger thread pools: fastTriggerPool and slowTriggerPool
        JobTriggerPoolHelper.toStart();
        // Initialize the executor registry thread pool and start the executor refresh thread
        JobRegistryHelper.getInstance().start();
        // Start the failed-job retry (and alarm) thread
        JobFailMonitorHelper.getInstance().start();
        // Initialize the executor callback thread pool and start the lost-result monitor thread
        JobCompleteHelper.getInstance().start();
        // Start the report statistics thread
        JobLogReportHelper.getInstance().start();
        // Start the job scheduling threads
        JobScheduleHelper.getInstance().start();
    }
}
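The i18n initialization is simple, so we skip it. Below we go through the code of the remaining steps one by one.
JobTriggerPoolHelper.start method (invoked through the static toStart)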
public void start(){
    fastTriggerPool = new ThreadPoolExecutor(
            10,
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(1000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                }
            });
    slowTriggerPool = new ThreadPoolExecutor(
            10,
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                }
            });
}
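As you can see, this simply creates two thread pools, fastTriggerPool and slowTriggerPool. They differ in their maximum thread count (triggerPoolFastMax vs. triggerPoolSlowMax) and in their task-queue capacity (1000 vs. 2000). So how are they used differently? When JobTriggerPoolHelper.addTrigger triggers a job, it first chooses a pool: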
public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {
    // choose thread pool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }
    ......
}
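If a job has timed out more than 10 times within one minute, it is considered slow to trigger and is dispatched to slowTriggerPool instead. The timeout here does not mean the executor took too long to run the business logic. It means the scheduling center itself took more than 500 ms to complete the trigger. In short, jobs that trigger quickly and jobs that trigger slowly run in separate thread pools, so they do not affect each other.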
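The selection rule can be modeled on its own. The following is a simplified sketch, not XXL-JOB's code. `TriggerPoolSelector`, `choosePool`, and `recordTrigger` are hypothetical names. We assume, as described above, that a trigger slower than 500 ms counts as a timeout, and that the counters are reset whenever the clock minute changes. Time is passed in explicitly so the logic is easy to test.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the fast/slow trigger-pool selection (not XXL-JOB's class).
class TriggerPoolSelector {
    static final long TIMEOUT_THRESHOLD_MS = 500; // assumed: a trigger slower than this is a "timeout"
    static final int SLOW_LIMIT = 10;             // more than 10 timeouts in a minute -> slow pool

    private final ConcurrentMap<Integer, AtomicInteger> timeoutCounts = new ConcurrentHashMap<>();
    private volatile long currentMinute = -1;

    /** Returns "slow" if the job timed out more than SLOW_LIMIT times this minute, else "fast". */
    String choosePool(int jobId) {
        AtomicInteger count = timeoutCounts.get(jobId);
        return (count != null && count.get() > SLOW_LIMIT) ? "slow" : "fast";
    }

    /** Records how long one trigger of jobId took; nowMillis is when it finished. */
    void recordTrigger(int jobId, long costMillis, long nowMillis) {
        long minute = nowMillis / 60_000;
        if (minute != currentMinute) { // assumed: a new clock minute clears all counters
            currentMinute = minute;
            timeoutCounts.clear();
        }
        if (costMillis > TIMEOUT_THRESHOLD_MS) {
            timeoutCounts.computeIfAbsent(jobId, id -> new AtomicInteger()).incrementAndGet();
        }
    }
}
```

In this model, a job moves to the slow pool after its eleventh slow trigger within a minute and returns to the fast pool when the next minute begins. Counting per minute instead of forever means a job that is only temporarily slow is not penalized indefinitely.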
JobRegistryHelper.start method
public void start(){
    // for registry or remove
    registryOrRemoveThreadPool = new ThreadPoolExecutor(
            2,
            10,
            30L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
                }
            },
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    r.run();
                    logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
                }
            });
    // for monitor
    registryMonitorThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!toStop) {
                try {
                    // auto registry group
                    List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
                    if (groupList!=null && !groupList.isEmpty()) {
                        // remove dead address (admin/executor)
                        List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                        if (ids!=null && ids.size()>0) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                        }
                        // fresh online address (admin/executor)
                        HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                        List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
                        if (list != null) {
                            for (XxlJobRegistry item: list) {
                                if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                                    String appname = item.getRegistryKey();
                                    List<String> registryList = appAddressMap.get(appname);
                                    if (registryList == null) {
                                        registryList = new ArrayList<String>();
                                    }
                                    if (!registryList.contains(item.getRegistryValue())) {
                                        registryList.add(item.getRegistryValue());
                                    }
                                    appAddressMap.put(appname, registryList);
                                }
                            }
                        }
                        // fresh group address
                        for (XxlJobGroup group: groupList) {
                            List<String> registryList = appAddressMap.get(group.getAppname());
                            String addressListStr = null;
                            if (registryList!=null && !registryList.isEmpty()) {
                                Collections.sort(registryList);
                                StringBuilder addressListSB = new StringBuilder();
                                for (String item:registryList) {
                                    addressListSB.append(item).append(",");
                                }
                                addressListStr = addressListSB.toString();
                                addressListStr = addressListStr.substring(0, addressListStr.length()-1);
                            }
                            group.setAddressList(addressListStr);
                            group.setUpdateTime(new Date());
                            XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                    }
                }
                try {
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
        }
    });
    registryMonitorThread.setDaemon(true);
    registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
    registryMonitorThread.start();
}
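Step 1 initializes registryOrRemoveThreadPool, which handles executor registration and removal requests. When the pool is saturated, its rejection handler runs the task directly in the calling thread.
Step 2 starts an executor monitor thread. Every 30 s it loads the automatically registered executor groups (address type 0) from the xxl_job_group table. It then deletes expired entries from the xxl_job_registry table (those whose last update time + 90 s < now). Finally, it writes the sorted, comma-separated addresses of the live executors into each group's addressList field for later scheduling.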
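The monitor loop contains two pure computations: deciding whether an entry is dead, and building each app's address list. Both can be sketched without the DAOs. This is a minimal in-memory model under stated assumptions. `Registration` and `RegistryRefresher` are hypothetical stand-ins for an `xxl_job_registry` row and the monitor logic. Times are epoch milliseconds, and the 90 s dead timeout is the value described above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one xxl_job_registry row.
final class Registration {
    final String group;      // "EXECUTOR" or "ADMIN"
    final String appName;    // registry_key
    final String address;    // registry_value
    final long updateTimeMs; // time of the last heartbeat

    Registration(String group, String appName, String address, long updateTimeMs) {
        this.group = group;
        this.appName = appName;
        this.address = address;
        this.updateTimeMs = updateTimeMs;
    }
}

final class RegistryRefresher {
    static final long DEAD_TIMEOUT_MS = 90_000; // assumed: three missed 30 s heartbeats

    /** An entry is dead when last update time + DEAD_TIMEOUT_MS < now. */
    static boolean isDead(Registration r, long nowMs) {
        return r.updateTimeMs + DEAD_TIMEOUT_MS < nowMs;
    }

    /** Builds appName -> "addr1,addr2,..." (de-duplicated, sorted) from live executor entries. */
    static Map<String, String> addressLists(List<Registration> all, long nowMs) {
        Map<String, List<String>> byApp = new HashMap<>();
        for (Registration r : all) {
            if (!"EXECUTOR".equals(r.group) || isDead(r, nowMs)) {
                continue; // only live executors contribute addresses
            }
            List<String> addrs = byApp.computeIfAbsent(r.appName, k -> new ArrayList<>());
            if (!addrs.contains(r.address)) {
                addrs.add(r.address);
            }
        }
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, List<String>> e : byApp.entrySet()) {
            List<String> addrs = e.getValue();
            Collections.sort(addrs); // same ordering step as the original loop
            result.put(e.getKey(), String.join(",", addrs));
        }
        return result;
    }
}
```

Sorting means the same set of executors always produces the same string, so the stored addressList only changes when the set of live executors changes.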
JobFailMonitorHelper.start
private Thread monitorThread;
private volatile boolean toStop = false;
public void start(){
    monitorThread = new Thread(new Runnable() {
        @Override
        public void run() {
            // monitor
            while (!toStop) {
                try {
                    List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
                    if (failLogIds!=null && !failLogIds.isEmpty()) {
                        for (long failLogId: failLogIds) {
                            // lock log
                            int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
                            if (lockRet < 1) {
                                continue;
                            }
                            XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
                            XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
                            // 1. fail retry monitor
                            if (log.getExecutorFailRetryCount() > 0) {
                                JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
                                String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>" + I18nUtil.getString("jobconf_trigger_type_retry") + "<<<<<<<<<<< </span><br>";
                                log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
                                XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
                            }
                            // 2. fail alarm monitor
                            int newAlarmStatus = 0; // alarm status: 0 = default, -1 = locked, 1 = no alarm needed, 2 = alarm sent, 3 = alarm failed
                            if (info != null) {
                                boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
                                newAlarmStatus = alarmResult?2:3;
                            } else {
                                newAlarmStatus = 1;
                            }
                            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                    }
                }
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");
        }
    });
    monitorThread.setDaemon(true);
    monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
    monitorThread.start();
}
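Every trigger saves a record in xxl_job_log. This method works as follows:
Step 1: start a thread that scans, every 10 seconds, for up to 1000 failed log records whose alarm status is 0;
Step 2: lock each record by changing its alarm status from 0 to -1; if no row is updated, the record has already been claimed and is skipped;
Step 3: if the remaining retry count is greater than 0, retry the job (the retry creates a new log record with the retry count decreased by 1, and for sharded execution the retry carries the original sharding parameters);
Step 4: send the alarm and set the record's final alarm status (2 if the alarm succeeded, 3 if it failed, or 1 if the job no longer exists and no alarm is needed).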
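The lock in step 2 is an optimistic compare-and-set on the alarm status. Only the caller whose update moves the status from 0 to -1 goes on to retry and alarm, so each failed record is processed once even if several admin nodes scan it. The sketch below models this in memory. `AtomicInteger.compareAndSet` stands in for the conditional update behind `updateAlarmStatus(failLogId, 0, -1)`, which we assume is an `UPDATE ... WHERE id = ? AND alarm_status = ?`. `FailedLog`, `FailMonitorSketch`, and the boolean alarm outcome are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory model of one failed xxl_job_log row.
final class FailedLog {
    // Alarm status codes from the code above:
    // 0 = default, -1 = locked, 1 = no alarm needed, 2 = alarm sent, 3 = alarm failed
    final AtomicInteger alarmStatus = new AtomicInteger(0);
    final int retryCount; // remaining executor fail-retry count

    FailedLog(int retryCount) {
        this.retryCount = retryCount;
    }
}

final class FailMonitorSketch {
    /**
     * Processes one failed log. Returns the retry count the new trigger would receive,
     * or -1 if no retry is issued (lock not acquired, or no retries left).
     */
    static int process(FailedLog log, boolean jobExists, boolean alarmSucceeds) {
        // Step 2: lock. Only the caller that moves 0 -> -1 continues (models the conditional UPDATE).
        if (!log.alarmStatus.compareAndSet(0, -1)) {
            return -1;
        }
        // Step 3: retry with one fewer remaining attempt.
        int retried = -1;
        if (log.retryCount > 0) {
            retried = log.retryCount - 1; // the real code passes this to JobTriggerPoolHelper.trigger(...)
        }
        // Step 4: alarm, then move the status from -1 to its final value.
        int newStatus = jobExists ? (alarmSucceeds ? 2 : 3) : 1;
        log.alarmStatus.compareAndSet(-1, newStatus);
        return retried;
    }
}
```

Because the final status is never 0 again, a later scan does not pick the same record up twice. The retry, if any, produces a new log record that is judged on its own.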
JobCompleteHelper.start
public void start(){
    // for callback
    callbackThreadPool = new ThreadPoolExecutor(
            2,
            20,
            30L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(3000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());
                }
            },
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    r.run();
                    logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");
                }
            });
    // for monitor
    monitorThread = new Thread(new Runnable() {
        @Override
        public void run() {
            // wait for JobTriggerPoolHelper-init
            try {
                TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            // monitor
            while (!toStop) {
                try {
                    // Lost-result handling: if a log has stayed "running" for more than 10 min and its executor
                    // has failed its heartbeat and is offline, actively mark this trigger as failed
                    Date losedTime = DateUtil.addMinutes(new Date(), -10);
                    List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
                    if (losedJobIds!=null && losedJobIds.size()>0) {
                        for (Long logId: losedJobIds) {
                            XxlJobLog jobLog = new XxlJobLog();
                            jobLog.setId(logId);
                            jobLog.setHandleTime(new Date());
                            jobLog.setHandleCode(ReturnT.FAIL_CODE);
                            jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
                            XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                    }
                }
                try {
                    TimeUnit.SECONDS.sleep(60);
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");
        }
    });
    monitorThread.setDaemon(true);
    monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
    monitorThread.start();
}
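JobCompleteHelper creates a thread pool for executor callback requests and then starts a monitor thread, monitorThread. Every 60 s, this thread scans for logs that were triggered more than 10 minutes ago and are still running, and whose executor is no longer online. It actively marks those logs as failed.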
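As the comment in the code states, a result counts as lost only when two conditions hold. First, the log has been running for more than 10 minutes. Second, its executor is no longer online. Below is a minimal sketch of that predicate. It assumes a hypothetical `RunningLog` type for the still-running `xxl_job_log` rows, and uses a set of online executor addresses as a stand-in for the `xxl_job_registry` lookup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for an xxl_job_log row that is still "running".
final class RunningLog {
    final long id;
    final long triggerTimeMs;
    final String executorAddress;

    RunningLog(long id, long triggerTimeMs, String executorAddress) {
        this.id = id;
        this.triggerTimeMs = triggerTimeMs;
        this.executorAddress = executorAddress;
    }
}

final class LostJobSketch {
    static final long LOST_AFTER_MS = 10 * 60 * 1000L; // 10 minutes

    /** Returns ids of running logs triggered at least 10 min ago whose executor is offline. */
    static List<Long> findLost(List<RunningLog> running, Set<String> onlineAddresses, long nowMs) {
        long losedTime = nowMs - LOST_AFTER_MS;
        List<Long> lost = new ArrayList<>();
        for (RunningLog log : running) {
            boolean tooOld = log.triggerTimeMs <= losedTime;
            boolean executorOffline = !onlineAddresses.contains(log.executorAddress);
            if (tooOld && executorOffline) {
                lost.add(log.id); // the real code marks these FAIL with the "joblog_lost_fail" message
            }
        }
        return lost;
    }
}
```

Requiring both conditions avoids failing a genuinely long-running job. As long as its executor keeps sending heartbeats, the job is left alone, however long it runs.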
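JobLogReportHelper is simpler. It starts a thread that recomputes, once a minute, the log statistics for the last three days, such as the numbers of running, failed, and successful triggers, for display on the front-end pages. The same thread also deletes expired log records (those older than the configured log retention period).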
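Here is a rough sketch of the per-day aggregation behind the report, with the bucketing spelled out. Everything in it is an assumption for illustration. `LogRow` and `LogReportSketch` are hypothetical, and days are computed in UTC. The status rule is our reading of XXL-JOB's log codes, not something taken from the source above: handle code 200 means success, handle code 0 with trigger code 0 or 200 means still running, and anything else means failed.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the xxl_job_log fields the report needs.
final class LogRow {
    final long triggerTimeMs;
    final int triggerCode;
    final int handleCode;

    LogRow(long triggerTimeMs, int triggerCode, int handleCode) {
        this.triggerTimeMs = triggerTimeMs;
        this.triggerCode = triggerCode;
        this.handleCode = handleCode;
    }
}

final class LogReportSketch {
    /** Returns day -> {running, success, fail} for today and the two previous days (UTC). */
    static Map<LocalDate, int[]> report(List<LogRow> rows, LocalDate today) {
        Map<LocalDate, int[]> result = new TreeMap<>();
        for (int i = 0; i < 3; i++) {
            result.put(today.minusDays(i), new int[3]);
        }
        for (LogRow row : rows) {
            LocalDate day = Instant.ofEpochMilli(row.triggerTimeMs).atZone(ZoneOffset.UTC).toLocalDate();
            int[] counts = result.get(day);
            if (counts == null) {
                continue; // outside the three-day window
            }
            if (row.handleCode == 200) {
                counts[1]++; // success (assumed rule)
            } else if (row.handleCode == 0 && (row.triggerCode == 0 || row.triggerCode == 200)) {
                counts[0]++; // still running (assumed rule)
            } else {
                counts[2]++; // failed
            }
        }
        return result;
    }
}
```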
Finally, let's look at JobScheduleHelper.start. This method is the most complex of the group and holds the core logic of the scheduling center:
public void start(){
    // 1. Start the scheduling thread
    scheduleThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                // 1.1 Wait about 5 s after startup, waking up exactly on a whole second
                TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
            } catch (InterruptedException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            // 1.2 Number of jobs read per scan: (fastPoolSize 200 + slowPoolSize 100) * 20 = 6000 by default,
            // where 20 assumes each trigger takes about 50 ms (1000 / 50 = 20 triggers per thread per second).
            // So by default at most 6000 jobs are handled per scan; keep this parameter in mind.
            int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
            while (!scheduleThreadToStop) {
                long start = System.currentTimeMillis();
                Connection conn = null;
                Boolean connAutoCommit = null;
                PreparedStatement preparedStatement = null;
                boolean preReadSuc = true;
                try {
                    conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                    connAutoCommit = conn.getAutoCommit();
                    conn.setAutoCommit(false);
                    // 1.3 Acquire the schedule lock (a row lock via SELECT ... FOR UPDATE, held until commit)
                    // so that a clustered admin does not schedule the same job more than once
                    preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                    preparedStatement.execute();
                    // tx start
                    // 1.4 Load from xxl_job_info the jobs that are due within the next 5 s (PRE_READ_MS)
                    long nowTime = System.currentTimeMillis();
                    List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                    if (scheduleList!=null && scheduleList.size()>0) {
                        for (XxlJobInfo jobInfo: scheduleList) {
                            if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                // Next trigger time + 5 s < now: the previous trigger was missed (misfire).
                                // The job's misfire strategy decides whether to skip it (DO_NOTHING) or trigger it once right now (FIRE_ONCE_NOW)
                                MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                    // FIRE_ONCE_NOW -> trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                }
                                // Then refresh the job's next trigger time, counted from now
                                refreshNextValidTime(jobInfo, new Date());
                            } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                // Next trigger time < now (overdue by at most 5 s): trigger it directly
                                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                // Refresh the job's next trigger time
                                refreshNextValidTime(jobInfo, new Date());
                                if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                                    // If the job is still enabled and its new next trigger time is within the next 5 s,
                                    // push it into the time ring to wait for its trigger
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
                                    pushTimeRing(ringSecond, jobInfo.getId());
                                    // Then refresh the next trigger time once more
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                                }
                            } else {
                                // Next trigger time >= now (due within the next 5 s): push the job into the time ring to wait for its trigger
                                int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
                                pushTimeRing(ringSecond, jobInfo.getId());
                                // Refresh the job's next trigger time
                                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                            }
                        }
                        // Finally, write the updated job information back
                        for (XxlJobInfo jobInfo: scheduleList) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                        }
                    } else {
                        preReadSuc = false;
                    }
                    // tx stop
                } catch (Exception e) {
                    if (!scheduleThreadToStop) {
                        logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                    }
                } finally {
                    // Commit the transaction, which also releases the schedule lock
                    if (conn != null) {
                        try {
                            conn.commit();
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                        try {
                            conn.setAutoCommit(connAutoCommit);
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                        try {
                            conn.close();
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                    // close PreparedStatement
                    if (null != preparedStatement) {
                        try {
                            preparedStatement.close();
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                }
                long cost = System.currentTimeMillis()-start;
                // If the scan above took less than 1 s, sleep until the next whole second (or about 5 s if
                // nothing was pre-read) before scanning again; if it took 1 s or more, scan again immediately.
                // This is why xxl-job's support for jobs that run every second is somewhat shaky: if a scan
                // keeps taking more than 1 s, such jobs will always lag behind.
                if (cost < 1000) {
                    try {
                        TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                    } catch (InterruptedException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
        }
    });
    scheduleThread.setDaemon(true);
    scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
    scheduleThread.start();
    // 2. Start the time-ring thread
    ringThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!ringThreadToStop) {
                try {
                    // Wake up on every whole second
                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                } catch (InterruptedException e) {
                    if (!ringThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                try {
                    List<Integer> ringItemData = new ArrayList<>();
                    // Current second of the minute (0-59)
                    int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
                    // In case processing took too long and skipped a tick, also check the previous tick
                    for (int i = 0; i < 2; i++) {
                        List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                        if (tmpData != null) {
                            ringItemData.addAll(tmpData);
                        }
                    }
                    logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                    if (ringItemData.size() > 0) {
                        // Trigger the jobs
                        for (int jobId: ringItemData) {
                            JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                        }
                        // Clear the jobs that have been triggered
                        ringItemData.clear();
                    }
                } catch (Exception e) {
                    if (!ringThreadToStop) {
                        logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
        }
    });
    ringThread.setDaemon(true);
    ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
    ringThread.start();
}
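The division of labor between the two threads is easier to see without the database and locking. The following is a hypothetical in-memory model, and all names in it are ours. `classify` reproduces the three branches of the scan loop relative to `PRE_READ_MS = 5000`, and `ringSecond` maps a trigger time to one of 60 slots. `TimeRing` stores job ids per slot. On each tick it drains the current slot plus the previous one, as the ring thread does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the scan-loop decision, not XXL-JOB's code.
final class ScheduleSketch {
    static final long PRE_READ_MS = 5000;

    enum Action { MISFIRE, TRIGGER_NOW, PUSH_TO_RING }

    /** What the scan loop does with a job whose next trigger time is nextMs. */
    static Action classify(long nowMs, long nextMs) {
        if (nowMs > nextMs + PRE_READ_MS) {
            return Action.MISFIRE;     // overdue by more than 5 s: apply the misfire strategy
        } else if (nowMs > nextMs) {
            return Action.TRIGGER_NOW; // overdue by at most 5 s: trigger immediately
        }
        return Action.PUSH_TO_RING;    // due within the next 5 s: wait in the time ring
    }

    /** Slot in the 60-slot ring: the second-of-minute of the trigger time. */
    static int ringSecond(long triggerTimeMs) {
        return (int) ((triggerTimeMs / 1000) % 60);
    }
}

// Hypothetical 60-slot time ring; assumes a single writer (the scan thread), as in the original.
final class TimeRing {
    private final Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    /** Registers jobId to fire when the ring reaches the given slot. */
    void push(int ringSecond, int jobId) {
        ringData.computeIfAbsent(ringSecond, k -> new ArrayList<>()).add(jobId);
    }

    /** Removes and returns the job ids in the current slot and the one before it. */
    List<Integer> tick(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ringData.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }
}
```

Looking back one slot means a tick that runs up to one second late still fires its jobs. A slot missed for longer than that stays in the map until the ring reaches it again, roughly a minute later.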
At this point, the initialization of the scheduling center is complete.