大家好,本文给大家简单介绍一下Elastic-Job 失效转移原理
Elastic-Job 作业高可用的失效转移功能实现原理动画
文 | 宋小生
8 失效转移
8.1 简介
失效转移是作业补偿的另外一个场景,作业如果在执行过程中执行节点崩溃了那本次作业将无法正常执行完成,导致作业执行异常,这个时候就需要我们执行失效转移将崩溃的作业分片转移到其他可以正常执行的机器上面进行作业的补偿执行,失效转移的过程一共分为如下几步:
(1). 发现崩溃作业分片。
(2). 记录崩溃作业分片。
(3). 当前可用机器抢占崩溃作业分片。
(4). 抢占到崩溃分片的实例重新执行崩溃分片。
接下来我们就通过视频来看下整体来看下这几步:(视频可能播放过快可以多看几遍来参考理解)
http://mpvideo.qpic.cn/0bf2biaaaaaah4afi4txarqvacwdaafaaaaa.f10002.mp4?dis_k=caaecf88729f82113d2fa35247cef7fa&dis_t=1671014521&vid=wxv_2107520703603277828&format_id=10002&support_redirect=0&mmversion=false
视频8.1 失效转移动画
8.2 崩溃作业分片
先说哪些作业分片可以称为崩溃的作业分片的,作业在执行之前主节点使用分片算法进行分片,分片后的各作业实例根据分配到的分片项来执行作业,作业分片项有两种状态:未执行或者运行中,对于未执行的分片项所对应的作业实例如果发生宕机时对业务系统不会造成影响,只需要在下次执行作业之前重新进行分片就可以,对于运行中的分片项如果所对应的实例发生了宕机则就有可能导致执行到一半的作业异常结束,这样的话这一次执行当前崩溃的分片就无法完整执行,对于业务系统来说如果作业执行频率比较低又需要在作业执行之后使用作业跑批的数据由于作业实例宕机造成的分片无法完整执行就容易产生事故。针对这种运行中的分片如果发生了实例宕机导致的无法完整执行的分片我们可以称为崩溃分片。
8.3 发现记录崩溃作业分片
那调度作业是如何发现崩溃分片的呢?
这里是通过监听注册在Zookeeper上的实例信息如果发现进程实例宕机则执行一次失效转移,如下:在作业初始化的时候会将当前作业实例信息写入Zookeeper上路径为{作业名字}/instances/{当前实例IP 进程id} ,当发现进程所对应的临时节点被移除则触发失效转移,将当前崩溃的进程所对应的分片转移到其他实例上,然后在可用实例上重新触发一次失效分片的作业执行,不过在ElasticJob中这里仅仅是监听了进程崩溃并没有判断崩溃的进程所对应分片的状态是否为运行中,这样就会导致当进程崩溃的时候非运行中的分片被触发一次。其实也可以监听运行中的分片来重新补偿执行被中断运行的分片,这样的话不过需要优化下代码,可以监听sharding/{分片序号}/running节点。
如果要优化sharding/{分片序号}/running节点先来了解下原理,可以看下前面作业在执行模版,在作业执行之前,如果是幂等作业则会在分片节点下写入一个running的临时节点来标记当前作业分片正在执行,如下所示:
代码语言:javascript复制 public void registerJobBegin(final ShardingContexts shardingContexts) {
JobRegistry.getInstance().setJobRunning(jobName, true);
//判断分片幂等配置是否开启
if (!configService.load(true).isMonitorExecution()) {
return;
}
//写入sharding/{分片序号}/running节点
for (int each : shardingContexts.getShardingItemParameters().keySet()) {
jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), "");
}
}}
可以看到在作业运行之前则会有状态机记录,这里在当前作业的每个分片节点下都写入了running临时节点,完整路径为sharding/{分片序号}/running节点,使用Zookeeper的临时节点,临时节点会记录当前节点所归属的客户端信息并且当所对应客户端session失效或者一直未发生心跳Zookeeper则会删除临时节点,通过临时节点可以有效的发现宕机实例。
代码语言:javascript复制 class JobCrashedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() 1);
if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
return;
}
List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
if (!failoverItems.isEmpty()) {
for (int each : failoverItems) {
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
} else {
for (int each : shardingService.getShardingItems(jobInstanceId)) {
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
}
}
}
}
崩溃作业实例监听器做了如下判断:
- 失效转移配置开启。
- instances下的作业实例信息节点被移除。
- 崩溃的实例id不能为当前实例id(不能让崩溃实例转移到崩溃实例)。
失效的节点有两种:
- 需要转移到某台可用的实例上。
- 已经转移到某台可用的实例上,但是这台可用的实例也崩溃了则两个实例所对应的所有分片都需要转移到其他可用的实例上。
先来看第一种情况:实例节点崩溃需要转移到某台可用的实例上
这个对应上面代码中的else如下代码:
代码语言:javascript复制 for (int each : shardingService.getShardingItems(jobInstanceId)) {
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
整个过程分为3大步:
- 读取崩溃分片:调用getShardingItems方法获取当前崩溃实例被分配的所有分片项,然后遍历这些分片项。
- 记录崩溃分片:调用setCrashedFailoverFlag方法,将失效分片项持久化到leader节点下对应路径如下:{jobName}/leader/failover/items/{崩溃分片项}。
- 触发失效转移逻辑:可用实例使用分布式锁抢占崩溃实例,然后触发一次崩溃分片的执行。
8.4 分布式锁抢占崩溃实例的分片然后重新执行崩溃分片
接下来我们就来详细看下触发失效转移逻辑:可用实例使用分布式锁抢占崩溃实例,然后触发一次崩溃分片的执行。
代码语言:javascript复制 public void failoverIfNecessary() {
if (needFailover()) {
jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
}
}
先判断当前leader节点下存在崩溃节点则执行抢占逻辑,抢占的时候借助Zookeeper的分布式锁来触发失效转移逻辑,对应锁节点路径为:leader/failover/latch,这里使用Zookeeper分布式锁的方式可以参考前面选主的过程如下:
《Elastic-Job2.1.5源码-基于Zookeeper分布式锁实现选举作业主节点原理》
失效转移回调执行的具体逻辑如下:
代码语言:javascript复制class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
@Override
public void execute() {
if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
return;
}
int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
// TODO 不应使用triggerJob, 而是使用executor统一调度
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (null != jobScheduleController) {
jobScheduleController.triggerJob();
}
}
}
可以看到失效转移执行的时候触发了这样的逻辑:
- 读取记录的崩溃分片:获取当前在leader节点下记录的未分片的崩溃分片项,路径为:{jobName}/leader/failover/items/{崩溃分片项}。
- 抢占崩溃分片:在崩溃分片项下写入可执行节点(后面会由这个可执行节点来执行这个崩溃的分片),路径为:sharding/%s/failover。
- 移除记录的崩溃分片:移除leader节点下的未分片的失效转移分片项,路径为:leader/failover/items/%s。
- 重新触发崩溃分片执行,崩溃分片补偿执行。
8.5 执行失效转移
在作业执行模版类型AbstractElasticJobExecutor中与执行失效转移相关的代码如下所示:
代码语言:javascript复制 public final void execute() {
...
//获取作业分片信息,这里如果存在失效转移分片则获取失效转移的分片项
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
...
//执行作业
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
...
//如果需要抢占主节点下的失效转移节点则执行分布式锁抢占逻辑
jobFacade.failoverIfNecessary();
...
作业执行之前会优先获取分片项,这里如果当前实例节点已经抢占过崩溃分片并将自己的实例信息写入到了崩溃分片节点中路径为:sharding/%s/failover下 那当前节点就可以对这个崩溃节点%s进行失效转移
代码语言:javascript复制 @Override
public ShardingContexts getShardingContexts() {
//优先判断是否需要获取失效转移的分片项
boolean isFailover = configService.load(true).isFailover();
if (isFailover) {
List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
if (!failoverShardingItems.isEmpty()) {
return executionContextService.getJobShardingContext(failoverShardingItems);
}
}
...
}
这里是获取当前实例节点抢占到的崩溃节点的逻辑,在这个方法getLocalFailoverItems中会根据当前实例信息从节点sharding/%s/failover的值中查询与当前实例匹配的即为当前节点抢占到的崩溃分片,最后本实例将本实例可以执行的崩溃分片信息封装在分片上下文中,然后在下面的execute方法中来执行这些崩溃分片
- END -