今天这篇文章,我们来了解一下SpringBatch的ItemReaders、ItemWriters、ItemStream以及怎么注册一个Step。前一篇文章我分析了一下怎么去从database中load数据使用ItemReader的一个子类JdbcPageQueryProvider,今天就进一步分析一下读取数据库数据源时的两个关键类ItemReader和ItemStream,以及写入数据库时的ItemWriter。
1、ItemReader
对于ItemReader,大家应该有个整体的认识,就是它是将许多不同的数据源数据来进行读取,然后使用ItemProcessor或者ItemWriter来写入到目标数据库或者NoSQL中,其中我们最长读取的三种类型就是:
(1)Flat文件:
怎么来理解这个Flat文件,我的个人理解是ItemReader是可以从该文件中按照行模式来读取数据,该文件要么就是有固定的格式,比如字段名,下面是value或者是每个记录的不同内容之间采用逗号之类的做隔离,比如下面两张图就是Flat File的两种类型:
带有字段定义的txt文件
这种是不带字段定义的txt文件。
(2)XML文件:
XML ItemReaders处理XML独立于用于解析、映射和验证对象的技术。输入数据允许根据XSD模式验证XML文件,具体例子见下图:
(3)关系型数据库:
访问数据库资源以返回可映射到对象进行处理的ResultSet。默认的SQL ItemReader实现调用一个RowMapper来返回对象。
下面是ItemReader的接口定义:
代码语言:javascript复制public interface ItemReader<T> {
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
该接口只提供一个read方法,定义了ItemReader最基本的方法。调用它将返回一个数据项,如果没有剩余数据项则返回null。一个数据项可以表示文件中的一行、数据库中的一行或XML文件中的一个元素。通常期望这些被映射到一个领域对象(例如Trade、Foo或其他)。
如果底层资源是事务性的(例如JMS队列),那么在回滚场景中的后续调用中调用read方法可能返回相同的逻辑项。值得注意的是,缺少要由ItemReader处理的项并不会导致抛出异常。例如,配置了返回0结果的查询的数据库ItemReader在第一次调用read时返回null。
2、ItemWriter
ItemWriter在功能上类似于ItemReader,但具有反向操作。资源仍然需要定位、打开和关闭,但它们的不同之处在于ItemWriter是写入,而不是读取。对于数据库或队列,这些操作可能是插入、更新或发送。输出序列化的格式特定于每个批处理作业。
如下所示,是ItemWriter接口的定义:
代码语言:javascript复制public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}
ItemWriter是来把从ItemReader中读取的数据来批量写入目标数据源,SpringBatch的设计是希望批量写入,这里需要提一下chunk,它里面是可以设置每一批处理多少条记录的。
3、ItemStream
itemreader和itemwriter都很好地满足了各自的目的,但它们之间存在一个共同的问题,需要另一个接口。通常,作为批处理作业范围的一部分,需要打开和关闭读取器和写入器,并需要一种持久化状态的机制。ItemStream接口实现了这个目的,如下面的例子所示:
代码语言:javascript复制public interface ItemStream {
void open(ExecutionContext executionContext) throws ItemStreamException;
void update(ExecutionContext executionContext) throws ItemStreamException;
void close() throws ItemStreamException;
}
实现了ItemStream的ItemReader的客户端应该在任何读取调用之前调用open,以打开任何资源(如文件)或获取连接。类似的限制也适用于实现ItemStream的ItemWriter。如果在ExecutionContext中找到了预期的数据,那么可以使用它在初始状态以外的位置启动ItemReader或ItemWriter。相反,调用close是为了确保在open期间分配的任何资源都被安全释放。调用update主要是为了确保当前保持的任何状态都被加载到提供的ExecutionContext中。在提交之前调用此方法,以确保在提交之前将当前状态持久化到数据库中。
4、如果数据源是文件类型,txt、csv,xml中之类,用SpringBatch又来怎么处理?
demo-SpringBatch从txt文件读取内容:
代码语言:javascript复制package com.example.springbatch.file;
import com.example.springbatch.item.writer.TxtFileWriter;
import com.example.springbatch.pojo.Product;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
@Configuration
public class TxtFileReaderJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private TxtFileWriter txtFileWriter;
@Bean
public Job txtFileReadJob(){
// 创建一个Job
return jobBuilderFactory.get("txtFileReadJob")
.start(txtFileReadStep())
.build();
}
@Bean
public Step txtFileReadStep() {
// 创建一个Step,每一批次处理的数据个数为4条记录
return stepBuilderFactory.get("txtFileReadStep")
.<Product,Product>chunk(4)
.reader(txtFileRead())
.writer(txtFileWriter)
.build();
}
@Bean
public FlatFileItemReader<Product> txtFileRead() {
// 此处使用FlatFileItemReader来读取txt文件内容
FlatFileItemReader<Product> flatFileItemReader = new FlatFileItemReader<>();
ClassPathResource resource = new ClassPathResource("product.txt");
flatFileItemReader.setResource(resource);
// 此处之所以使用setLinesToSkip,主要作用是我们的txt文件内容第一行事标题
// 我们只需要从第一行数据开始读取即可
flatFileItemReader.setLinesToSkip(1);
// 下面的内容就是解析txt文件内容
DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
delimitedLineTokenizer.setNames(new String[]{"id","productName","productCode","productCompany"});
DefaultLineMapper<Product> defaultLineMapper = new DefaultLineMapper<>();
defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
defaultLineMapper.setFieldSetMapper(fieldSet -> {
Product product = new Product();
product.setId(fieldSet.readLong("id"));
product.setProductName(fieldSet.readString("productName"));
product.setProductCode(fieldSet.readString("productCode"));
product.setProductCompany(fieldSet.readString("productCompany"));
return product;
});
defaultLineMapper.afterPropertiesSet();
flatFileItemReader.setLineMapper(defaultLineMapper);
return flatFileItemReader;
}
}
代码语言:javascript复制package com.example.springbatch.item.writer;
import com.example.springbatch.pojo.Product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class TxtFileWriter implements ItemWriter<Product> {
@Override
public void write(List<? extends Product> items) throws Exception {
// 此处的writer主要是将reader的结果打印出来
items.forEach(p->log.info("基金产品信息:{}",p));
}
}
Demo-SpringBatch将内容写入txt文件
代码语言:javascript复制package com.example.springbatch.file;
import com.example.springbatch.pojo.Product;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class WriteDataToFile {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private ItemReader<Product> readDataBaseData;
@Autowired
private ItemWriter<Product> writeToTxtFile;
@Bean
public Job writeDataToFileJob(){
return jobBuilderFactory.get("writeDataToFileJob")
.start(writeDataToFileStep())
.build();
}
@Bean
public Step writeDataToFileStep() {
return stepBuilderFactory.get("writeDataToFileStep")
.<Product,Product>chunk(5)
.reader(readDataBaseData)
.writer(writeToTxtFile)
.build();
}
}
下面是writeToTxtFile的实现:
代码语言:javascript复制package com.example.springbatch.config;
import com.example.springbatch.pojo.Product;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
@Configuration
@Slf4j
public class WriteToTxtFileConfig {
@Bean
public FlatFileItemWriter<Product> writeToTxtFile() throws Exception {
FlatFileItemWriter<Product> writer = new FlatFileItemWriter<>();
String filePath = "/Users/yongyongli/reading/msad/product.txt";
writer.setResource(new FileSystemResource(filePath));
writer.setLineAggregator(item -> {
ObjectMapper objectMapper = new ObjectMapper();
String result = null;
try {
result = objectMapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
log.error("异常",e);
}
return result;
});
writer.afterPropertiesSet();
return writer;
}
}
Demo-SpringBatch从XML文件读取内容
代码语言:javascript复制@Bean
public StaxEventItemReader<Product> xmlFileRead() {
StaxEventItemReader<Product> staxEventItemReader =new StaxEventItemReader<>();
staxEventItemReader.setResource(new ClassPathResource("product.xml"));
staxEventItemReader.setFragmentRootElementName("product");
XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
Map<String, Class> aliases = new HashMap<>();
aliases.put("product",Product.class);
xStreamMarshaller.setAliases(aliases);
staxEventItemReader.setUnmarshaller(xStreamMarshaller);
return staxEventItemReader;
}
Demo-SpringBatch将内容写入XML文件
代码语言:javascript复制@Bean
public StaxEventItemWriter<Product> writeDataToXml() throws Exception {
StaxEventItemWriter<Product> writer = new StaxEventItemWriter<>();
XStreamMarshaller xStreamMarshaller = new XStreamMarshaller() ;
Map<String,Class> aliases = new HashMap<>();
aliases.put("product",Product.class);
xStreamMarshaller.setAliases(aliases);
writer.setRootTagName("products");
writer.setMarshaller(xStreamMarshaller);
String filePath = "/Users/yongyongli/reading/msad/product01.xml";
writer.setResource(new FileSystemResource(filePath));
writer.afterPropertiesSet();
return writer;
}
今天就主要分享了SpringBatch中的ItemReader、ItemWriter、ItemStream,并在结尾附加了SpringBatch的一些实战案例,从txt文件、xml等数据源读取数据,把数据写入到xml或者txt文件中去,希望对大家了解SpringBatch有所帮助,如果文章存在错误之处,还请指正。