Spring Batch实战(三)

2022-07-12 14:27:34 浏览数 (1)

今天这篇文章,我们来了解一下SpringBatch的ItemReaders、ItemWriters、ItemStream以及怎么注册一个Step。前一篇文章我分析了一下怎么去从database中load数据使用ItemReader的一个子类JdbcPageQueryProvider,今天就进一步分析一下读取数据库数据源时的两个关键类ItemReader和ItemStream,以及写入数据库时的ItemWriter。

1、ItemReader

对于ItemReader,大家应该有个整体的认识,就是它是将许多不同的数据源数据来进行读取,然后使用ItemProcessor或者ItemWriter来写入到目标数据库或者NoSQL中,其中我们最长读取的三种类型就是:

(1)Flat文件:

怎么来理解这个Flat文件,我的个人理解是ItemReader是可以从该文件中按照行模式来读取数据,该文件要么就是有固定的格式,比如字段名,下面是value或者是每个记录的不同内容之间采用逗号之类的做隔离,比如下面两张图就是Flat File的两种类型:

带有字段定义的txt文件

这种是不带字段定义的txt文件。

(2)XML文件:

XML ItemReaders处理XML独立于用于解析、映射和验证对象的技术。输入数据允许根据XSD模式验证XML文件,具体例子见下图:

(3)关系型数据库:

访问数据库资源以返回可映射到对象进行处理的ResultSet。默认的SQL ItemReader实现调用一个RowMapper来返回对象。

下面是ItemReader的接口定义:

代码语言:javascript复制
public interface ItemReader<T> {
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

该接口只提供一个read方法,定义了ItemReader最基本的方法。调用它将返回一个数据项,如果没有剩余数据项则返回null。一个数据项可以表示文件中的一行、数据库中的一行或XML文件中的一个元素。通常期望这些被映射到一个领域对象(例如Trade、Foo或其他)。

如果底层资源是事务性的(例如JMS队列),那么在回滚场景中的后续调用中调用read方法可能返回相同的逻辑项。值得注意的是,缺少要由ItemReader处理的项并不会导致抛出异常。例如,配置了返回0结果的查询的数据库ItemReader在第一次调用read时返回null。

2、ItemWriter

ItemWriter在功能上类似于ItemReader,但具有反向操作。资源仍然需要定位、打开和关闭,但它们的不同之处在于ItemWriter是写入,而不是读取。对于数据库或队列,这些操作可能是插入、更新或发送。输出序列化的格式特定于每个批处理作业。

如下所示,是ItemWriter接口的定义:

代码语言:javascript复制
public interface ItemWriter<T> {
    void write(List<? extends T> items) throws Exception;
}

ItemWriter是来把从ItemReader中读取的数据来批量写入目标数据源,SpringBatch的设计是希望批量写入,这里需要提一下chunk,它里面是可以设置每一批处理多少条记录的。

3、ItemStream

itemreader和itemwriter都很好地满足了各自的目的,但它们之间存在一个共同的问题,需要另一个接口。通常,作为批处理作业范围的一部分,需要打开和关闭读取器和写入器,并需要一种持久化状态的机制。ItemStream接口实现了这个目的,如下面的例子所示:

代码语言:javascript复制
public interface ItemStream {
    void open(ExecutionContext executionContext) throws ItemStreamException;
    void update(ExecutionContext executionContext) throws ItemStreamException;
    void close() throws ItemStreamException;
}

实现了ItemStream的ItemReader的客户端应该在任何读取调用之前调用open,以打开任何资源(如文件)或获取连接。类似的限制也适用于实现ItemStream的ItemWriter。如果在ExecutionContext中找到了预期的数据,那么可以使用它在初始状态以外的位置启动ItemReader或ItemWriter。相反,调用close是为了确保在open期间分配的任何资源都被安全释放。调用update主要是为了确保当前保持的任何状态都被加载到提供的ExecutionContext中。在提交之前调用此方法,以确保在提交之前将当前状态持久化到数据库中。

4、如果数据源是文件类型,txt、csv,xml中之类,用SpringBatch又来怎么处理

demo-SpringBatch从txt文件读取内容

代码语言:javascript复制
package com.example.springbatch.file;

import com.example.springbatch.item.writer.TxtFileWriter;
import com.example.springbatch.pojo.Product;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class TxtFileReaderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private TxtFileWriter txtFileWriter;
  
    @Bean
    public Job txtFileReadJob(){
        // 创建一个Job
        return jobBuilderFactory.get("txtFileReadJob")
                .start(txtFileReadStep())
                .build();
    }

    @Bean
    public Step txtFileReadStep() {
        // 创建一个Step,每一批次处理的数据个数为4条记录
        return stepBuilderFactory.get("txtFileReadStep")
                .<Product,Product>chunk(4)
                .reader(txtFileRead())
                .writer(txtFileWriter)
                .build();
    }

    @Bean
    public FlatFileItemReader<Product> txtFileRead() {
        // 此处使用FlatFileItemReader来读取txt文件内容
        FlatFileItemReader<Product> flatFileItemReader = new FlatFileItemReader<>();
        ClassPathResource resource = new ClassPathResource("product.txt");
        flatFileItemReader.setResource(resource);
        // 此处之所以使用setLinesToSkip,主要作用是我们的txt文件内容第一行事标题
        // 我们只需要从第一行数据开始读取即可
        flatFileItemReader.setLinesToSkip(1);
        // 下面的内容就是解析txt文件内容
        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
        delimitedLineTokenizer.setNames(new String[]{"id","productName","productCode","productCompany"});
        DefaultLineMapper<Product> defaultLineMapper = new DefaultLineMapper<>();
        defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
        defaultLineMapper.setFieldSetMapper(fieldSet -> {
            Product product = new Product();
            product.setId(fieldSet.readLong("id"));
            product.setProductName(fieldSet.readString("productName"));
            product.setProductCode(fieldSet.readString("productCode"));
            product.setProductCompany(fieldSet.readString("productCompany"));
            return product;
        });
        defaultLineMapper.afterPropertiesSet();
        flatFileItemReader.setLineMapper(defaultLineMapper);
        return flatFileItemReader;
    }
}
代码语言:javascript复制
package com.example.springbatch.item.writer;

import com.example.springbatch.pojo.Product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@Slf4j
public class TxtFileWriter implements ItemWriter<Product> {
    @Override
    public void write(List<? extends Product> items) throws Exception {
        // 此处的writer主要是将reader的结果打印出来
        items.forEach(p->log.info("基金产品信息:{}",p));
    }
}

Demo-SpringBatch将内容写入txt文件

代码语言:javascript复制
package com.example.springbatch.file;

import com.example.springbatch.pojo.Product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class WriteDataToFile {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private ItemReader<Product> readDataBaseData;
    @Autowired
    private ItemWriter<Product> writeToTxtFile;

    @Bean
    public Job writeDataToFileJob(){
        return jobBuilderFactory.get("writeDataToFileJob")
                .start(writeDataToFileStep())
                .build();
    }

    @Bean
    public Step writeDataToFileStep() {
        return stepBuilderFactory.get("writeDataToFileStep")
                .<Product,Product>chunk(5)
                .reader(readDataBaseData)
                .writer(writeToTxtFile)
                .build();
    }
}

下面是writeToTxtFile的实现:

代码语言:javascript复制
package com.example.springbatch.config;

import com.example.springbatch.pojo.Product;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

@Configuration
@Slf4j
public class WriteToTxtFileConfig {

    @Bean
    public FlatFileItemWriter<Product> writeToTxtFile() throws Exception {
        FlatFileItemWriter<Product> writer = new FlatFileItemWriter<>();
        String filePath = "/Users/yongyongli/reading/msad/product.txt";
        writer.setResource(new FileSystemResource(filePath));
        writer.setLineAggregator(item -> {
            ObjectMapper objectMapper = new ObjectMapper();
            String result = null;
            try {
                result = objectMapper.writeValueAsString(item);
            } catch (JsonProcessingException e) {
                log.error("异常",e);
            }
            return result;
        });
        writer.afterPropertiesSet();
        return writer;
    }
}

Demo-SpringBatch从XML文件读取内容

代码语言:javascript复制
@Bean
    public StaxEventItemReader<Product> xmlFileRead() {
        StaxEventItemReader<Product> staxEventItemReader =new StaxEventItemReader<>();
        staxEventItemReader.setResource(new ClassPathResource("product.xml"));
        staxEventItemReader.setFragmentRootElementName("product");
        XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
        Map<String, Class> aliases = new HashMap<>();
        aliases.put("product",Product.class);
        xStreamMarshaller.setAliases(aliases);
        staxEventItemReader.setUnmarshaller(xStreamMarshaller);
        return staxEventItemReader;
    }

Demo-SpringBatch将内容写入XML文件

代码语言:javascript复制
@Bean
    public StaxEventItemWriter<Product> writeDataToXml() throws Exception {
        StaxEventItemWriter<Product> writer = new StaxEventItemWriter<>();
        XStreamMarshaller xStreamMarshaller = new XStreamMarshaller() ;
        Map<String,Class> aliases = new HashMap<>();
        aliases.put("product",Product.class);
        xStreamMarshaller.setAliases(aliases);
        writer.setRootTagName("products");
        writer.setMarshaller(xStreamMarshaller);
        String filePath = "/Users/yongyongli/reading/msad/product01.xml";
        writer.setResource(new FileSystemResource(filePath));
        writer.afterPropertiesSet();
        return writer;
    }

今天就主要分享了SpringBatch中的ItemReader、ItemWriter、ItemStream,并在结尾附加了SpringBatch的一些实战案例,从txt文件、xml等数据源读取数据,把数据写入到xml或者txt文件中去,希望对大家了解SpringBatch有所帮助,如果文章存在错误之处,还请指正。

0 人点赞