集成elastic-job分布式调度定时任务

2022-11-02 16:43:02 浏览数 (1)

前言

定时任务这一组件在工作过程中经常使用到,在单机节点上可以直接选择使用Spring自带的定时任务组件hubble-task,而这种定时任务一旦确定固化了定时触发策略,也无法动态开启关闭,所以后来有了Quartz

Quartz是定时任务领域的一个开源项目,由JAVA开发,可以通过API调度定时任务的启停及策略,还有对JTA事务跟集群的支持等等强大功能。

但是Quartz又有它的一些缺点:

  • Quartz调整定时任务需要通过API的方式进行调度,本质上还是没有脱离业务系统。
  • Quartz需要持久化数据到底层数据表,对业务系统的数据侵入较高。
  • Quartz也并没有支持分布式的调度执行,只能做到单个任务单个执行

elastic-job就是当当在Quartz的基础上进行了二次封装,elastic-job有两种版本:

  • Elastic-Job-Cloud:针对微服务的部署方式
  • Elastic-Job-Lite:基于zookeeper作为注册中心的部署方式

这两个版本除了部署方式不一样在api上是一样的,elastic-job相对于Quartz增加了很多新特性:

  1. 支持UI页面,可以在web页面上动态调整定时策略跟启停
  2. 基于Zookeeper作为分布式的调度,调度跟任务解耦
  3. 支持了分布式调度分片,同一个任务可以分成多片执行
  4. 作业类型多种,支持Simple、DataFLow数据流、Script脚本
  5. 失效转移,下线的机器的任务会重新分片执行
  6. 作业分片的一致性,任务分片后不会重复执行
  7. 错过执行的作业补偿

安装

安装elastic-job-lite方式,需要提前安装zookeeper,如果需要安装教程可以看这篇文章:Linux在线安装Zookeeper

elastic-job在apache的地址:elasticjob

然后就需要运行包含Elastic-Job-Lite和业务代码的jar文件。不限于jar或war的启动方式。

源码地址:elastic-job-lite

启动elastic-job-lite-console

下载2.1.4版本的源码:

代码语言:javascript复制
https://codeload.github.com/apache/shardingsphere-elasticjob/zip/refs/tags/2.1.4

下载完成解压后有如下目录:

进入elastic-job-lite文件下的elastic-job-lite-console。在此目录下进行打包,打包命令

代码语言:javascript复制
mvn clean install -Dmaven.test.skip=true  

打包好了启动jar包即可,也可以直接启动源码,找到console模块下的ConsoleBootstrap类进行启动

启动完成后访问ip:8899,账密为:root/root

进入系统后进入注册中心配置,填写需要注册的zookeeper地址进行连接。

下面是在linux中安装配置。也可以直接将打好的包放到linux中执行。

安装控制台

elastic-job 3.0后没有了console模块,有了更加优美的ui控制台

这里直接下载控制台包:

代码语言:javascript复制
https://archive.apache.org/dist/shardingsphere/elasticjob-ui-3.0.0-RC1/apache-shardingsphere-elasticjob-3.0.0-RC1-lite-ui-bin.tar.gz

下载后上传到服务器进行解压

代码语言:javascript复制
 tar -zxvf apache-shardingsphere-elasticjob-3.0.0-RC1-lite-ui-bin.tar.gz

到bin文件里进行启动

代码语言:javascript复制
./start.sh

启动成功后访问ip地址:8088,默认账号密码为root/root

进入后需要在全局配置-注册中心配置添加注册中心

新增完成后建立连接,elastic-job初步就搭建好了,如果想要引入数据源需要修改conf文件下的application.properties配置文件

我想要修改为mysql作为数据库,需要再lib文件中加入连接包,这个手动上传即可。

代码语言:javascript复制
vim application.properties

修改为mysql的驱动跟连接方式

保存文件然后重新启动elastic-job,在事件追踪数据源配置中添加数据源,如下图:

点击建立连接,后面定时任务的配置及日志会记录在表里

集成

简单集成

引入pom依赖

代码语言:javascript复制
<dependency>
	<groupId>com.cxytiandi</groupId>
	<artifactId>elastic-job-spring-boot-starter</artifactId>
	<version>1.0.0</version>
</dependency>
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-framework</artifactId>
	<version>2.10.0</version>
</dependency>
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-recipes</artifactId>
	<version>2.10.0</version>
</dependency>

在配置文件application.properties添加配置:

代码语言:javascript复制
elasticJob.zk.serverLists=localhost:2181
elasticJob.zk.namespace=user-sync

然后就可以直接定义一个job类来验证了,代码如下:

代码语言:javascript复制
@Component
@ElasticJobConf(name = "TestJob",cron="0 0 0 * * ?",shardingTotalCount=5) // 每天零点执行
public class TestJob extends SimpleJob{
  @Override
  public void execute(ShardingContext shardingContext) {
  	// 要执行的逻辑
  }
}

这种方式的定时策略是依赖在ElasticJobConf注解上,调整注解的配置即可。

常规集成

常规集成有三个类

  • ElasticJobConfig:elastic-job组件的配置,举例zookeeper配置中心
  • ElasticJobHandler:job任务的具体执行类可以配置在这里
  • ElasticJobListener:job任务的监听,开始跟结束
  • ElasticJobProperties:从配置文件读取zookeeper配置

引入依赖

代码语言:javascript复制
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-common-core</artifactId>
    <version>${elasticjob.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
            <artifactId>curator-framework</artifactId>
            <groupId>org.apache.curator</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>${elasticjob.version}</version>
</dependency>

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>${elasticjob.version}</version>
</dependency>
<!--解决冲突 elasticjob-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.10.0</version>
</dependency>

ElasticJobConfig

代码语言:javascript复制
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true")
public class ElasticJobConfig {
    private final ElasticJobProperties jobProperties;

    public ElasticJobConfig(ElasticJobProperties jobProperties) {
        this.jobProperties = jobProperties;
    }

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter() {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(jobProperties.getServerLists(),
                jobProperties.getNamespace()));
    }

    @Bean
    public ElasticJobListener elasticJobListener() {
        return new ElasticJobListener(100, 100);
    }
}

ElasticJobHandler

代码语言:javascript复制
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
@AutoConfigureAfter(com.kyf.pe.trade.dataprocess.assist.config.ElasticJobConfig.class)
@ConditionalOnBean(com.kyf.pe.trade.dataprocess.assist.config.ElasticJobConfig.class)
public class ElasticJobHandlerConfig {
    private final ZookeeperRegistryCenter zookeeperRegistryCenter;

    public ElasticJobHandlerConfig(ZookeeperRegistryCenter zookeeperRegistryCenter) {
        this.zookeeperRegistryCenter = zookeeperRegistryCenter;
    }

    /**
     * 配置任务详细信息
     *
     * @param jobClass               定时任务实现类
     * @param cron                   表达式
     * @param shardingTotalCount     分片数
     * @param shardingItemParameters 分片参数
     * @return
     */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                         final String cron,
                                                         final int shardingTotalCount,
                                                         final String shardingItemParameters,
                                                         final String jobParameters,
                                                         final String description) {
        // 定义作业核心配置
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobClass.getSimpleName(), cron, shardingTotalCount).
                shardingItemParameters(shardingItemParameters).jobParameter(jobParameters).description(description).build();
        // 定义SIMPLE类型配置
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
        // 定义Lite作业根配置
        return LiteJobConfiguration.newBuilder(simpleJobConfig).build();
    }


    /**
     * 具体任务
     */
    @Bean(initMethod = "init")
    public JobScheduler pushHrhbJobScheduler(final TestJob testjob,
                                             @Value("${job.test.cron}") final String cron,
                                             @Value("${job.test.shardingTotalCount}") final int shardingTotalCount,
                                             @Value("${job.test.description}") final String description) {

        return new SpringJobScheduler(testjob, zookeeperRegistryCenter, getLiteJobConfiguration(testjob.getClass(),
                cron, shardingTotalCount, "", "", description));
    }


}

ElasticJobListener

代码语言:javascript复制
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.AbstractDistributeOnceElasticJobListener;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {

    /**
     * 设置间隔时间
     *
     * @param startedTimeoutMilliseconds
     * @param completedTimeoutMilliseconds
     */
    public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
        super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
    }

    @Override
    public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
        log.info("任务名:{}开始", shardingContexts.getJobParameter());
    }

    @Override
    public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
        log.info("任务名:{}结束", shardingContexts.getJobParameter());
    }
}

ElasticJobProperties

代码语言:javascript复制
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "elasticjob")
public class ElasticJobProperties {
    private boolean enabled = true;

    private String serverLists;

    private String namespace;

    public boolean isEnabled() {
        return enabled;
    }

    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }

    public String getServerLists() {
        return serverLists;
    }

    public void setServerLists(String serverLists) {
        this.serverLists = serverLists;
    }

    public String getNamespace() {
        return namespace;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }
}

我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=1qq6a7mdoq048

0 人点赞