分布式调度Elastic-Job攻略

2019-08-20 16:06:45 浏览数 (1)

昨天虽然试用了一下唯品会的“土星”,但是我实在没想明白他的Job该怎么用Spring来托管,所以没有使用。今天来说一下当当的Elastic-Job.

安装管理平台

先说一下Elastic-Job的管理平台跟Java的Job开发没有半毛钱关系,他只是把注册进Zookeeper的信息读取出来,进行控制。

先下载Elastic-Job的源代码git clone https://github.com/elasticjob/elastic-job-lite.git,编译之后找到elastic-job-lite-console-3.0.0.M1-SNAPSHOT.tar.gz上传到服务器,解压之后进入/bin目录。

先不要急着执行start.sh,因为这个文件里面有Windows字符,linux无法识别。

所以先执行 sed -i 's/r$//' start.sh 改掉Windows字符。

然后执行nohup ./start.sh &

在浏览器打开服务器的8899端口

先配置好zookeeper的注册中心地址,命名空间,基本上这个就装好了。

编写Java Job

pom

代码语言:javascript复制
<dependency>
   <groupId>com.github.kuhn-he</groupId>
   <artifactId>elastic-job-lite-spring-boot-starter</artifactId>
   <version>2.1.5</version>
   <exclusions>
      <exclusion>
         <artifactId>curator-client</artifactId>
         <groupId>org.apache.curator</groupId>
      </exclusion>
      <exclusion>
         <artifactId>curator-framework</artifactId>
         <groupId>org.apache.curator</groupId>
      </exclusion>
      <exclusion>
         <artifactId>curator-recipes</artifactId>
         <groupId>org.apache.curator</groupId>
      </exclusion>
   </exclusions>
</dependency>
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-framework</artifactId>
   <version>2.12.0</version>
</dependency>
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-recipes</artifactId>
   <version>2.12.0</version>
</dependency>
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-client</artifactId>
   <version>2.12.0</version>
</dependency>

资源文件添加

代码语言:javascript复制
elaticjob:
  zookeeper:
    server-lists: 192.168.5.129:2188
    namespace: elastic-job-lite-springboot

这里要跟你管理平台的保持一致

配置文件

代码语言:javascript复制
@Configuration
@ConditionalOnExpression("'${elatic.zookeeper.server-lists}'.length() >0")
public class ElasticConfig {
    /**
     * 初始化配置
     * @param serverList
     * @param namespace
     * @return
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${elaticjob.zookeeper.server-lists}") String serverList
            , @Value("${elaticjob.zookeeper.namespace}") String namespace) {

        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }

    /**
     * 设置活动监听,前提是已经设置好了监听,见下一个目录
     * @return
     */
    @Bean
    public ElasticJobListener elasticJobListener() {
        return new ElasticJobListener(100, 100);
    }
}

监听器

代码语言:javascript复制
@Component
public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {
    /**
     * 设置间隔时间
     * @param startedTimeoutMilliseconds
     * @param completedTimeoutMilliseconds
     */
    public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
        super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
    }

    /**
     * 任务开始
     * @param shardingContexts
     */
    @Override
    public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
        System.out.println("任务开始");
    }

    /**
     * 任务结束
     * @param shardingContexts
     */
    @Override
    public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
        System.err.println("任务结束");
    }

}

Job

代码语言:javascript复制
@Component
@ElasticSimpleJob(cron="0/5 * * * * ?",jobName="jobTask",shardingTotalCount=2,jobParameter="测试参数",shardingItemParameters="0=Chengdu0,1=Chengdu1")
public class StockSimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        switch (shardingContext.getShardingItem()) {
            case 0:
                System.out.println(String.format("------Thread ID: %s, 任务总片数: %s, "  
                                "当前分片项: %s.当前参数: %s,"  
                                "当前任务名称: %s.当前任务参数: %s"
                        ,
                        Thread.currentThread().getId(),
                        shardingContext.getShardingTotalCount(),
                        shardingContext.getShardingItem(),
                        shardingContext.getShardingParameter(),
                        shardingContext.getJobName(),
                        shardingContext.getJobParameter()

                        )
                );
                break;
            case 1:
                System.out.println("啦啦啦");
                break;
            default:
                break;
        }
    }
}

启动项目可以看到

2018-11-07 19:29:08.900 INFO [schedule-center,,,] 21244 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8011 (http) with context path '/api-sc' 2018-11-07 19:29:08.901 INFO [schedule-center,,,] 21244 --- [ main] .s.c.n.e.s.EurekaAutoServiceRegistration : Updating port to 8011 2018-11-07 19:29:08.903 INFO [schedule-center,,,] 21244 --- [ main] c.c.schedule.ScheduleCenterApplication : Started ScheduleCenterApplication in 16.618 seconds (JVM running for 17.331) 啦啦啦 ------Thread ID: 96, 任务总片数: 2, 当前分片项: 0.当前参数: Chengdu0,当前任务名称: com.cloud.schedule.jobs.StockSimpleJob.当前任务参数: ------Thread ID: 100, 任务总片数: 2, 当前分片项: 0.当前参数: Chengdu0,当前任务名称: com.cloud.schedule.jobs.StockSimpleJob.当前任务参数: 啦啦啦 ------Thread ID: 102, 任务总片数: 2, 当前分片项: 0.当前参数: Chengdu0,当前任务名称: com.cloud.schedule.jobs.StockSimpleJob.当前任务参数: 啦啦啦

查看平台的作业

我们可以在这里面修改他的配置

经启动多个实例,我们可以看到分片会被多个实例均摊,相同的分片只会在一个进程内执行,多个Job也是一样,不会重复执行。退出一个进程,单个进程就会执行全部分片,实现了高可用。

动态添加Job

代码语言:javascript复制
//@Component
//@ElasticSimpleJob(cron="0/5 * * * * ?",jobName="testTask",shardingTotalCount=2,jobParameter="测试参数",shardingItemParameters="0=A,1=B")
public class TestJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("TestJob");
    }
}

我们只写了一个Job,不进行配置

我们可以在Restful中进行动态添加

代码语言:javascript复制
@RestController
public class TestController {

    @Autowired
    private ZookeeperRegistryCenter zookeeperRegistryCenter;


    /**
     * 动态添加任务逻辑
     */
    @GetMapping("/test")
    public void test(@RequestParam("cron") String cron) {
        int shardingTotalCount = 2;
        String jobName = UUID.randomUUID().toString()   "-test123";

        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
                .newBuilder(jobName, cron, shardingTotalCount)
                .shardingItemParameters("0=A,1=B")
                .build();

        SimpleJobConfiguration simpleJobConfiguration =
                new SimpleJobConfiguration(jobCoreConfiguration, TestJob.class.getCanonicalName());
        JobScheduler jobScheduler = new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build());



        try {
            jobScheduler.init();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("定时任务创建失败");
        }
    }
}

通过@RequestParam("cron") String cron参数,我们可以动态给该Job添加可变的时间配置。

在zookeeper中我们可以看到他的注册信息

WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2188(CONNECTED) 0] ls / [zookeeper, elastic-job-lite-springboot, spark, hadoop-ha, dubbo, zoo, pushservers, guanjian] [zk: localhost:2188(CONNECTED) 1] ls /elastic-job-lite-springboot [com.guanjian.job.TestJob, e5d89fde-a665-46e2-8cac-7560a48812c9-test123, com.cloud.schedule.jobs.MyJob, com.cloud.schedule.jobs.StockSimpleJob, com.guanjian.job.StockSimpleJob] [zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.

com.guanjian.job.TestJob com.cloud.schedule.jobs.MyJob com.cloud.schedule.jobs.StockSimpleJob com.guanjian.job.StockSimpleJob [zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.cloud.schedule.jobs.

com.cloud.schedule.jobs.MyJob com.cloud.schedule.jobs.StockSimpleJob [zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.cloud.schedule.jobs.StockSimpleJob [leader, servers, config, instances, sharding] [zk: localhost:2188(CONNECTED) 3]

0 人点赞