前言
定时任务这一组件在工作过程中经常使用到,在单机节点上可以直接选择使用Spring自带的定时任务组件hubble-task,而这种定时任务一旦确定固化了定时触发策略,也无法动态开启关闭,所以后来有了Quartz。
Quartz是定时任务领域的一个开源项目,由JAVA开发,可以通过API调度定时任务的启停及策略,还有对JTA事务跟集群的支持等等强大功能。
但是Quartz又有它的一些缺点:
- Quartz调整定时任务需要通过API的方式进行调度,本质上还是没有脱离业务系统。
- Quartz需要持久化数据到底层数据表,对业务系统的数据侵入较高。
- Quartz也并没有支持分布式的调度执行,只能做到单个任务单个执行
而elastic-job就是当当在Quartz的基础上进行了二次封装,elastic-job有两种版本:
- Elastic-Job-Cloud:针对微服务的部署方式
- Elastic-Job-Lite:基于zookeeper作为注册中心的部署方式
这两个版本除了部署方式不一样在api上是一样的,elastic-job相对于Quartz增加了很多新特性:
- 支持UI页面,可以在web页面上动态调整定时策略跟启停
- 基于Zookeeper作为分布式的调度,调度跟任务解耦
- 支持了分布式调度分片,同一个任务可以分成多片执行
- 作业类型多种,支持Simple、DataFLow数据流、Script脚本
- 失效转移,下线的机器的任务会重新分片执行
- 作业分片的一致性,任务分片后不会重复执行
- 错过执行的作业补偿
安装
安装elastic-job-lite方式,需要提前安装zookeeper,如果需要安装教程可以看这篇文章:Linux在线安装Zookeeper
elastic-job在apache的地址:elasticjob
然后就需要运行包含Elastic-Job-Lite和业务代码的jar文件。不限于jar或war的启动方式。
源码地址:elastic-job-lite
启动elastic-job-lite-console
下载2.1.4版本的源码:
代码语言:javascript复制https://codeload.github.com/apache/shardingsphere-elasticjob/zip/refs/tags/2.1.4
下载完成解压后有如下目录:
进入elastic-job-lite文件下的elastic-job-lite-console。在此目录下进行打包,打包命令
代码语言:javascript复制mvn clean install -Dmaven.test.skip=true
打包好了启动jar包即可,也可以直接启动源码,找到console模块下的ConsoleBootstrap类进行启动
启动完成后访问ip:8899,账密为:root/root
进入系统后进入注册中心配置,填写需要注册的zookeeper地址进行连接。
下面是在linux中安装配置。也可以直接将打好的包放到linux中执行。
安装控制台
在elastic-job 3.0后没有了console模块,有了更加优美的ui控制台
这里直接下载控制台包:
代码语言:javascript复制https://archive.apache.org/dist/shardingsphere/elasticjob-ui-3.0.0-RC1/apache-shardingsphere-elasticjob-3.0.0-RC1-lite-ui-bin.tar.gz
下载后上传到服务器进行解压
代码语言:javascript复制 tar -zxvf apache-shardingsphere-elasticjob-3.0.0-RC1-lite-ui-bin.tar.gz
到bin文件里进行启动
代码语言:javascript复制./start.sh
启动成功后访问ip地址:8088,默认账号密码为root/root
进入后需要在全局配置-注册中心配置添加注册中心
新增完成后建立连接,elastic-job初步就搭建好了,如果想要引入数据源需要修改conf文件下的application.properties配置文件
我想要修改为mysql作为数据库,需要再lib文件中加入连接包,这个手动上传即可。
代码语言:javascript复制vim application.properties
修改为mysql的驱动跟连接方式
保存文件然后重新启动elastic-job,在事件追踪数据源配置中添加数据源,如下图:
点击建立连接,后面定时任务的配置及日志会记录在表里
集成
简单集成
引入pom依赖
代码语言:javascript复制<dependency>
<groupId>com.cxytiandi</groupId>
<artifactId>elastic-job-spring-boot-starter</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.10.0</version>
</dependency>
在配置文件application.properties添加配置:
代码语言:javascript复制elasticJob.zk.serverLists=localhost:2181
elasticJob.zk.namespace=user-sync
然后就可以直接定义一个job类来验证了,代码如下:
代码语言:javascript复制@Component
@ElasticJobConf(name = "TestJob",cron="0 0 0 * * ?",shardingTotalCount=5) // 每天零点执行
public class TestJob extends SimpleJob{
@Override
public void execute(ShardingContext shardingContext) {
// 要执行的逻辑
}
}
这种方式的定时策略是依赖在ElasticJobConf注解上,调整注解的配置即可。
常规集成
常规集成有三个类
- ElasticJobConfig:elastic-job组件的配置,举例zookeeper配置中心
- ElasticJobHandler:job任务的具体执行类可以配置在这里
- ElasticJobListener:job任务的监听,开始跟结束
- ElasticJobProperties:从配置文件读取zookeeper配置
引入依赖
代码语言:javascript复制<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-common-core</artifactId>
<version>${elasticjob.version}</version>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>curator-framework</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${elasticjob.version}</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elasticjob.version}</version>
</dependency>
<!--解决冲突 elasticjob-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.10.0</version>
</dependency>
ElasticJobConfig
代码语言:javascript复制import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true")
public class ElasticJobConfig {
private final ElasticJobProperties jobProperties;
public ElasticJobConfig(ElasticJobProperties jobProperties) {
this.jobProperties = jobProperties;
}
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter() {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(jobProperties.getServerLists(),
jobProperties.getNamespace()));
}
@Bean
public ElasticJobListener elasticJobListener() {
return new ElasticJobListener(100, 100);
}
}
ElasticJobHandler
代码语言:javascript复制import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
@AutoConfigureAfter(com.kyf.pe.trade.dataprocess.assist.config.ElasticJobConfig.class)
@ConditionalOnBean(com.kyf.pe.trade.dataprocess.assist.config.ElasticJobConfig.class)
public class ElasticJobHandlerConfig {
private final ZookeeperRegistryCenter zookeeperRegistryCenter;
public ElasticJobHandlerConfig(ZookeeperRegistryCenter zookeeperRegistryCenter) {
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
}
/**
* 配置任务详细信息
*
* @param jobClass 定时任务实现类
* @param cron 表达式
* @param shardingTotalCount 分片数
* @param shardingItemParameters 分片参数
* @return
*/
private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
final String cron,
final int shardingTotalCount,
final String shardingItemParameters,
final String jobParameters,
final String description) {
// 定义作业核心配置
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobClass.getSimpleName(), cron, shardingTotalCount).
shardingItemParameters(shardingItemParameters).jobParameter(jobParameters).description(description).build();
// 定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
// 定义Lite作业根配置
return LiteJobConfiguration.newBuilder(simpleJobConfig).build();
}
/**
* 具体任务
*/
@Bean(initMethod = "init")
public JobScheduler pushHrhbJobScheduler(final TestJob testjob,
@Value("${job.test.cron}") final String cron,
@Value("${job.test.shardingTotalCount}") final int shardingTotalCount,
@Value("${job.test.description}") final String description) {
return new SpringJobScheduler(testjob, zookeeperRegistryCenter, getLiteJobConfiguration(testjob.getClass(),
cron, shardingTotalCount, "", "", description));
}
}
ElasticJobListener
代码语言:javascript复制import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.AbstractDistributeOnceElasticJobListener;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {
/**
* 设置间隔时间
*
* @param startedTimeoutMilliseconds
* @param completedTimeoutMilliseconds
*/
public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
}
@Override
public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
log.info("任务名:{}开始", shardingContexts.getJobParameter());
}
@Override
public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
log.info("任务名:{}结束", shardingContexts.getJobParameter());
}
}
ElasticJobProperties
代码语言:javascript复制import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "elasticjob")
public class ElasticJobProperties {
private boolean enabled = true;
private String serverLists;
private String namespace;
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getServerLists() {
return serverLists;
}
public void setServerLists(String serverLists) {
this.serverLists = serverLists;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
}
我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=1qq6a7mdoq048