分布式任务调度框架 Elastic-Job 之动态任务发布实现详解

2020-05-13 13:51:31 浏览数 (1)

官网地址:http://elasticjob.io

1. 摘要 常见的业务场景有以下几种:

  1. 某拼购电商平台在每天上午 9 点,下午 3 点和晚上 9 点发放优惠券。
  2. 某银行系统需要在信用卡到期还款日的前三天进行短信提醒。
  3. 12306 会在春运期间设置定时进行分批放票。
  4. 以上场景就是任务调度所需要解决的问题。

2. 什么是任务调度 ?

任务调度是指系统为了按预定的时间和频次自动完成特定任务的过程。

任务调度 JDK 的几种实现方式如下: 1)多线程: 通过开启一个线程,while 循环执行业务逻辑,让线程 sleep 休眠,达到任务间隔执行。代码清单如下图所示:

2)Timer 类

Timer 的优点在于简单易用,每个Timer对应一个线程,因此可以同时启动多个Timer 并行执行多个任务,同一个 Timer 中的任务是串行执行

3)ScheduledExecutorService 接口

Java 5 推出了基于线程池设计的 ScheduledExecutorService,其设计思想是,每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。

从以上可以看出,无论是 Thread,还是 Timer,或是 ScheduledExecutorService 都只能提供基于开始时间与重复间隔的任务调度,而不能胜任更加复杂的任务调度需求。比如,设置每日凌晨零点执行任务、复杂调度任务的管理、任务间传递数据等等。

3. 什么是分布式?

当前软件的架构正在逐步转变为分布式系统架构,将传统的单体架构转为微服务架构,服务之间通过网络交互来完成业务处理,如下图所示,电商系统为分布式架构,由用户服务、商品服务等组成:

4. 什么是分布式任务调度?

通常任务调度的程序是集成在 Spring Boot 应用中的,比如:优惠卷服务中包括了定时发放优惠卷的的任务调度程序,通知推送服务中包括了定时向用户发通知的任务调度程序等,由于采用分布式系统架构,一个服务往往会部署多个冗余实例来进行业务处理, 像这种分布式系统环境下运行的任务调度,我们称之为分布式任务调度,如下图:

代码语言:javascript复制
分布式调度要实现的目标

不管是任务调度程序集成在应用程序中,还是单独构建的任务调度系统,如果采用分布式调度任务的方式就相当于 将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力:

1)并行任务调度

并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机 CPU 的处理能力是有限的。

如果将任务调度程序分布式部署,每个结点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我 们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率。

2)高可用

若某一个实例宕机,不影响其他实例来执行任务。

3)弹性扩容

当集群中增加实例就可以提高并执行任务的处理效率。

4)任务管理与监测

对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。

5)避免任务重复执行

当任务调度以集群方式部署,同一个任务调度可能会执行多次,比如在上面提到的电商系统中定时发放优惠券的服务,就会对同一用户发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次,考虑采用下边的方法:

  • 分布式锁,多个实例在任务执行前首先需要获取锁,如果获取失败那么久证明有其他服务已经再运行,如果获取成功那么证明没有服务在运行定时任务,那么就可以执行。
  • ZooKeeper选举,利用 ZooKeeper 对 Leader 实例执行定时任务,有其他业务已经使用了 ZooKeeper,那么执行定时任务的时候判断自己是否是 Leader,如果不是则不执行,如果是则执行业务逻辑,这样也能达到我们的目的。

Elastic-Job 框架实现了以上的分布式任务调度目标,功能列表如下图所示:

备注:具体前参考 Elastic-Job 官网(http://elasticjob.io

关于 Elastic-Job 的基本使用及示例直接参考官网示例即可学习使用,SpringBoot 应用整合 elastic-job 框架及其简单,这里不再赘述。

5. 分布式任务调度框架 Elastic-Job 之动态任务发布实现

1)搭建 SpringBoot 应用,可参考【小白都能看得懂的服务调用链路追踪设计与实现】这篇的第三节的开发环境准备和工程初始化步骤等。

2)在 pom.xml 文件加入 elastic-job 任务调度的依赖:

代码语言:javascript复制
<dependency>
      <groupId>com.dangdang</groupId>
      <artifactId>elastic-job-lite-spring</artifactId>
      <version>2.1.5</version>
 </dependency>

整体的 pom 文件内容如下图所示:

3)工程的结构如下图:

4)编码实现如下:

elastic-job 的 Zookeeper 注册中心配置类 ElasticJobRegistryCenterConfig,代码如下:

测试自定义调度任务发布的控制层 DynamicController,代码如下:

自定义的调度任务 CustomJob,实现了 SimpleJob 接口,即 elastic-job 框架定义的 job,代码如下:

动态任务配置与启动 DynamicJobService,这个类很重要,它主要负责动态任务的详细参数配置及动态任务的启动,代码如下:

其他的类比较简单,不是本文的重点,MyService类 的代码如下:

代码语言:javascript复制
package cn.smart4j.giserway.dynamic.task.service;

import cn.smart4j.giserway.dynamic.task.dao.MyDao;
import cn.smart4j.giserway.dynamic.task.vo.BookVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * @Description: MyService
 * @Param:
 * @return:
 * @Author: Mr.Zhang
 * @Date: 2020/4/28
 */
@Service
public class MyService {

    @Autowired
    MyDao myDao;


    public List<BookVO> queryBooks() {
        return myDao.queryBooks();
//        return SpringContextUtils.getBean(MyDao.class).queryBooks();
    }
}

MyDao 类代码如下:

代码语言:javascript复制

package cn.smart4j.giserway.dynamic.task.dao;

import cn.smart4j.giserway.dynamic.task.vo.BookVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
 * @Description: 数据访问层
 * @Param:
 * @return:
 * @Author: Mr.Zhang
 * @Date: 2020/4/28
 */
@Repository
public class MyDao {
    @Autowired
    JdbcTemplate jdbcTemplate;

    public List<BookVO> queryBooks() {
        String sql = "select * from t_book";
        return jdbcTemplate.query(sql, new BeanPropertyRowMapper<>(BookVO.class));
//        return SpringContextUtils.getBean(JdbcTemplate.class).query(sql, new BeanPropertyRowMapper<>(BookVO.class));
    }
}

BookVO 类代码如下:

代码语言:javascript复制
package cn.smart4j.giserway.dynamic.task.vo;

/**
 * @Description: BookVO
 * @Param:
 * @return:
 * @Author: Mr.Zhang
 * @Date: 2020/4/28
 */
public class BookVO {
    /**
     * 书籍id
     */
    private int id;
    /**
     * 书籍名称
     */
    private String bookName;
    /**
     * 价格
     */
    private double money;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getBookName() {
        return bookName;
    }

    public void setBookName(String bookName) {
        this.bookName = bookName;
    }

    public double getMoney() {
        return money;
    }

    public void setMoney(double money) {
        this.money = money;
    }

    @Override
    public String toString() {
        return "BookVO{"  
                "id="   id  
                ", bookName='"   bookName   '''  
                ", money="   money  
                '}';
    }
}

5) application.properties 配置文件如下:

代码语言:javascript复制
server.port=18108
spring.application.name = dynamic-task
#logging.level.root = info

# 数据源
spring.datasource.driver‐class‐name = com.mysql.cj.jdbc.Driver
spring.datasource.url = jdbc:mysql://localhost:3306/elastic_job_demo?useUnicode=true
spring.datasource.username = root
spring.datasource.password = root

# zk地址
registry.serverList = localhost:2181
# 命名空间
registry.namespace = dynamic-job

6)启动本地的 Zookeeper 服务,如下:

7)下面可以开始测试动态任务的发布了,运行 SpringBoot 启动类,如下所示:

8)通过 Postman 请求,结果如下:

程序运行报空指针异常,出现在自定义的调度任务实现类 CustomJob 的28行,通过 Debug 打断点可知,这里的 mySevice 没有使用容器中注入的,即@Autowired 注入不起作用,那直接换成 new MyService().queryBooks(); 试试,再启动服务后,同样 Postman 请求,结果如下:

在 MyService 中调 dao 层时报 myDao 空指针异常,再修改为 new MyDao().queryBooks(); 试试,再启动服务后,同样 Postman 请求,结果如下:

同样还是无法发布调度任务成功,其实仔细观察发现,这里的 CustomJob 是我们通过调接口,Class.forName 装载 cn.smart4j.giserway.dynamic.task.job.CustomJob 类后,通过 newInstance() 获取 SimpleJob 实例的,这个实例没有注入容器,只存在于 JVM 中,Spring 容器没有注入这个 Bean,所以这里的 CustomJob 中注入的 MyService 无法使用,其他依赖注入也都失效了。为了解决通过 @Autowired 注入后获取不到容器中的 Bean,我们可以使用 Spring上下文获取容器中已经注入的 Bean,即实现 ApplicationContextAware 接口,提取获取容器中的 Bean 对象工具类 SpringContextUtils,代码如下:

9)把有关使用自动注入的地方修改为通过 SpringContextUtils.getBean()获取bean对象即可,细心的小伙伴可能注意到报空指针的类中代码注释掉的部分,即是最终的代码实现,修改如下:

10)再次启动服务,测试结果如下:

说明自定义调度任务 CustomJob 动态发布成功

11)可以通过 ZooInspector 工具查看,任务详情,如下截图所示:

连接 Zookeeper 服务,查看调度任务相关信息,包括命名空间、任务名称、任务选举的 leader、config 任务配置信息、任务实例、任务分片等。如下图所示:

6. 小结

本文介绍了什么是任务调度 ?任务调度的使用场景有哪些?什么是分布式任务调度?Elastic-Job 如何支持动态任务调度的发布?小伙伴们是不是等不及要试试了,来吧,我们一起 Coding......

0 人点赞