elastic-job-lite使用的一点心得和坑

2019-08-06 10:11:05 浏览数 (1)

开篇词

elastic-job-lite在项目中使用也有两个多月的时间了,从一开始搜索网上教程,参考别人使用方法,到后面阅读源码,理解其架构,实现。也写了几篇关于ejl的架构,流程处理,数据结构的文章,中间也经历了很多坑,也有了一些最佳实践,这篇文章写一下总结,但不是ejl的结尾,后续还会有ejl的文章,下面附上前几篇的链接:

  1. elastic-job-lite入门以及架构原理分析
  2. elastic-job-lite 既然去中心化,为何要选举主节点
  3. elastic-job-lite 数据结构分析
使用心得

ejl中有三种job作业类型,simple, dataflow, script,这三种任务类型都支持cron表达式定时调用,也支持页面单次触发。

  • simple是简单类型,提供接口如下,入参为当前分片的上下文,接口如下,这种简单类型接口适合处理一些业务逻辑不是很复杂的情况,且在短时间内能完成,ejl中还好,是本地调用,如果夸服务调用,那么时间一长,是很容易超时的,调度中心会认为处理失败,总的来说适合一些高频,低时的任务,且没有相对复杂的数据分离处理。
代码语言:javascript复制
void execute(ShardingContext var1);
  • dataflow是一种流失数据处理,fetchData入参为当前分片的上下文,processData入参为需要处理的数据,接口如下,这种流式处理接口一般适合大数据量计算,fetchData负责需要处理的数据,processData负责对抓取的数据计算生成结果存储,也适合一些大量任务的执行,将数据的准备和数据的处理进行了分离,从而在代码组织上比较清晰,这种类型的作业一般也会耗时比较长少则几分钟,多则几小时,但一般低频,一天一次,其任务触发后在本地机器while(list.size>0)执行,直到fetchData返回的数据集合大小为0,此处一般建议processData中可以开起多线程处理数据,加快执行处理时间。如果你需要处理的数据是断断续续的生成,那么这种是不适合的,因为fetchData的时候,可能你的数据还在生成中,导致任务已经完毕,这是可以考虑第一种。
代码语言:javascript复制
List<T> fetchData(ShardingContext var1);
void processData(ShardingContext var1, List<T> var2);
  • scritp是一种脚本类型,其接口为一个接口,它适合一些脚本作业触发,也可以说是实现语言不是Java的一些脚本任务,比如shell写了一段业务逻辑,python写个一个汇总逻辑,需要每隔一小时执行一次,这是script就派上用场了,通过命令进行触发,在script的配置中有一个scriptCommandLine参数,这个参数就是配置执行这个脚本的命令,执行器会把当前上下文信息转化为json参数拼在命令行后面,ejl会定时通过命令触发你写的脚本,script脚本执行器如下:
代码语言:javascript复制
public final class ScriptJobExecutor extends AbstractElasticJobExecutor{

protected void process(final ShardingContext shardingContext) {
    //scriptCommandLine从上下文中获取到,声明为final
    final String scriptCommandLine = shardingContext.get...
    this.executeScript(shardingContext, scriptCommandLine);
}

private void executeScript(final ShardingContext shardingContext, final String scriptCommandLine){

    CommandLine commandLine = CommandLine.parse(scriptCommandLine);
    commandLine.addArgument("上下文json串" false);
    try {
            //org.apache.commons.exec apache的包
            new DefaultExecutor().execute(commandLine);
    } catch (final IOException ex) {
            throw new JobConfigurationException("Execute script failure.", ex);
    }}
}

上面介绍了三种作业类型的使用方法和场景,不管是以上那种作业类型,ejl都支持为其添加监听器,能够实现在作业开始执行前和执行完成后做一些准备和清理工作,并且是在分布式环境下进行,也就是保证所有节点还未开始执行时,就执行监听器中的befor方法,在所有节点都执行完成后,在执行after方法,这是非常有帮助的,通常我们都会用到,比如我们在任务开始前需要准备一些基础数据,在结束需要告知下游系统完成,其接口如下:

代码语言:javascript复制
public interface ElasticJobListener {   
    void beforeJobExecuted(ShardingContexts var1);   
    void afterJobExecuted(ShardingContexts var1);
}

ejl是不支持作业连贯的,比如,我们的job依赖上游系统作业的完成,怎么做呢?

  1. 第一种方式在before中我们轮询监听上游作业状态,就不准备数据,这样fetchData就抓不到数据,作业就不执行,事实上我觉监听器中两个方法应该有个bool返回值更好
  2. 我们可以利用web提供的单词触发的api调用,我们不配置cron表达式,当上游系统作业完成时可以通过http请求调用我们的api启动我们的job
坑和解决方案

这里记录一下使用过程中踩过的坑

  • 第一坑,在spring boot中假设我们开启两个任务,配置如下,这一看没什么问题,而且一般情况下程序员都习惯cv大法,然后改一改,但总会有一些忽略的地方,下面的配置两个job都有共同的jobConfig方法,这就是copy的结果,类名改了,Bean的Name改了,job名称也改了,就是方法名没改,并且编译阶段和启动都不会报错,但导致的问题就是只有一个job作业会注册成功,当时我们排查了好久才发现这个问题,因为以前也确实没遇到不同类中方法重名导致的问题,这是因为spting 会把方法名,返回值作为构造一个对象的key,此处方法返回类型和方法名一致,导致此对象只会被创建一次。
代码语言:javascript复制
public class JobConfig1 {       
    @Resource   
    private ZookeeperRegistryCenter regCenter;   
    @Bean(name = "job1")   
    public DataflowJob jobConifg() {       
        return new Job1()();   
    }
}
public class JobConfig2 {       
    @Resource   
    private ZookeeperRegistryCenter regCenter;   
    @Bean(name = "job2")   
    public DataflowJob jobConifg() {       
        return new  Job1();   
    }
}
  • 第二坑,就是上面提到的监听器,问题是分布式环境下会出现多节点都调用befor和after的问题,其实也就是并发的问题,我们分析如下代码,通过分布式一致性协调服务判断是否所有节点开始,如果是,则执行before方法,如果不是,则锁住进入睡眠状态,最终当其中某一个节点执行完before方法清楚标记,唤醒睡眠的节点,问题出在哪里了?我们看isAllStarted()方法,就是判断节点总数是不是和zk上写入的节点数一样,也就是说假设1,2,3三个节点此时同一时刻都进入isAllStarted()方法,他们都往zk上写入了,都是有可能判断成功的,则会都进入before方法。
  1. 正确的做法是我们应该在doBefore...方法加一个分布式锁,即便isAllStarted都判断成功了,也有分布式锁保障,只有获得锁的方法才能执行doBefore...方法。
  2. 网上也有一种做法,就是在i'sAllStated()再加一个条件,只有指定的分片才能执行doBefore...方法,但这有一种问题,就是这个分片被禁用后,就会导致任务无法执行,因为所有分片都在睡眠中,都无法被唤醒。
代码语言:javascript复制
if (guaranteeService.isAllStarted()) {
            doBeforeJobExecutedAtLastStarted(shardingContexts);
            guaranteeService.clearAllStartedInfo();
            return;
}
long before = timeService.getCurrentMillis();
try {
     synchronized (startedWait) {
          startedWait.wait(startedTimeoutMilliseconds);
     }
} catch (final InterruptedException ex) {
    Thread.interrupted();
}

public boolean isAllStarted() {   
     return this.jobNodeStorage.isJobNodeExisted("guarantee/started") 
     && 
     this.configService.load(false).getTypeConfig()
        .getCoreConfig().getShardingTotalCount() == 
     this.jobNodeStorage.getJobNodeChildrenKeys("guarantee/started").size();   
}

第一种解决方案代码:

代码语言:javascript复制
leaderLatch = new LeaderLatch(client, path, id);
   LeaderLatchListener leaderLatchListener = new LeaderLatchListener() {
        @Override
         public void isLeader() {
            doBefore....
            clearAllStartedInfo
            //释放锁
         }
        @Override
        public void notLeader() {
            //休眠,等待唤醒
        }
   };
leaderLatch.addListener(leaderLatchListener);
leaderLatch.start();

第二种解决方案代码:

代码语言:javascript复制
while(shardingContexts.getShardingItemParameters().containsKey(0)) {
    if (guaranteeService.isAllStarted()) {
            doBeforeJobExecutedAtLastStarted(shardingContexts);
            guaranteeService.clearAllStartedInfo();
            return;
    }
    Thread.sleep(10)
}
//否则直接休眠
try {
     synchronized (startedWait) {
          startedWait.wait(startedTimeoutMilliseconds);
     }
} catch (final InterruptedException ex) {
    Thread.interrupted();
}

0 人点赞