Spring Batch实战(二)

2022-07-12 14:27:01 浏览数 (1)

1、SpringBatch支持三种类型的数据源来进行批处理:

  • 数据库类型
  • 文件类型
  • 消息类型

2、概念术语

(1)Job

在Spring Batch中,Job只是Step实例的容器。它将在逻辑上属于一个流的多个步骤组合在一起,并允许配置所有步骤的全局属性,比如可重新启动性。作业配置包含:

  • Job的名称
  • Step实例的定义和顺序
  • 任务是否可以重新启动
代码语言:javascript复制
@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .build();
}

如上图所示,我们使用JobBuilderFactory来创建了一个Job,任务的名字叫footballJob,开始第一个Step是playerLoad(),第二个Step是gameLoad(),第三个Step是playerSummarization()。

(2)JobInstance

它的含义就是一个Job对应了一个批处理的Job实例。

(3)JobParameters

JobParameters对象保存一组用于启动批处理作业的参数。它们可以用于识别,甚至在运行过程中作为参考数据,如下图所示:

在前面的示例中,有两个实例,一个用于1月1日,另一个用于1月2日,实际上只有一个Job,但它有两个JobParameter对象:一个以JobParameters01-01-2017开始,另一个以参数01-02-2017开始。因此,它们之间的联系可以定义为:JobInstance = Job 标识JobParameters。这允许开发人员有效地控制JobInstance的定义方式,因为它们控制传入的参数

(4)JobExecution

代表这个Job执行的一些信息,比如Job执行的上下文,Job的创建时间和结束时间。

(5)Step

关于Step,你可以这么理解,一个任务就是一个Job,但是你的任务是由许多步骤组成的,在每个步骤里面会做一些逻辑处理,比如从数据源读取数据、对读取的数据进行清洗转换、最后将干净的数据写入目标数据源,这里的步骤就是Step,所以一个Job可以由一个Step或者多个Step组成。

(6)StepExecution

对应Step,它也有对应的StepExecution,它里面包含了一个Job执行的每个Step的执行上下文等信息。

3、实战

关于SpringBatch的批处理,大概处理流程如下图所示:

下面介绍一个多数据源数据的批处理例子

场景:存在多个数据源,需要将多个数据源的数据抽取同步到单个数据源,其中还需要在同步的过程中对多个数据源的数据进行抽取、清洗、转换等操作,把最终的结果写入单个数据源

1、Job和Step定义

代码语言:javascript复制
package com.batch.example.demo.job;

import com.batch.example.demo.pojo.FundProduct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 李勇勇
 * @version 1.0
 * @date 2021-05-24 09:54
 */
@Configuration
@Slf4j
public class FundProductSyncJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private ItemReader<FundProduct> readFundProduct;
    @Autowired
    private ItemWriter<FundProduct> writeProduct;
    @Autowired
    private ItemProcessor<FundProduct, FundProduct> productProcessor;

    @Bean
    public Job productSyncJob() {
        return jobBuilderFactory.get("productSyncJob")
                .start(readFundProductStep())
                .build();
    }

    @Bean
    public Step readFundProductStep() {
        return stepBuilderFactory.get("readFundProductStep")
                .<FundProduct, FundProduct>chunk(1000)
                .reader(readFundProduct)
                .processor(productProcessor)
                .writer(writeProduct)
                .build();
    }
}

2、Reader定义

代码语言:javascript复制
package com.batch.example.demo.read;

import com.batch.example.demo.pojo.FundProduct;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.OraclePagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;

/**
 * @author 李勇勇
 * @version 1.0
 * @date 2021-05-24 09:55
 */
@Configuration
@Slf4j
public class FundProductReader {

    @Autowired
    @Qualifier("chDataSource")
    private DataSource chDataSource;

    @Autowired
    @Qualifier("chJdbcTemplate")
    private JdbcTemplate chJdbcTemplate;

    @Bean
    public JdbcPagingItemReader<FundProduct> readFundProduct() {
        JdbcPagingItemReader<FundProduct> itemReader = new JdbcPagingItemReader<>();
        itemReader.setDataSource(chDataSource);
        itemReader.setFetchSize(1000);
        itemReader.setRowMapper((rs, rowNum) -> {
            FundProduct product = new FundProduct();
            product.setProductCode(rs.getString(2));
            product.setProductName(rs.getString(3));
            product.setProductType(rs.getString(4));
            product.setFundManagerCode(rs.getString(5));
            product.setFundTrusteeCode(rs.getString(6));
            return product;
        });
        OraclePagingQueryProvider queryProvider = new OraclePagingQueryProvider();
        queryProvider.setSelectClause("select ID,FSYMBOL,FDSNAME,FDTYPE,KEEPERCODE,TRUSTEECODE");
        queryProvider.setFromClause(" from TQ_FD_BASICINFO  ");
        Map<String, Order> sortedMap = new HashMap<>();
        sortedMap.put("ID", Order.DESCENDING);
        queryProvider.setSortKeys(sortedMap);
        itemReader.setQueryProvider(queryProvider);
        return itemReader;
    }
}

3、Processor定义

代码语言:javascript复制
package com.batch.example.demo.processor;

import com.batch.example.demo.pojo.FundProduct;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

/**
 * @author 李勇勇
 * @version 1.0
 * @date 2021-05-24 09:54
 */
@Component
@Slf4j
public class FundProductProcessor implements ItemProcessor<FundProduct, FundProduct> {

    @Override
    public FundProduct process(FundProduct fundProduct) throws Exception {
        FundProduct product = new FundProduct();
        // 基金代码标准化
        product.setId(UUID.randomUUID().toString().replace("-", ""));
        product.setProductCode("standard"   fundProduct.getProductCode());
        product.setFundTrusteeCode(fundProduct.getFundTrusteeCode());
        product.setFundManagerCode(fundProduct.getFundManagerCode());
        product.setProductType(fundProduct.getProductType());
        product.setProductName(fundProduct.getProductName());
        return product;
    }
}

4、Writer定义

代码语言:javascript复制
package com.batch.example.demo.write;

import com.batch.example.demo.pojo.FundProduct;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 李勇勇
 * @version 1.0
 * @date 2021-05-24 09:55
 */
@Configuration
@Slf4j
public class FundProductWriter {

    @Autowired
    @Qualifier("primaryDataSource")
    private DataSource primaryDataSource;

    @Bean
    public JdbcBatchItemWriter<FundProduct> writeProduct() {
        JdbcBatchItemWriter<FundProduct> batchItemWriter = new JdbcBatchItemWriter<>();
        batchItemWriter.setDataSource(primaryDataSource);
        batchItemWriter.setSql(" INSERT INTO product (id, product_code, product_name, product_type, "
                  "fund_manager_code, fund_trustee_code) VALUES (:id, :productCode, :productName, "
                  ":productType, :fundManagerCode, :fundTrusteeCode) ");
        batchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        return batchItemWriter;
    }
}

5、多数据源定义

代码语言:javascript复制
package com.batch.example.demo.config;

import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.core.JdbcTemplate;

/**
 * @author 李勇勇
 * @version 1.0
 * @date 2021-05-24 09:06
 */
@Slf4j
@Configuration
public class DynamicDataSourceConfig {

    @Bean
    @Primary
    @ConfigurationProperties("spring.datasource.primary")
    public DataSourceProperties primaryDataSourceProperties() {
        return new DataSourceProperties();
    }

    @Bean("chDataSourceProperties")
    @ConfigurationProperties("spring.datasource.ch")
    public DataSourceProperties chDataSourceProperties() {
        return new DataSourceProperties();
    }

    @Bean
    @Primary
    public DataSource primaryDataSource(@Autowired DataSourceProperties props) {
        return props.initializeDataSourceBuilder().build();
    }

    @Bean("chDataSource")
    public DataSource chDataSource(@Autowired @Qualifier("chDataSourceProperties") DataSourceProperties props) {
        return props.initializeDataSourceBuilder().build();
    }

    @Bean
    @Primary
    public JdbcTemplate primaryJdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    @Bean(name = "chJdbcTemplate")
    public JdbcTemplate chJdbcTemplate(@Qualifier("chDataSource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }
}

6、配置信息

代码语言:javascript复制
# 多数据源配置
# target postgresql database
spring.datasource.primary.driver-class-name=org.postgresql.Driver
spring.datasource.primary.url=jdbc:postgresql://host:port/dbname
spring.datasource.primary.username=
spring.datasource.primary.password=
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-drop-postgresql.sql
spring.batch.jdbc.initialize-schema=always
spring.datasource.primary.type=com.alibaba.druid.pool.DruidDataSource
# 财汇 oracle
spring.datasource.ch.url=jdbc:oracle:thin:@ip:port:instance
spring.datasource.ch.username=
spring.datasource.ch.password=
spring.datasource.ch.driver-class-name=oracle.jdbc.OracleDriver
spring.datasource.ch.type=com.alibaba.druid.pool.DruidDataSource
# wind oracle
spring.datasource.wind.url=jdbc:oracle:thin:@ip:port:instance
spring.datasource.wind.username=
spring.datasource.wind.password=
spring.datasource.wind.driver-class-name=oracle.jdbc.OracleDriver
spring.datasource.wind.type=com.alibaba.druid.pool.DruidDataSource

7、建表语句

代码语言:javascript复制
create table product
(
    id                varchar(32) not null
        constraint product_pkey
            primary key,
    product_code      varchar(128),
    product_name      varchar(1024),
    product_type      varchar(128),
    fund_manager_code varchar(128),
    fund_trustee_code varchar(128)
);

comment on table product is '产品表';

comment on column product.id is '产品id';

comment on column product.product_code is '基金代码';

comment on column product.product_name is '基金简称';

comment on column product.product_type is '基金类型';

comment on column product.fund_manager_code is '基金管理人code';

comment on column product.fund_trustee_code is '基金托管人code';

alter table product
    owner to product_dev3;

9、数据对象定义

代码语言:javascript复制
package com.batch.example.demo.pojo;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

/**
 * @author 李勇勇
 * @version 1.0
 * @date 2021-05-24 09:51
 */
@Getter
@Setter
@ToString
public class FundProduct {


    private String id;

    /**
     * 基金代码
     */
    private String productCode;

    /**
     * 基金简称
     */
    private String productName;
    /**
     * 基金类型
     */
    private String productType;
    /**
     * 基金管理人代码
     */
    private String fundManagerCode;
    /**
     * 基金托管人代码
     */
    private String fundTrusteeCode;


}

这篇文章主要是对SpringBatch中的其余概念做补充,并写了一个springbatch对于多数据源数据抽取转换的一个demo,供大家参考。

0 人点赞