Parallel Processing in Spring Batch
Hi everyone, in the first article I’ve done an intro to spring batch you can reach to the article from here
Today I will mention about parallel processing . I will give you information about Multi-threaded Step,Parallel Steps and Partitioning a Step
Multi-Threaded Step
We can simply use a taskExecutor in order to create an asychronous task execution.
Mostly SimpleAsyncTaskExecutor or ThreadPoolTaskExecutor is used.
You can see the example below
@Bean(name="myTaskExecutor")
public TaskExecutor taskExecutor(){
return new SimpleAsyncTaskExecutor("customerInfo");
}@Bean
public Step step1(@Qualifier("myTaskExecutor")TaskExecutor taskExecutor) {
return stepBuilderFactory.get("step1")
.chunk(2)
.reader(reader())
.processor(processor())
.writer(writer())
.faultTolerant()
.retryLimit(3)
.retry(Exception.class)
.taskExecutor(taskExecutor)
.throttleLimit(5)
.build();
}
You can limit the thread invocation by setting attribute throtteLimit.It shows the maximum number of thread running at one time
In multi-threaded step you can’t guarantee the order of the records Since every task executor thread process chunked size of items concurrently
Let’s look at this with an example We have a file with 10 records.
Ali;Durak;test@testmail.com;000-000
Ahmet;Topcu;test2@testmail.com;111-111
Mehmet;Darı;test3@testmail.com;222-222
Ozlem;Dertsiz;test4@testmail.com;333-333
Nuray;Ozden;test5@testmail.com;444-444
Murat;Günok;test6@testmail.com;555-555
Arif;Atay;test7@testmail.com;666-666
Ayse;Güvenir;test8@testmail.com;777-777
Fatma;Deniz;test9@testmail.com;888-888
Mert;Fırıncı;test10@testmail.com;999-999
we have an ,itemReader configuraiton like below
@Bean
public FlatFileItemReader<CustomerInformation> reader() {
return new FlatFileItemReaderBuilder<CustomerInformation>()
.name("FlatFileItemReader")
.resource(new ClassPathResource("sample-data.csv"))
.delimited().delimiter(";")
.names(new String[]{"name", "surName","emailAddress","phoneNumber"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<CustomerInformation>() {{
setTargetType(CustomerInformation.class);
}})
.build();
}
We use a customeItemProcessor for the itemProcessor like below
*/
@Slf4j
@Service
public class CustomerInformationItemProcessor implements ItemProcessor<CustomerInformation,CustomerInformation> {
@Override
public CustomerInformation process(CustomerInformation customerInformation) throws Exception {
log.info("CustomerInformationItemProcessor customer Name {} - Surname {} - PhoneNumber {} - EmailAddress {}",
customerInformation.getName(),customerInformation.getSurName(),customerInformation.getPhoneNumber(),customerInformation.getEmailAddress());
return customerInformation;
}
}
After we run our application you can see that the records in the file is processed unordered.Every thread holds chunk size of items
Parallel Steps
If the logic of the application has to work in parallel you can split the responsibilities to parallelized steps.
Lets look at the example below
Let’s say we have two type of files.One of them holds customer information with the fields customerNo,name,surName,emailAddress,phoneNumber Other file consist of customer_invoices with the fields customer_no,invoice_no,invoice_amount,open_amount,due_date
@Bean
public FlatFileItemReader<CustomerInformation> reader() {
return new FlatFileItemReaderBuilder<CustomerInformation>()
.name("FlatFileItemReader")
.resource(new ClassPathResource("sample-data.csv"))
.delimited().delimiter(";")
.names(new String[]{"customerNo","name", "surName","emailAddress","phoneNumber"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<CustomerInformation>() {{
setTargetType(CustomerInformation.class);
}})
.build();
}
@Bean
public FlatFileItemReader<CustomerDebtInformation> reader2() {
return new FlatFileItemReaderBuilder<CustomerDebtInformation>()
.name("FlatFileItemReader")
.resource(new ClassPathResource("sample-data-invoice.csv"))
.delimited().delimiter(";")
.names(new String[]{"customerNo","invoiceNo", "invoiceAmount","openAmount","dueDate"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<CustomerDebtInformation>() {{
setTargetType(CustomerDebtInformation.class);
}})
.build();
}
We have two Flow items.One of them run step1 and step2.The other one run step3
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.chunk(2)
.reader(reader())
.processor(processor())
.writer(writer())
.faultTolerant()
.retryLimit(3)
.retry(Exception.class)
.taskExecutor(taskExecutor())
.throttleLimit(5)
.build();
}@Bean
public Step step2() {
return this.stepBuilderFactory.get("step2")
.tasklet(myTasklet())
.build();
}@Bean
public Step step3() {
return stepBuilderFactory.get("step3")
.chunk(5)
.reader(reader2())
.processor(processor2())
.writer(writer2())
.faultTolerant()
.retryLimit(3)
.retry(Exception.class)
.taskExecutor(taskExecutor())
.throttleLimit(5)
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
We have one more flow which is responsible to run the other flows in parallel with taskExecutor.
@Bean
public TaskExecutor taskExecutor(){
SimpleAsyncTaskExecutor taskExecutor=new SimpleAsyncTaskExecutor("customerInfoThreads-");
return taskExecutor;
}@Bean
public Flow parallelFlows() {
return new FlowBuilder<SimpleFlow>("parallelFlows")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}
We have two custom ItemProcessor implementations CustomerInformationProcessor and CustomerDebtInformationProcessor as follows
@Slf4j
@Service
public class CustomerInformationItemProcessor implements ItemProcessor<CustomerInformation,CustomerInformation> {
@Override
public CustomerInformation process(CustomerInformation customerInformation) throws Exception {
log.info("Thread id is worked with {},CustomerInformationItemProcessor customer No {} - customer Name {} - Surname {} - PhoneNumber {} - EmailAddress {}",
Thread.currentThread().getName(),customerInformation.getCustomerNo(),customerInformation.getName(),customerInformation.getSurName(),customerInformation.getPhoneNumber(),customerInformation.getEmailAddress());
return customerInformation;
}
}
@Slf4j
@Service
public class CustomerDeptInformationProcessor implements ItemProcessor<CustomerDebtInformation,CustomerDebtInformation> {
@Override
public CustomerDebtInformation process(CustomerDebtInformation customerDebtInformation) throws Exception {
log.info("Thread id is worked with {},CustomerDeptInformationProcessor customer No {} - invoiceNo {} - InvoiceAmount {} - OpenAMount {} - DueDate {}",
Thread.currentThread().getName(),customerDebtInformation.getCustomerNo(),customerDebtInformation.getInvoiceNo()
,customerDebtInformation.getInvoiceAmount()
,customerDebtInformation.getOpenAmount()
,customerDebtInformation.getDueDate());
return customerDebtInformation;
}
}
Here is the job configuration
@Bean
public Job processJob() {
return jobBuilderFactory.get("processJob")
.incrementer(new RunIdIncrementer())
.listener(jobExecutionListener)
.start(parallelFlows())
.end()
.build();
}
When we run our application flow1 and flow2 run conccurently.Flow1 reads sample-data.csv file and insert its contents to the customer_information table.Flow2 reads sample-data-invoice.csv and insert its contents to the customer_invoice table
You can see the result below
CUSTOMER_INFORMATION
CUSTOMER_INVOICE
Partitioning a Step
Spring Batch allows us to partition a step.You can see the overview figure below
Here is an example usage.Let’s say we have 5 files which consist of two records. So 5 partition threads will read files.Each one only read one file and write data in parallel to db
We have our FlatFileItemReader like below.
@StepScope
@Bean
public FlatFileItemReader<CustomerInformation> reader(@Value("#{stepExecutionContext[fileName]}") String fileName) throws MalformedURLException {
return new FlatFileItemReaderBuilder<CustomerInformation>()
.name("FlatFileItemReader")
.resource(new UrlResource(fileName))
.delimited()
.names(new String[]{"customerNo","name", "surName","emailAddress","phoneNumber"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<CustomerInformation>() {{
setTargetType(CustomerInformation.class);
}})
.build();
}
We have our step like below
@Bean
public Step step1() throws MalformedURLException {
return stepBuilderFactory.get("step1")
.<CustomerInformation,CustomerInformation>chunk(2)
.reader(reader(null))
.processor(processor())
.writer(writer())
.build();
}
We have a stepManage step which manage a step with the partitioner and partitionHandler
By the help of TaskExecutorPartitionHandler which is the implementation of PartitionHandler, we set some attributes of a partitioner such as taskExecutor,step info and gridSize .
MultiResourcePartitioner is used to process the multiple csv files with the partitioner
@Bean
public Partitioner partitioner() throws IOException {
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
partitioner.setResources(resourcePatternResolver.getResources(" file:D:/dev/springBatchExample/sample-data*.csv"));
return partitioner;
}@Bean
public PartitionHandler partitionHandler() throws MalformedURLException {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor());
retVal.setStep(step1());
retVal.setGridSize(5);
return retVal;
}@Bean
public Step step1Manager() throws Exception {
return stepBuilderFactory.get("step1.manager")
.partitioner("step1", partitioner())
.partitionHandler(partitionHandler())
.build();
}@Bean
public Job processJob() throws Exception {
return jobBuilderFactory.get("processJob")
.incrementer(new RunIdIncrementer())
.listener(jobExecutionListener)
.start(step1Manager())
.build();
}
So here is the result.Our job processed both five files in parallel in one execution