开篇语
上一篇文章介绍了elastic-job-lite的入门,架构。使用和一些流程,里面提到elastic-job-lite是一个去中心化,轻量级的任务调度框架,那为什么elastic-jib-lite在启动时要选取主节点呢?难道我看错了,哈哈,不可能的,后文 elastic-job-lite简称ejl。
leader选举
ejl定位为轻量级,去中心化,其任务调度由各自的机器驱动,各台机器之间通过zk去协调,ejl为每个任务都创建一个JobScheduler,而在JobScheduler的初始化中回为每个job选举一个主节点,记住不是全局一个主节点,而是每个任务一个主节。如下图,每个节点上都运行两个任务job1,job2,那么在启动时每个节点就会创建两个JobScheduler对象,为每一个任务在集群中选举一个leader。这个leader是怎么选举出来的呢?什么时候开始选举?一、在整个集群启动时为每个任务选举leader; 二、当有些任务的leader下线时,会重新选举。
- 集群启动时选举leader
在JobScheduler中
public void init(){
schedulerFacder.registerStartUpInfo();
}
public void registerStartUpInfo(){
leaderService.electLeader();
}
/** * 选举主节点. */
public void electLeader() {
log.debug("Elect a new leader now.");
jobNodeStorage.executeInLeader(LeaderNode.LATCH,
new LeaderElectionExecutionCallBack());
log.debug("Leader election completed.");
}
public void executeInLeader(final String latchNode,
final LeaderExecutionCallback callback) {
//通过LeaderLatch进行选举
//这是curator(zk的客户端)中类
LeaderLatch latch =
new LeaderLatch(getClient()jobNodePath.getFullPath(latchNode));
//开始选举
latch.start();
//阻塞,直到选举成功
latch.await();
//在回调方法中写入主节点标记
callback.execute();
}
代码语言:javascript复制在callback.execute()中执行如下,再次判断没有主节点,将当前机器示例id写入
if (!hasLeader()) {
jobNodeStorage.fillEphemeralJobNode(
LeaderNode.INSTANCE,instanceId));
}
- leader重新选举,主要是通过
LeaderElectionJobListener
这个监听器来实现leader重新选举,当一个job还在运行,但leader节点下线了,就要重新选举leader
class LeaderElectionJobListener extends AbstractJobListener{
protected void dataChanged(){
//具体选举看上面的代码
leaderService.electLeader()
}
}
主节点的选举的本质就是大伙竞争一个zk的分布式锁。谁先得到锁,谁就是主节点。
何时使用leader?有什么作用?
分布式系统中,在一个任务执行过程中,有多个机器,多个分片,那么如何去分配呢?哪些机器执行哪些分片呢,如果大家都参与岂不是乱了,这个时候就需要一个领导者来拍板。在ejl中有两处需要leader节点来参与:
- 机器启动后,任务开始第一次执行时,需要leader来分片
- 当集群中有新的节点增加时,分片的数量有变化时或者有一些节点下线时都会触发重新分片 主要代码如下,大家阅读源码时可从
AbstractElasticJobExecute
类中execute
方法开始看起。
AbstractElasticJobExecute类中
public final void execute(){
//获取分片,这个方法中主节点leader会分片
ShardingContexts shardingContexts =
jobFacade.getShardingContexts();
}
在 getShardingContexts()中,有如下方法
shardingService.shardingIfNecessary();
/**
如果需要分片且当前节点为主节点, 则作业分片.
如果当前无可用节点则不分片.
*/
public void shardingIfNecessary() {
//不是主节点直接返回,不允许分片
if (!leaderService.isLeaderUntilBlock()) {
blockUntilShardingCompleted();
return;
}
....
}
leader节点删除的时机
leader节点删除的时机有三处,一,在leader节点所在机器进程CRASHED时,jvm通过钩子方法删除自己;二,作业被禁用时删除leader节点,三,主节点进程远程关闭
- leader机器进程关闭
JobShutdownHookPlugin类中
public void shutdown() {
if (leaderService.isLeader()) {
leaderService.removeLeader();
}
}
- 作业被禁用时
LeaderAbdicationJobListener类中
protected void dataChanged(
//判断是leader,并且作业被禁用
if (leaderService.isLeader()
&& isLocalServerDisabled(path, data)) {
leaderService.removeLeader();
}
}
- 作业终止调度时
InstanceShutdownStatusJobListener类中
protected void dataChanged(
//当job未暂停,
//并且调度控制器未暂停,
//并且事件是移除这个实例,
//并且运行实例未被移除
if (!JobRegistry.getInstance().isShutdown(jobName)
&&
!JobRegistry.getInstance()
.getJobScheduleController(jobName).isPaused()
&&
isRemoveInstance(path, eventType)
&&
!isReconnectedRegistryCenter()) {
//在这个方法中removeLeader
schedulerFacade.shutdownInstance();
}
}
>
EJL的leader在zk中的数据结构
代码在
LeaderNode
类中
- leader在zk中的根路径
String ROOT = "leader";
- 这是leader进行选举的父路径 /leader/election
String ELECTION_ROOT = ROOT "/election";
- 保存主节点的地址 /leader/election/instance 这是一个临时节点,leader所在的机器下线后,这个路径就会消失,对于重新选举有作用
String INSTANCE = ELECTION_ROOT "/instance";
- leader选举的分布式锁 /leader/election/latch
String LATCH = ELECTION_ROOT "/latch";