昨天虽然试用了一下唯品会的“土星”,但是我实在没想明白他的Job该怎么用Spring来托管,所以没有使用。今天来说一下当当的Elastic-Job.
安装管理平台
先说一下Elastic-Job的管理平台跟Java的Job开发没有半毛钱关系,他只是把注册进Zookeeper的信息读取出来,进行控制。
先下载Elastic-Job的源代码git clone https://github.com/elasticjob/elastic-job-lite.git,编译之后找到elastic-job-lite-console-3.0.0.M1-SNAPSHOT.tar.gz上传到服务器,解压之后进入/bin目录。
先不要急着执行start.sh,因为这个文件里面有Windows字符,linux无法识别。
所以先执行 sed -i 's/r$//' start.sh 改掉Windows字符。
然后执行nohup ./start.sh &
在浏览器打开服务器的8899端口
先配置好zookeeper的注册中心地址,命名空间,基本上这个就装好了。
编写Java Job
pom
代码语言:javascript复制<dependency>
<groupId>com.github.kuhn-he</groupId>
<artifactId>elastic-job-lite-spring-boot-starter</artifactId>
<version>2.1.5</version>
<exclusions>
<exclusion>
<artifactId>curator-client</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<artifactId>curator-framework</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<artifactId>curator-recipes</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.12.0</version>
</dependency>
资源文件添加
代码语言:javascript复制elaticjob:
zookeeper:
server-lists: 192.168.5.129:2188
namespace: elastic-job-lite-springboot
这里要跟你管理平台的保持一致
配置文件
代码语言:javascript复制@Configuration
@ConditionalOnExpression("'${elatic.zookeeper.server-lists}'.length() >0")
public class ElasticConfig {
/**
* 初始化配置
* @param serverList
* @param namespace
* @return
*/
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(@Value("${elaticjob.zookeeper.server-lists}") String serverList
, @Value("${elaticjob.zookeeper.namespace}") String namespace) {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
}
/**
* 设置活动监听,前提是已经设置好了监听,见下一个目录
* @return
*/
@Bean
public ElasticJobListener elasticJobListener() {
return new ElasticJobListener(100, 100);
}
}
监听器
代码语言:javascript复制@Component
public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {
/**
* 设置间隔时间
* @param startedTimeoutMilliseconds
* @param completedTimeoutMilliseconds
*/
public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
}
/**
* 任务开始
* @param shardingContexts
*/
@Override
public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
System.out.println("任务开始");
}
/**
* 任务结束
* @param shardingContexts
*/
@Override
public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
System.err.println("任务结束");
}
}
Job
代码语言:javascript复制@Component
@ElasticSimpleJob(cron="0/5 * * * * ?",jobName="jobTask",shardingTotalCount=2,jobParameter="测试参数",shardingItemParameters="0=Chengdu0,1=Chengdu1")
public class StockSimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
switch (shardingContext.getShardingItem()) {
case 0:
System.out.println(String.format("------Thread ID: %s, 任务总片数: %s, "
"当前分片项: %s.当前参数: %s,"
"当前任务名称: %s.当前任务参数: %s"
,
Thread.currentThread().getId(),
shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobName(),
shardingContext.getJobParameter()
)
);
break;
case 1:
System.out.println("啦啦啦");
break;
default:
break;
}
}
}
启动项目可以看到
2018-11-07 19:29:08.900 INFO [schedule-center,,,] 21244 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8011 (http) with context path '/api-sc' 2018-11-07 19:29:08.901 INFO [schedule-center,,,] 21244 --- [ main] .s.c.n.e.s.EurekaAutoServiceRegistration : Updating port to 8011 2018-11-07 19:29:08.903 INFO [schedule-center,,,] 21244 --- [ main] c.c.schedule.ScheduleCenterApplication : Started ScheduleCenterApplication in 16.618 seconds (JVM running for 17.331) 啦啦啦 ------Thread ID: 96, 任务总片数: 2, 当前分片项: 0.当前参数: Chengdu0,当前任务名称: com.cloud.schedule.jobs.StockSimpleJob.当前任务参数: ------Thread ID: 100, 任务总片数: 2, 当前分片项: 0.当前参数: Chengdu0,当前任务名称: com.cloud.schedule.jobs.StockSimpleJob.当前任务参数: 啦啦啦 ------Thread ID: 102, 任务总片数: 2, 当前分片项: 0.当前参数: Chengdu0,当前任务名称: com.cloud.schedule.jobs.StockSimpleJob.当前任务参数: 啦啦啦
查看平台的作业
我们可以在这里面修改他的配置
经启动多个实例,我们可以看到分片会被多个实例均摊,相同的分片只会在一个进程内执行,多个Job也是一样,不会重复执行。退出一个进程,单个进程就会执行全部分片,实现了高可用。
动态添加Job
代码语言:javascript复制//@Component
//@ElasticSimpleJob(cron="0/5 * * * * ?",jobName="testTask",shardingTotalCount=2,jobParameter="测试参数",shardingItemParameters="0=A,1=B")
public class TestJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println("TestJob");
}
}
我们只写了一个Job,不进行配置
我们可以在Restful中进行动态添加
代码语言:javascript复制@RestController
public class TestController {
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
* 动态添加任务逻辑
*/
@GetMapping("/test")
public void test(@RequestParam("cron") String cron) {
int shardingTotalCount = 2;
String jobName = UUID.randomUUID().toString() "-test123";
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
.newBuilder(jobName, cron, shardingTotalCount)
.shardingItemParameters("0=A,1=B")
.build();
SimpleJobConfiguration simpleJobConfiguration =
new SimpleJobConfiguration(jobCoreConfiguration, TestJob.class.getCanonicalName());
JobScheduler jobScheduler = new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build());
try {
jobScheduler.init();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("定时任务创建失败");
}
}
}
通过@RequestParam("cron") String cron参数,我们可以动态给该Job添加可变的时间配置。
在zookeeper中我们可以看到他的注册信息
WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2188(CONNECTED) 0] ls / [zookeeper, elastic-job-lite-springboot, spark, hadoop-ha, dubbo, zoo, pushservers, guanjian] [zk: localhost:2188(CONNECTED) 1] ls /elastic-job-lite-springboot [com.guanjian.job.TestJob, e5d89fde-a665-46e2-8cac-7560a48812c9-test123, com.cloud.schedule.jobs.MyJob, com.cloud.schedule.jobs.StockSimpleJob, com.guanjian.job.StockSimpleJob] [zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.
com.guanjian.job.TestJob com.cloud.schedule.jobs.MyJob com.cloud.schedule.jobs.StockSimpleJob com.guanjian.job.StockSimpleJob [zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.cloud.schedule.jobs.
com.cloud.schedule.jobs.MyJob com.cloud.schedule.jobs.StockSimpleJob [zk: localhost:2188(CONNECTED) 2] ls /elastic-job-lite-springboot/com.cloud.schedule.jobs.StockSimpleJob [leader, servers, config, instances, sharding] [zk: localhost:2188(CONNECTED) 3]