一文搞懂Elastic-Job(内附源码解析)

2019-06-26 17:16:11 浏览数 (1)

前言

Elastic-Job是当当基于Zookepper,Quartz开发并且开源的Java分布式定时任务,解决Quartz不支持分布式的弊端。它由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。

题外话,当当的Sharding-JDBC已经加入Apache管理了,而这个Elastic-Job没有。

还有一句题外话,私下和好友交流,当初要使用elastic 后来也被换了,具体原因也没说,咱也不知道,咱也不敢问。我贴个图大家应该会明白,这是Elastic-Job 在github的地址:https://github.com/elasticjob/elastic-job-lite

来个图更能说明问题。我什么也没说啊。。。。

采用的公司还是蛮多的,像 36氪、国美、唯品会……链接:http://elasticjob.io/docs/elastic-job-lite/00-overview/company/

网上也是对Elastic-Job众多好评,目前国内使用定时作业中间件也就xxl-job能跟他竞争。

再不更新代码都落灰了……

正文

正文我就不扯没用的了,好多人也是冲着标题写着 ‘源码解析’ 来的吧,好,请看下面就是我们老艿艿总结的源码解析专栏

我也是老艿艿的不锈钢粉丝哦!!!

微信公众号Elastic-Job源码解析地址:https://mp.weixin.qq.com/s/m1VRIzeFfa_6Ly_gEDNK-w

老艿艿微信公众号:芋道源码

一定有你想看的

不是有意打广告,而是老艿艿文章确实不错,也从不搞一些吸粉的商业活动,不做盗版。文章怎么样,看一下你就懂了。

友情提醒:看源码前要会使用并理解简单的原理,这样吸收比较好,上来就看源码直接就从入门到放弃。

Elastic-Job作为分布式作业中间件有一个重要的概念就是-分片。

分片概念: 这里的分片是指将一个任务拆分成多个任务执行,有点类似 java里的Fork-Join 框架思想。

举个例子:我现在要对一些数据进行处理,首先把数据筛选出来,为了快速的执行作业,我们用2台服务器,想让每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。

作业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为ID,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直结果就是服务器A遍历ID以0-4结尾的数据,服务器B遍历ID以5-9结尾的数据。

如何使用Elastic-Job

Elastic-Job 提供了3种作业类型

  1. Simple类型作业
  2. DataFlow类型作业
  3. Script类型作业

Elastic-Job 提供了2种配置方式

  1. JavaCode配置
  2. Spring命名空间配置

这里我只用代码讲解 用JavaCode配置方式实现Simple作业类型。

首先提供一下官网详细的配置手册:http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/

准备工作

本机安装一个 Zookepper ,不搞集群,下载,安装 ,启动 so easy,我的版本是 3.4.14

引入Maven依赖

代码语言:javascript复制
代码语言:javascript复制
<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
        <relativePath/>
    </parent>
    <name>springboot-elastic-job</name>
    <description>基于 Spring Boot 2.1.5 使用elastic-job</description>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
代码语言:javascript复制

Simple作业

代码语言:javascript复制
public class JavaSimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date())
                  " 分片项 : "   shardingContext.getShardingItem()
                  " 总片数 : "   shardingContext.getShardingTotalCount());

        switch (shardingContext.getShardingItem()) {
            case 0:
                System.out.println("do something by sharding item 0");
                break;
            case 1:
                System.out.println("do something by sharding item 1");
                break;
            case 2:
                System.out.println("do something by sharding item 2");
                break;
            // case n: ...
            //动态查询该分片下要执行的用户
            //SELECT * FROM lfp_user WHERE mod(id,#{shardingTotalCount})=#{shardingItem};
        }

    }
}

启动类和配置作业

代码语言:javascript复制
public class Application {

    public static void main(String[] args) throws UnknownHostException {
        System.out.println("Start...");
        System.out.println(InetAddress.getLocalHost());
        new JobScheduler(createRegistryCenter(), createSimpleJobConfiguration()).init();

    }


    private static CoordinatorRegistryCenter createRegistryCenter() {
        //ZookeeperConfiguration构造方法两个参数,serverLists(连接Zookeeper服务器的列表,包括IP地址和端口号,,多个地址用逗号分隔)和namespace(命名空间)
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(
                new ZookeeperConfiguration("127.0.0.1:2181", "new-elastic-job-demo"));
        regCenter.init();
        return regCenter;
    }

    private static LiteJobConfiguration createSimpleJobConfiguration() {
        //创建简单作业配置构建器,三个参数为:jobName(作业名称),cron(作业启动时间的cron表达式),shardingTotalCount(作业分片总数)
        // 定义作业核心配置
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("SimpleJobDemo", "0/15 * * * * ?", 2).build();
        // 定义SIMPLE类型配置
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, JavaSimpleJob.class.getCanonicalName());
        // 定义Lite作业根配置
        JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();

        return (LiteJobConfiguration) simpleJobRootConfig;

    }

    //配置DataflowJob
    private static void setUpDataflowJob(final CoordinatorRegistryCenter registryCenter) {
        JobCoreConfiguration coreConfiguration = JobCoreConfiguration.newBuilder("JavaDataflowJob", "0/10 * * * * ?", 2).build();
        //数据流作业配置,第三个参数为streamingProcess(是否为流式处理)
        DataflowJobConfiguration dataflowJobConfiguration = new DataflowJobConfiguration(coreConfiguration, JavaDataflowJob.class.getCanonicalName(), true);
        new JobScheduler(registryCenter, LiteJobConfiguration.newBuilder(dataflowJobConfiguration).overwrite(true).build()).init();


    }
}

启动测试

我截取了一段控制台日志

github 地址:https://github.com/362460453/springboot-elastic-job

现在搞什么配置如果非必须情况下最好不要用xml形式,好古老,真的,如果有想学Spring命名空间配置 请看下面的链接:

https://www.cnblogs.com/yushangzuiyue/p/9655847.html

现实场景怎么用?

我们假设一个场景是处理一批数据,这批数据,一台机器的话分多个片执行相当于多线程执行,实际情况我们都是多台服务器执行的,使用2台机器,那么就把压力平均分摊到两台服务器上去了,而且也能更快执行完成。 如果,以前只要两台机器在1个小时就能跑完的,现在5个小时也跑不完,怎么办呢?加机器?加机器肯定是必须的,但是我们发现代码里写死了,分2片,我们总不能去改成3、4、5,万一以后还有更多呢,所以我们可以对sql进一步优化。我们把分片数和当前分片项传到sql,这样sql可以动态去查询对应分片后的用户了

代码语言:javascript复制
public class TestJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        int shardingTotalCount = shardingContext.getShardingTotalCount();
        int shardingItem = shardingContext.getShardingItem();
        //1 查询改分片下要执行的用户,带参数shardingTotalCount和shardingItem 
        //筛选出来的数据
        //2 处理筛选的数据
        
    }
}

sql改写
//动态查询该分片下要执行的用户
SELECT * FROM lfp_user WHERE mod(id,#{shardingTotalCount})=#{shardingItem};
代码语言:javascript复制

当shardingTotalCount 有5片,并且5台机器 机器0:查询用户id能整除5的,如:5、10、15、20…… 机器1:查询用户id除以5余1的,如:1、6、11、16…… 机器2:查询用户id除以5余2的,如:2、7、12、17…… 机器3:查询用户id除以5于余3的,如:3、8、13、18…… 机器4:查询用户id除以5余4的,如:4、9、14、19……

注意事项

这里要说一个我碰到的问题,LiteJobConfiguration里有一个属性是 boolean overwrite; 默认为false,如果为false的话,第一次启动的时候,会在zookeeper中保存了一份作业信息(调度时间、参数等),后面即使修改了作业信息,无论重新启动服务或者zookeeper,还是会使用第一次启动时候的作业信息(根据作业名字)。 因此需要设为true,这样每次启动,作业信息都会覆盖zookeeper中的保存的配置信息,这样可以保证修改了配置信息可以马上使用。

代码启动中,实现类的execute方法中不能使用spring注入的对象:elastic-job是封装的quartz框架,这个特性也存留下来,execute方法中只能用static对象。

如有其他问题请查看:

https://blog.csdn.net/name_z/article/details/81274029

https://blog.csdn.net/tanga842428/article/details/52398982

0 人点赞