Spring Batch实战(一)

2022-07-12 14:26:26 浏览数 (1)

1、SpringBatch是什么?

1.1、SpringBatch的背景

虽然开源软件项目和相关社区把更多的注意力集中在基于web和微服务的体系结构框架上,但明显缺乏对可重用体系结构框架的关注,以适应基于java的批处理需求,尽管仍然需要在企业IT环境中处理此类处理。缺乏标准的、可重用的批处理体系结构导致了在客户企业IT功能中开发的许多一次性的内部解决方案的激增。

SpringSource(现为Pivotal)和埃森哲(Accenture)合作改变了这一现状。埃森哲在实现批处理架构方面的实践行业和技术经验、SpringSource的技术深度和Spring经过验证的编程模型共同构成了一个自然而强大的伙伴关系,可以创建高质量的、与市场相关的软件,旨在填补企业Java的一个重要空白。这两家公司都与许多客户合作,他们通过开发基于spring的批处理体系结构解决方案来解决类似的问题。这提供了一些有用的附加细节和现实约束,有助于确保解决方案可以应用于客户提出的现实问题。

Accenture为Spring batch项目贡献了以前专有的批处理体系结构框架,以及提交者资源,以推动支持、增强和现有的特性集。Accenture的贡献是基于在过去几代平台(COBOL/Mainframe, c /Unix,现在是Java/anywhere)中构建批处理架构的几十年经验。

埃森哲和SpringSource的合作旨在促进软件处理方法、框架和工具的标准化,企业用户在创建批处理应用程序时可以一致地利用这些方法、框架和工具。希望向企业IT环境提供标准的、经过验证的解决方案的公司和政府机构可以从Spring Batch中受益。

1.2、SpringBatch概念

企业领域中的许多应用程序需要批量处理来在关键任务环境中执行业务操作。这些业务运作包括:

大量信息的自动化、复杂处理,在没有用户交互的情况下处理效率最高。这些操作通常包括基于时间的事件(如月末计算、通知或通信)。

跨非常大的数据集(例如,保险收益确定或费率调整)重复处理的复杂业务规则的定期应用。

将从内部和外部系统接收到的信息集成到记录系统中,这些信息通常需要以事务性的方式进行格式化、验证和处理。批处理每天为企业处理数十亿笔交易。

Spring Batch是一个轻量级、全面的批处理框架,旨在开发健壮的批处理应用程序,这对企业系统的日常操作至关重要。Spring Batch构建在人们所期望的Spring框架特性(生产力、基于pojo的开发方法和通用易用性)之上,同时使开发人员在必要时更容易访问和利用更高级的企业服务。Spring Batch不是一个调度框架。商业和开放源码领域中都有许多优秀的企业调度器(如Quartz、Tivoli、Control-M等)。它旨在与调度器一起工作,而不是取代调度器。

Spring Batch提供了在处理大量记录时必不可少的可重用功能,包括日志/跟踪、事务管理、作业处理统计、作业重启、跳过和资源管理。它还提供了更先进的技术服务和特性,通过优化和分区技术支持极高容量和高性能的批处理作业。Spring Batch既可以用于简单的用例(如将文件读入数据库或运行存储过程),也可以用于复杂的大容量用例(如在数据库之间移动大容量数据,转换数据,等等)。高容量批处理作业可以以高度可伸缩的方式利用该框架来处理大量信息

1.3、SpringBatch架构图

Spring Batch的设计考虑到了可扩展性和不同的最终用户群体。下图显示了SpirngBatch的架构图,大体分为三层:Application、Batch Core、Batch Infrastructure

这个分层的体系结构突出了三个主要的高级组件:应用程序、核心和基础设施。该应用程序包含所有的批处理作业和开发人员使用Spring batch编写的自定义代码。批处理核心包含启动和控制批处理作业所需的核心运行时类。它包括JobLauncher、Job和Step的实现。Application和Core都构建在公共基础设施之上。此基础设施包含常见的读取器、写入器和服务(如RetryTemplate),应用程序开发人员(读取器和写入器,如ItemReader和ItemWriter)和核心框架本身(重试,这是它自己的库)都使用这些服务。

上图是Spring Batch的一个简化流程图,图中涉及到一些关键模块,比如JobLauncher、Job、Step、ItemReader、ItemWrite、ItemProcessor、JobRepository等,后面的学习中我们会逐渐介绍上述中的概念以及如何快速实践创建一个批处理程序。

1.4 部分术语解释
(1)JobLauncher

JobLauncher表示一个简单的接口,用于使用一组给定的JobParameters启动作业,如下例所示:

代码语言:javascript复制
public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

它是一个接口类型,其中包含一个方法run,有两个参数,分别是Job和JobParameters。Job就是你创建的批处理的Job,JobParameters是你的Job需要的参数。预计实现将从jobRepository获取一个有效的jobExecution并执行该Job。

比如下面的使用:

代码语言:javascript复制
@RestController
public class JobLaunchController {

    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job jobLaunch;

    @GetMapping(value = "/test/run/{msg}")
    public String launchJob(@PathVariable("msg") String msg) throws JobParametersInvalidException,
            JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        JobParameters parameter = new JobParametersBuilder()
                .addString("msg",msg)
                .toJobParameters();
        // job和jobParameters
        jobLauncher.run(jobLaunch,parameter);
        return "job success";
    }
}
(2) JobRepository

JobRepository是所有原型的持久性机制。它为JobLauncher、Job和Step实现提供了CRUD操作。当Job第一次启动时,将从存储库中获JobExection,并且在执行过程中,通过将StepExecution和JobExection实现传递给存储库来持久化它们。当使用java配置时,@EnableBatchProcessing注释提供了一个JobRepository作为自动开箱配置的组件之一。

(3)ItemReader

ItemReader是一个interface,它表示读取Step的输入,每次只读取一项。当ItemReader用尽了它可以提供的项时,它会通过返回null来表示。关于ItemReader接口及其各种实现的详细信息,我会在后面的文章中具体分析讲解ItemReader的原理和使用。

(4)ItemWriter

ItemWriter是一个interface,它表示Step的输出,一次一个批处理或项目块。通常,ItemWriter不知道它接下来应该接收的输入,只知道在当前调用中传递的项。关于ItemWriter接口及其各种实现的更多细节,我会在后面的文章中具体分析讲解ItemReader的原理和使用。

(5)ItemProcessor

ItemProcessor是表示的是对于业务的数据读取和写入中间的一层操作。该操作可以对ItemReader的结果集进行验证、处理和修改、当ItemReader读取一个内容时,而ItemWriter写入它们时,ItemProcessor提供一个访问点来转换或应用其他业务处理。如果在处理该项时,确定该项无效,则返回null表示不应该写出该项。关于ItemProcessor接口的更多细节,我会在后面的文章中具体分析讲解ItemReader的原理和使用。

2、SpringBatch可以解决什么问题?

一个典型的批处理程序一般是:

(1)从数据库、文件或队列中读取大量记录。

(2)以某种方式处理数据。

(3)以修改后的形式回写数据。

Spring Batch自动化了这个基本的批处理迭代,提供了将类似事务作为一个集合处理的能力,通常是在没有任何用户交互的脱机环境中。批处理作业是大多数IT项目的一部分,Spring Batch是唯一提供健壮的企业级解决方案的开源框架。

2.1、SpringBatch的应用场景是什么?

(1)定期提交批处理过程

(2)并发批处理:对作业进行并行处理

(3)分阶段的、企业消息驱动的处理

(4)大规模并行批处理

(5)故障后手动或定时重启

(6)依赖步骤的顺序处理(扩展到工作流驱动的批)

(7)部分处理:跳过记录(例如,回滚时)

(8)整批事务,用于小批大小或现有存储过程/脚本的情况

2.2、SringBatch的优势是什么?

(1)批处理开发人员使用Spring编程模型:专注于业务逻辑,让框架负责基础设施。

(2)基础设施、批处理执行环境和批处理应用程序之间的关注点清晰分离。

(3)提供公共的核心执行服务作为所有项目都可以实现的接口。

(4)提供核心执行接口的简单和默认实现,可以“开箱即用”。

(5)通过在所有层中利用spring框架,易于配置、定制和扩展服务。

(6)所有现有的核心服务都应该易于替换或扩展,不会对基础设施层产生任何影响。

(7)提供一个简单的部署模型,体系结构jar完全独立于应用程序,使用Maven构建。

3、如何使用SpringBatch?

3.1、SpringBatch环境搭建

(1)使用https://start.spring.io/ 来初始化一个spring-boot服务,选择maven project,language为Java语言,依赖为Spring Batch

Spring boot版本为2.4.5,Java版本为8.

(2)将初始化的spring boot服务用IDEA打开。

(3)了解一下pom.xml文件

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.5</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>com.example</groupId>
  <artifactId>springbatch</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>springbatch</name>
  <description>springbatch demo</description>
  <properties>
    <java.version>1.8</java.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <!--手工添加-->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <!--手工添加-->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <!--手工添加-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>
3.2、腾讯云RDS的使用与SpringBatch结合

本地使用DataGrip来连接腾讯云,具体见下方连接:

可以点开管理,使用外网ip来进行连接登录,如何使用腾讯云的mysql,可以参考官网的使用说明。

当然你也可以选择本地安装mysql server进行连接,做实验学习。

3.3、第一个SpringBatch应用程序
代码语言:javascript复制
package com.example.springbatch;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author lee
 * @email danishlyy@163.com
 * @date 2021/5/16
 *
 **/
@SpringBootApplication
@EnableBatchProcessing
public class SpringbatchApplication {

  public static void main(String[] args) {
    SpringApplication.run(SpringbatchApplication.class, args);
  }
}

当我们要启动一个Spring boot的application的时候,一定需要知道它的入口,如上述代码,SpringBatchApplication.java即为批处理的启动入口。读者可能会观察到该类上添加了注解 @EnableBatchProcessing,该注解的作用就是开启SpringBatch的批处理作用。

代码语言:javascript复制
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://host:port/test?useUnicode=true&characterEncoding=GBK
spring.datasource.username=username
spring.datasource.password=password
# 如果选择不同的driver-class-name 此处对应的初始化脚本也应该选择对应database类型
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
spring.batch.initialize-schema=always

批处理程序:

代码语言:javascript复制
@Slf4j
@Configuration
public class BatchConfig {

    /**
     * 创建job
     */
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /**
     * 创建 step
     */
    @Autowired
    private StepBuilderFactory stepBuilderFactory;


    @Bean
    public Job helloWorldJob(){
        return  jobBuilderFactory.get("helloWorldJob")
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1").tasklet((stepContribution, chunkContext) -> {
            log.info("开始执行任务步骤:hello world");
            return RepeatStatus.FINISHED;
        }).build();
    }
}
3.5、SpringBatch服务首次启动自动生成的表
代码语言:javascript复制
BATCH_JOB_EXECUTION:job的执行情况,包含job的开始执行时间、结束时间、执行状态、退出原因
BATCH_JOB_EXECUTION_CONTEXT:Job执行的上下文
BATCH_JOB_EXECUTION_PARAMS:job执行的参数
BATCH_JOB_EXECUTION_SEQ:
BATCH_JOB_INSTANCE :一个job对应表中一条记录
BATCH_JOB_SEQ:有多少个job实例
BATCH_STEP_EXECUTION:Job中的step的执行情况记录
BATCH_STEP_EXECUTION_CONTEXT:Job中的step的执行时的上下文
BATCH_STEP_EXECUTION_SEQ:Job中的step的总数

这篇文章主要是介绍一下SpringBatch的整个概念、以及SpringBatch的架构体系,初学者应该怎么去使用SpringBatch、以及SpringBatch的应用场景和优势、应该基于自己的场景选择是否需要使用SpringBatch,下篇文章会围绕SpringBatch的核心概念进行介绍并会附带对应的源码供大家参考。

0 人点赞