一、前言
在前面我们曾经说过,如果将常见的任务调度中间件分为 「中心化」 和 「去中心化」 两个流派的话,那么 xxl-job
可以说是中心化的典型代表。而 xxl-job
中心化最重要的一个组成部分就是我们下面要介绍的 「调度中心」 。
在最新版的 xxl-job
架构图中,我们可以看到调度中心提供了诸如任务管理、执行器管理、日志管理、任务调度/路由、失败告警等等功能,具体可以参考下面的架构图:
可以看到调度中心提供的功能还是相当多的。不过马克主义哲学的矛盾分析法告诉我们,看问题要抓主要矛盾的主要方面。抛开日志管理、失败报警等运维相关的功能,调度中心最核心的功能应该是如下三个:
- 服务注册发现(包括执行器注册发现、任务注册发现)
- 任务调度/路由机制
- 失败重试机制
下面我们通过调度中心启动过程来一窥上面三个功能是如何运作的。
二、初探调度中心
前段时间火了一个名词 「“时间管理者”」 。仔细想想,管理着众多定时任务的调度中心何尝不也是一个优秀的时间管理者吗?下面让我们来欣赏一下这位时间管理者的时间管理秘籍:
1. 任务注册发现
在 JobRegistryMonitorHelper.getInstance().start();
方法中开启了一个任务注册发现监控器的保护线程。在这个保护线程中,调度中心会进行任务组的发现注册,这一过程主要是检查并去除已经离线的执行器实例(一个应用会有多个实例),重新刷新执行器组在线实例地址以及更新任务组的注册地址。核心代码逻辑如下:
// auto registry group
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {
// remove dead address (admin/executor)
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// 刷新执行器在线地址 (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
if (registryList == null) {
registryList = new ArrayList<String>();
}
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
appAddressMap.put(appname, registryList);
}
}
}
// fresh group address
for (XxlJobGroup group: groupList) {
List<String> registryList = appAddressMap.get(group.getAppname());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
Collections.sort(registryList);
addressListStr = "";
for (String item:registryList) {
addressListStr = item ",";
}
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
group.setAddressList(addressListStr);
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
这里调度中心是通过检查 xxl_job_registry
中是否存在90秒钟没有更新的任务注册信息(即检查90秒内执行器是否发送心跳),如果没有进行更新,则认为该执行器实例已下线。然后重新更新执行器在线实例地址以及任务组的注册地址(该注册地址用于调度中心进行任务触发使用)。
2. 任务失败丢失处理
在 JobFailMonitorHelper.getInstance().start();
和 JobLosedMonitorHelper.getInstance().start();
两个方法中主要是对 「执行失败」 的任务和 「丢失」 的任务(执行器实例在执行过程中被重启或者下线,导致任务执行线程被终止,最终任务丢失)进行相应的处理。
首先,来看下 JobFailMonitorHelper
。其实,xxl-job
对于执行失败的任务并没有类似 dubbo
提供的多种failover机制,对于失败的任务其实就是根据用户在调度中心设置的任务失败重试次数进行简单的失败重试,如果重试次数达到设置的重试次数则放弃重试。
而对于由于执行器示例在执行任务过程中被重启或者下线导致的任务丢失长时间没有将任务结果回调调度中心的情况,在 JobLosedMonitorHelper
做出了如下处理:
// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
if (losedJobIds!=null && losedJobIds.size()>0) {
for (Long logId: losedJobIds) {
XxlJobLog jobLog = new XxlJobLog();
jobLog.setId(logId);
jobLog.setHandleTime(new Date());
jobLog.setHandleCode(ReturnT.FAIL_CODE);
jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(jobLog);
}
}
调度中心会进行调度任务是否停留 「“运行状态”」 超过10分钟以及对应执行器实例是否已经离线的判断。如果符合判断条件,则会主动将任务置为失败,然后按照任务执行失败处理。
3. 任务调度
作为任务调度中间件最重要的一环,调度中心在 JobScheduleHelper.getInstance().start();
方法中进行了任务的调度逻辑处理。
在 JobScheduleHelper.getInstance().start();
方法中开启了一个任务调度的守护线程,在这个线程中每五秒会执行一次任务调度。针对调度中心使用HA模式时,xxl-job
使用了争抢数据库锁的方式来让每个调度中心实例争抢任务调度权(这是为了避免重复触发任务)。具体代码如下:
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
在进行任务调度时,调度中心采取的是预处理方式。即预先读取指定数量的任务进行调度。这里预先读取的时间范围为 「任务下次执行时间下小于当前时间加5秒」 ,数量为 「线程池大小*触发器qps(每个触发器花费50ms,qps = 1000/50 = 20)」 。但是实际计算预读取数量的时候使用的是快触发器线程大小 慢触发器线程大小。具体源码如下:
代码语言:javascript复制// 预读取任务数量
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
// 预读取任务调度列表,这里PRE_READ_MS=5000ms
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime PRE_READ_MS, preReadCount);
对于任务调度列表中下次执行时间调度中心又作出了如下处理:
当前时间 > (下次触发时间 5s)
: 直接丢弃任务调度。当前时间 > 下次触发时间
: 直接触发任务,并重新计算任务下次触发时间,如果在预读取范围内,则放置到ringData
内。当前时间 <= 下次触发时间
: 将任务插入到ringData
等待任务执行。(该逻辑为主要任务调度逻辑分支,以上两种为特殊情况)。
三、总结
上面就是对 xxl-job
调度中心的三个核心功能的简单分析(细心的同学一定发现 JobTriggerPoolHelper
还没有讲,这部分由于较为重要,闲鱼准备单开一章分析 )。除了这三个较为核心的功能,xxl-job
的调度中心还提供了诸如任务失败报警、日志管理、报表大盘等运维功能,这些优秀的运维功能也是闲鱼为什么想要使用 xxl-job
的重要原因,后续闲鱼会挑选其中比较重要的功能进行介绍。
以上纯属个人浅见,如有谬误,请各位看官大佬多多指正。