1、SpringBatch支持三种类型的数据源来进行批处理:
- 数据库类型
- 文件类型
- 消息类型
2、概念术语
(1)Job
在Spring Batch中,Job只是Step实例的容器。它将在逻辑上属于一个流的多个步骤组合在一起,并允许配置所有步骤的全局属性,比如可重新启动性。作业配置包含:
- Job的名称
- Step实例的定义和顺序
- 任务是否可以重新启动
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
如上图所示,我们使用JobBuilderFactory来创建了一个Job,任务的名字叫footballJob,开始第一个Step是playerLoad(),第二个Step是gameLoad(),第三个Step是playerSummarization()。
(2)JobInstance
它的含义就是一个Job对应了一个批处理的Job实例。
(3)JobParameters
JobParameters对象保存一组用于启动批处理作业的参数。它们可以用于识别,甚至在运行过程中作为参考数据,如下图所示:
在前面的示例中,有两个实例,一个用于1月1日,另一个用于1月2日,实际上只有一个Job,但它有两个JobParameter对象:一个以JobParameters01-01-2017开始,另一个以参数01-02-2017开始。因此,它们之间的联系可以定义为:JobInstance = Job 标识JobParameters。这允许开发人员有效地控制JobInstance的定义方式,因为它们控制传入的参数
(4)JobExecution
代表这个Job执行的一些信息,比如Job执行的上下文,Job的创建时间和结束时间。
(5)Step
关于Step,你可以这么理解,一个任务就是一个Job,但是你的任务是由许多步骤组成的,在每个步骤里面会做一些逻辑处理,比如从数据源读取数据、对读取的数据进行清洗转换、最后将干净的数据写入目标数据源,这里的步骤就是Step,所以一个Job可以由一个Step或者多个Step组成。
(6)StepExecution
对应Step,它也有对应的StepExecution,它里面包含了一个Job执行的每个Step的执行上下文等信息。
3、实战
关于SpringBatch的批处理,大概处理流程如下图所示:
下面介绍一个多数据源数据的批处理例子
场景:存在多个数据源,需要将多个数据源的数据抽取同步到单个数据源,其中还需要在同步的过程中对多个数据源的数据进行抽取、清洗、转换等操作,把最终的结果写入单个数据源
1、Job和Step定义
代码语言:javascript复制package com.batch.example.demo.job;
import com.batch.example.demo.pojo.FundProduct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 李勇勇
* @version 1.0
* @date 2021-05-24 09:54
*/
@Configuration
@Slf4j
public class FundProductSyncJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private ItemReader<FundProduct> readFundProduct;
@Autowired
private ItemWriter<FundProduct> writeProduct;
@Autowired
private ItemProcessor<FundProduct, FundProduct> productProcessor;
@Bean
public Job productSyncJob() {
return jobBuilderFactory.get("productSyncJob")
.start(readFundProductStep())
.build();
}
@Bean
public Step readFundProductStep() {
return stepBuilderFactory.get("readFundProductStep")
.<FundProduct, FundProduct>chunk(1000)
.reader(readFundProduct)
.processor(productProcessor)
.writer(writeProduct)
.build();
}
}
2、Reader定义
代码语言:javascript复制package com.batch.example.demo.read;
import com.batch.example.demo.pojo.FundProduct;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.OraclePagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
/**
* @author 李勇勇
* @version 1.0
* @date 2021-05-24 09:55
*/
@Configuration
@Slf4j
public class FundProductReader {
@Autowired
@Qualifier("chDataSource")
private DataSource chDataSource;
@Autowired
@Qualifier("chJdbcTemplate")
private JdbcTemplate chJdbcTemplate;
@Bean
public JdbcPagingItemReader<FundProduct> readFundProduct() {
JdbcPagingItemReader<FundProduct> itemReader = new JdbcPagingItemReader<>();
itemReader.setDataSource(chDataSource);
itemReader.setFetchSize(1000);
itemReader.setRowMapper((rs, rowNum) -> {
FundProduct product = new FundProduct();
product.setProductCode(rs.getString(2));
product.setProductName(rs.getString(3));
product.setProductType(rs.getString(4));
product.setFundManagerCode(rs.getString(5));
product.setFundTrusteeCode(rs.getString(6));
return product;
});
OraclePagingQueryProvider queryProvider = new OraclePagingQueryProvider();
queryProvider.setSelectClause("select ID,FSYMBOL,FDSNAME,FDTYPE,KEEPERCODE,TRUSTEECODE");
queryProvider.setFromClause(" from TQ_FD_BASICINFO ");
Map<String, Order> sortedMap = new HashMap<>();
sortedMap.put("ID", Order.DESCENDING);
queryProvider.setSortKeys(sortedMap);
itemReader.setQueryProvider(queryProvider);
return itemReader;
}
}
3、Processor定义
代码语言:javascript复制package com.batch.example.demo.processor;
import com.batch.example.demo.pojo.FundProduct;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
/**
* @author 李勇勇
* @version 1.0
* @date 2021-05-24 09:54
*/
@Component
@Slf4j
public class FundProductProcessor implements ItemProcessor<FundProduct, FundProduct> {
@Override
public FundProduct process(FundProduct fundProduct) throws Exception {
FundProduct product = new FundProduct();
// 基金代码标准化
product.setId(UUID.randomUUID().toString().replace("-", ""));
product.setProductCode("standard" fundProduct.getProductCode());
product.setFundTrusteeCode(fundProduct.getFundTrusteeCode());
product.setFundManagerCode(fundProduct.getFundManagerCode());
product.setProductType(fundProduct.getProductType());
product.setProductName(fundProduct.getProductName());
return product;
}
}
4、Writer定义
代码语言:javascript复制package com.batch.example.demo.write;
import com.batch.example.demo.pojo.FundProduct;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 李勇勇
* @version 1.0
* @date 2021-05-24 09:55
*/
@Configuration
@Slf4j
public class FundProductWriter {
@Autowired
@Qualifier("primaryDataSource")
private DataSource primaryDataSource;
@Bean
public JdbcBatchItemWriter<FundProduct> writeProduct() {
JdbcBatchItemWriter<FundProduct> batchItemWriter = new JdbcBatchItemWriter<>();
batchItemWriter.setDataSource(primaryDataSource);
batchItemWriter.setSql(" INSERT INTO product (id, product_code, product_name, product_type, "
"fund_manager_code, fund_trustee_code) VALUES (:id, :productCode, :productName, "
":productType, :fundManagerCode, :fundTrusteeCode) ");
batchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
return batchItemWriter;
}
}
5、多数据源定义
代码语言:javascript复制package com.batch.example.demo.config;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.core.JdbcTemplate;
/**
* @author 李勇勇
* @version 1.0
* @date 2021-05-24 09:06
*/
@Slf4j
@Configuration
public class DynamicDataSourceConfig {
@Bean
@Primary
@ConfigurationProperties("spring.datasource.primary")
public DataSourceProperties primaryDataSourceProperties() {
return new DataSourceProperties();
}
@Bean("chDataSourceProperties")
@ConfigurationProperties("spring.datasource.ch")
public DataSourceProperties chDataSourceProperties() {
return new DataSourceProperties();
}
@Bean
@Primary
public DataSource primaryDataSource(@Autowired DataSourceProperties props) {
return props.initializeDataSourceBuilder().build();
}
@Bean("chDataSource")
public DataSource chDataSource(@Autowired @Qualifier("chDataSourceProperties") DataSourceProperties props) {
return props.initializeDataSourceBuilder().build();
}
@Bean
@Primary
public JdbcTemplate primaryJdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Bean(name = "chJdbcTemplate")
public JdbcTemplate chJdbcTemplate(@Qualifier("chDataSource") DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
}
6、配置信息
代码语言:javascript复制# 多数据源配置
# target postgresql database
spring.datasource.primary.driver-class-name=org.postgresql.Driver
spring.datasource.primary.url=jdbc:postgresql://host:port/dbname
spring.datasource.primary.username=
spring.datasource.primary.password=
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-drop-postgresql.sql
spring.batch.jdbc.initialize-schema=always
spring.datasource.primary.type=com.alibaba.druid.pool.DruidDataSource
# 财汇 oracle
spring.datasource.ch.url=jdbc:oracle:thin:@ip:port:instance
spring.datasource.ch.username=
spring.datasource.ch.password=
spring.datasource.ch.driver-class-name=oracle.jdbc.OracleDriver
spring.datasource.ch.type=com.alibaba.druid.pool.DruidDataSource
# wind oracle
spring.datasource.wind.url=jdbc:oracle:thin:@ip:port:instance
spring.datasource.wind.username=
spring.datasource.wind.password=
spring.datasource.wind.driver-class-name=oracle.jdbc.OracleDriver
spring.datasource.wind.type=com.alibaba.druid.pool.DruidDataSource
7、建表语句
代码语言:javascript复制create table product
(
id varchar(32) not null
constraint product_pkey
primary key,
product_code varchar(128),
product_name varchar(1024),
product_type varchar(128),
fund_manager_code varchar(128),
fund_trustee_code varchar(128)
);
comment on table product is '产品表';
comment on column product.id is '产品id';
comment on column product.product_code is '基金代码';
comment on column product.product_name is '基金简称';
comment on column product.product_type is '基金类型';
comment on column product.fund_manager_code is '基金管理人code';
comment on column product.fund_trustee_code is '基金托管人code';
alter table product
owner to product_dev3;
9、数据对象定义
代码语言:javascript复制package com.batch.example.demo.pojo;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
* @author 李勇勇
* @version 1.0
* @date 2021-05-24 09:51
*/
@Getter
@Setter
@ToString
public class FundProduct {
private String id;
/**
* 基金代码
*/
private String productCode;
/**
* 基金简称
*/
private String productName;
/**
* 基金类型
*/
private String productType;
/**
* 基金管理人代码
*/
private String fundManagerCode;
/**
* 基金托管人代码
*/
private String fundTrusteeCode;
}
这篇文章主要是对SpringBatch中的其余概念做补充,并写了一个springbatch对于多数据源数据抽取转换的一个demo,供大家参考。