ItemProcessor
在开发过程中,我们经常需要读取数据后,经过一系列业务逻辑的操作,进而写入数据到指定持久化过程。Spring Batch为我们提供了ItemProcessor接口进行数据处理。
1.ItemProcessor:spring-batch中数据处理的过程
2.ItemProcessor主要用于实现业务逻辑,验证,过滤,等
3.Spring-batch为我们提供ItemProcessor<I,O>这个接口,传入一个类型I,然后由Processor处理成为O
代码语言:javascript复制public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
我们可以构建CompositeItemProcessor 的Bean, 在一个Step中可以使用多个Processor来按照顺序处理业务。 我们可以构建CompositeItemProcessor 的Bean,存储多个Processor,再与Step绑定;
代码语言:javascript复制 @Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
@Autowired
private ItemProcessor<Customer, Customer> firstNameUpperCaseProcessor;
@Autowired
private ItemProcessor<Customer, Customer> idFilterProcessor;
@Bean
public Job processorDemoJob() throws Exception {
return jobBuilderFactory.get("processorDemoJob")
.start(processorDemoStep())
.build();
}
@Bean
public Step processorDemoStep() throws Exception {
return stepBuilderFactory.get("processorDemoStep")
.<Customer,Customer>chunk(100)
.reader(dbJdbcDemoReader())
.processor(processorDemoProcessor())
.writer(flatFileDemoFlatFileWriter())
.build();
}
@Bean
public CompositeItemProcessor<Customer,Customer> processorDemoProcessor(){
CompositeItemProcessor<Customer,Customer> processor = new CompositeItemProcessor<>();
// 多种处理方式
List<ItemProcessor<Customer,Customer>> list = new ArrayList<>();
list.add(firstNameUpperCaseProcessor);
list.add(idFilterProcessor);
processor.setDelegates(list);
return processor;
}
@Bean
@StepScope
public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
reader.setDataSource(this.dataSource);
reader.setFetchSize(100);
reader.setRowMapper((rs,rowNum)->{
return Customer.builder().id(rs.getLong("id"))
.firstName(rs.getString("firstName"))
.lastName(rs.getString("lastName"))
.birthdate(rs.getString("birthdate"))
.build();
});
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id, firstName, lastName, birthdate");
queryProvider.setFromClause("from Customer");
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
reader.setQueryProvider(queryProvider);
return reader;
}
@Bean
public FlatFileItemWriter<Customer> flatFileDemoFlatFileWriter() throws Exception {
FlatFileItemWriter<Customer> itemWriter = new FlatFileItemWriter<>();
String path = File.createTempFile("customerInfo",".data").getAbsolutePath();
System.out.println(">> file is created in: " path);
itemWriter.setResource(new FileSystemResource(path));
itemWriter.setLineAggregator(new MyCustomerLineAggregator());
itemWriter.afterPropertiesSet();
return itemWriter;
}
具体ItemProcessor实现
代码语言:javascript复制@Component
public class FirstNameUpperCaseProcessor implements ItemProcessor<Customer,Customer> {
@Override
public Customer process(Customer item) throws Exception {
// FirstName转大写
return new Customer(item.getId(),item.getFirstName().toUpperCase(),item.getLastName(),
item.getBirthdate());
}
}
@Component
public class IdFilterProcessor implements ItemProcessor<Customer,Customer> {
@Override
public Customer process(Customer item) throws Exception {
// 过滤id为基数的数据
if (item.getId() % 2 == 0){
return item;
} else {
return null;
}
}
}
输出结果
参考:
https://blog.csdn.net/wuzhiwei549/article/details/88622281