Parallel Processing in Spring Batch

Murat Derman
5 min read · Oct 10, 2021

Hi everyone. In the first article I gave an introduction to Spring Batch; you can find that article here.

Today I will talk about parallel processing. I will cover three approaches: the multi-threaded step, parallel steps, and partitioning a step.

Multi-Threaded Step

We can simply add a taskExecutor to a step so that its chunks are executed asynchronously.

SimpleAsyncTaskExecutor or ThreadPoolTaskExecutor is the usual choice.

You can see an example below:

@Bean(name = "myTaskExecutor")
public TaskExecutor taskExecutor() {
    // Starts a new thread per task; thread names begin with "customerInfo"
    return new SimpleAsyncTaskExecutor("customerInfo");
}

@Bean
public Step step1(@Qualifier("myTaskExecutor") TaskExecutor taskExecutor) {
    return stepBuilderFactory.get("step1")
            // explicit item types so that the typed processor and writer compile
            .<CustomerInformation, CustomerInformation>chunk(2)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .faultTolerant()
            .retryLimit(3)
            .retry(Exception.class)
            .taskExecutor(taskExecutor)
            .throttleLimit(5)
            .build();
}

You can limit the number of threads by setting the throttleLimit attribute. It is the maximum number of threads running at one time.
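As a rough illustration of what such a limit does, here is a plain-Java sketch that caps concurrency with a Semaphore. It is not Spring Batch's own implementation; the class ThrottleDemo, the method maxConcurrency and the 20 ms sleep are made up for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: shows the effect of a concurrency limit of `limit` threads.
final class ThrottleDemo {

    // Runs taskCount tasks, never more than `limit` at once,
    // and returns the highest number of tasks seen running together.
    static int maxConcurrency(int taskCount, int limit) {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < taskCount; i++) {
                permits.acquire(); // the submitter waits while `limit` tasks are running
                pool.execute(() -> {
                    try {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(20); // simulated work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        permits.release();
                    }
                });
            }
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // The printed peak never exceeds the limit of 5.
        System.out.println("peak concurrency: " + maxConcurrency(20, 5));
    }
}
```

The point of the sketch is only that the limit bounds how many chunks are in flight, regardless of how many threads the executor could create.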

In a multi-threaded step you can’t guarantee the order of the records, since every task executor thread processes its own chunk of items concurrently.
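To see why the order is lost, here is a minimal plain-Java simulation. It is a simplification and not Spring Batch code: the items are cut into chunks up front, whereas in a real multi-threaded step each thread pulls its items from the shared reader. The class ChunkOrderDemo and the method processInChunks are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: simulates chunk processing on several threads.
final class ChunkOrderDemo {

    // Splits items into chunks of chunkSize, processes each chunk on a pool thread,
    // and returns the items in the order in which they were actually processed.
    static List<Integer> processInChunks(List<Integer> items, int chunkSize, int threads) {
        Queue<Integer> processed = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            List<Integer> chunk = items.subList(start, end);
            pool.execute(() -> processed.addAll(chunk)); // "process" the chunk
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("chunks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // Every item is processed exactly once, but the order may differ between runs.
        System.out.println(processInChunks(items, 2, 5));
    }
}
```

Every record is still processed exactly once; only the relative order of the chunks depends on thread scheduling.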

Let’s look at this with an example. We have a file with 10 records.

Ali;Durak;test@testmail.com;000-000
Ahmet;Topcu;test2@testmail.com;111-111
Mehmet;Darı;test3@testmail.com;222-222
Ozlem;Dertsiz;test4@testmail.com;333-333
Nuray;Ozden;test5@testmail.com;444-444
Murat;Günok;test6@testmail.com;555-555
Arif;Atay;test7@testmail.com;666-666
Ayse;Güvenir;test8@testmail.com;777-777
Fatma;Deniz;test9@testmail.com;888-888
Mert;Fırıncı;test10@testmail.com;999-999

We have an itemReader configuration like the one below:

@Bean
public FlatFileItemReader<CustomerInformation> reader() {
    return new FlatFileItemReaderBuilder<CustomerInformation>()
            .name("FlatFileItemReader")
            .resource(new ClassPathResource("sample-data.csv"))
            .delimited().delimiter(";")
            .names(new String[]{"name", "surName", "emailAddress", "phoneNumber"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<CustomerInformation>() {{
                setTargetType(CustomerInformation.class);
            }})
            .build();
}
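Conceptually, the delimited configuration above splits each line on ";" and binds the tokens to the listed field names. The sketch below shows that idea in plain Java. It is not how FlatFileItemReader is implemented internally, and the class DelimitedLineDemo and its parse method are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical demo class: maps one delimited line to named fields.
final class DelimitedLineDemo {

    // Splits the line on the delimiter and pairs each token with its field name.
    static Map<String, String> parse(String line, String delimiter, String... names) {
        // Pattern.quote treats the delimiter literally; -1 keeps trailing empty fields
        String[] tokens = line.split(Pattern.quote(delimiter), -1);
        if (tokens.length != names.length) {
            throw new IllegalArgumentException(
                    "expected " + names.length + " fields but found " + tokens.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], tokens[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> customer = parse("Ali;Durak;test@testmail.com;000-000", ";",
                "name", "surName", "emailAddress", "phoneNumber");
        System.out.println(customer);
    }
}
```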

We use a custom ItemProcessor as the itemProcessor, like below:

@Slf4j
@Service
public class CustomerInformationItemProcessor implements ItemProcessor<CustomerInformation, CustomerInformation> {

    @Override
    public CustomerInformation process(CustomerInformation customerInformation) throws Exception {
        // Log each record so the processing order is visible in the output
        log.info("CustomerInformationItemProcessor customer Name {} - Surname {} - PhoneNumber {} - EmailAddress {}",
                customerInformation.getName(), customerInformation.getSurName(),
                customerInformation.getPhoneNumber(), customerInformation.getEmailAddress());
        return customerInformation;
    }
}

After we run the application, you can see that the records in the file are processed out of order. Every thread handles one chunk of items at a time.

Parallel Steps

If parts of the application logic can run independently, you can split those responsibilities into steps that run in parallel.

Let’s look at the example below.

Let’s say we have two types of files. One holds customer information with the fields customerNo, name, surName, emailAddress and phoneNumber. The other consists of customer invoices with the fields customer_no, invoice_no, invoice_amount, open_amount and due_date.

@Bean
public FlatFileItemReader<CustomerInformation> reader() {
    return new FlatFileItemReaderBuilder<CustomerInformation>()
            .name("FlatFileItemReader")
            .resource(new ClassPathResource("sample-data.csv"))
            .delimited().delimiter(";")
            .names(new String[]{"customerNo", "name", "surName", "emailAddress", "phoneNumber"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<CustomerInformation>() {{
                setTargetType(CustomerInformation.class);
            }})
            .build();
}

@Bean
public FlatFileItemReader<CustomerDebtInformation> reader2() {
    return new FlatFileItemReaderBuilder<CustomerDebtInformation>()
            .name("FlatFileItemReader")
            .resource(new ClassPathResource("sample-data-invoice.csv"))
            .delimited().delimiter(";")
            .names(new String[]{"customerNo", "invoiceNo", "invoiceAmount", "openAmount", "dueDate"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<CustomerDebtInformation>() {{
                setTargetType(CustomerDebtInformation.class);
            }})
            .build();
}

We have two Flow beans. One of them runs step1 and then step2. The other one runs step3.

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            // explicit item types so that the typed processor and writer compile
            .<CustomerInformation, CustomerInformation>chunk(2)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .faultTolerant()
            .retryLimit(3)
            .retry(Exception.class)
            .taskExecutor(taskExecutor())
            .throttleLimit(5)
            .build();
}

@Bean
public Step step2() {
    return this.stepBuilderFactory.get("step2")
            .tasklet(myTasklet())
            .build();
}

@Bean
public Step step3() {
    return stepBuilderFactory.get("step3")
            .<CustomerDebtInformation, CustomerDebtInformation>chunk(5)
            .reader(reader2())
            .processor(processor2())
            .writer(writer2())
            .faultTolerant()
            .retryLimit(3)
            .retry(Exception.class)
            .taskExecutor(taskExecutor())
            .throttleLimit(5)
            .build();
}

// flow1 runs step1 and then step2, one after the other
@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
            .start(step1())
            .next(step2())
            .build();
}

// flow2 runs only step3
@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
            .start(step3())
            .build();
}

We have one more flow, which is responsible for running the other two flows in parallel with a taskExecutor.

@Bean
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("customerInfoThreads-");
    return taskExecutor;
}

@Bean
public Flow parallelFlows() {
    return new FlowBuilder<SimpleFlow>("parallelFlows")
            .split(taskExecutor())
            .add(flow1(), flow2())
            .build();
}
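The shape of this split can be sketched in plain Java with CompletableFuture: two independent flows start together, the steps inside a flow stay sequential, and the caller waits for both. This is only an analogy for the configuration above, not Spring Batch code, and the class ParallelFlowsDemo and the method runFlows are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class: two flows run concurrently, steps inside a flow run in order.
final class ParallelFlowsDemo {

    // Runs flow1 (step1 then step2) and flow2 (step3) in parallel
    // and returns the step names in the order they completed.
    static List<String> runFlows() {
        List<String> log = new CopyOnWriteArrayList<>();

        CompletableFuture<Void> flow1 = CompletableFuture
                .runAsync(() -> log.add("step1"))
                .thenRun(() -> log.add("step2")); // starts only after step1 is done

        CompletableFuture<Void> flow2 = CompletableFuture
                .runAsync(() -> log.add("step3"));

        // Like the split, wait until every flow has finished
        CompletableFuture.allOf(flow1, flow2).join();
        return List.copyOf(log);
    }

    public static void main(String[] args) {
        // step1 always appears before step2; the position of step3 may vary.
        System.out.println(runFlows());
    }
}
```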

We have two custom ItemProcessor implementations, CustomerInformationItemProcessor and CustomerDeptInformationProcessor, as follows:

@Slf4j
@Service
public class CustomerInformationItemProcessor implements ItemProcessor<CustomerInformation, CustomerInformation> {

    @Override
    public CustomerInformation process(CustomerInformation customerInformation) throws Exception {
        // The thread name shows which executor thread handled the record
        log.info("Thread id is worked with {},CustomerInformationItemProcessor customer No {} - customer Name {} - Surname {} - PhoneNumber {} - EmailAddress {}",
                Thread.currentThread().getName(), customerInformation.getCustomerNo(),
                customerInformation.getName(), customerInformation.getSurName(),
                customerInformation.getPhoneNumber(), customerInformation.getEmailAddress());
        return customerInformation;
    }
}

@Slf4j
@Service
public class CustomerDeptInformationProcessor implements ItemProcessor<CustomerDebtInformation, CustomerDebtInformation> {

    @Override
    public CustomerDebtInformation process(CustomerDebtInformation customerDebtInformation) throws Exception {
        log.info("Thread id is worked with {},CustomerDeptInformationProcessor customer No {} - invoiceNo {} - InvoiceAmount {} - OpenAMount {} - DueDate {}",
                Thread.currentThread().getName(), customerDebtInformation.getCustomerNo(),
                customerDebtInformation.getInvoiceNo(),
                customerDebtInformation.getInvoiceAmount(),
                customerDebtInformation.getOpenAmount(),
                customerDebtInformation.getDueDate());
        return customerDebtInformation;
    }
}

Here is the job configuration

@Bean
public Job processJob() {
    return jobBuilderFactory.get("processJob")
            .incrementer(new RunIdIncrementer())
            .listener(jobExecutionListener)
            .start(parallelFlows())
            .end()
            .build();
}

When we run the application, flow1 and flow2 run concurrently. Flow1 reads the sample-data.csv file and inserts its contents into the customer_information table. Flow2 reads sample-data-invoice.csv and inserts its contents into the customer_invoice table.

You can see the result below.

[Figure: CUSTOMER_INFORMATION table]

[Figure: CUSTOMER_INVOICE table]

Partitioning a Step

Spring Batch allows us to partition a step.

[Figure: overview of step partitioning]

Here is an example. Let’s say we have 5 files, each containing two records. Five partition threads will read the files. Each thread reads only one file and writes its data to the database in parallel with the others.

Our FlatFileItemReader looks like this:

@StepScope
@Bean
public FlatFileItemReader<CustomerInformation> reader(
        @Value("#{stepExecutionContext[fileName]}") String fileName) throws MalformedURLException {
    // fileName is injected per partition from the step execution context
    return new FlatFileItemReaderBuilder<CustomerInformation>()
            .name("FlatFileItemReader")
            .resource(new UrlResource(fileName))
            .delimited()
            .names(new String[]{"customerNo", "name", "surName", "emailAddress", "phoneNumber"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<CustomerInformation>() {{
                setTargetType(CustomerInformation.class);
            }})
            .build();
}

Our worker step looks like this:

@Bean
public Step step1() throws MalformedURLException {
    return stepBuilderFactory.get("step1")
            .<CustomerInformation, CustomerInformation>chunk(2)
            // null is a placeholder; the step-scoped reader gets the real fileName at runtime
            .reader(reader(null))
            .processor(processor())
            .writer(writer())
            .build();
}

We have a step1Manager step, which manages the worker step through the partitioner and the partitionHandler.

With TaskExecutorPartitionHandler, an implementation of PartitionHandler, we set attributes such as the taskExecutor, the worker step and the gridSize.

MultiResourcePartitioner is used to create one partition for each of the CSV files.

@Bean
public Partitioner partitioner() throws IOException {
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    // One partition is created for every file that matches the pattern
    partitioner.setResources(resourcePatternResolver.getResources("file:D:/dev/springBatchExample/sample-data*.csv"));
    return partitioner;
}

@Bean
public PartitionHandler partitionHandler() throws MalformedURLException {
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(taskExecutor());
    retVal.setStep(step1());
    retVal.setGridSize(5);
    return retVal;
}

@Bean
public Step step1Manager() throws Exception {
    return stepBuilderFactory.get("step1.manager")
            .partitioner("step1", partitioner())
            .partitionHandler(partitionHandler())
            .build();
}

@Bean
public Job processJob() throws Exception {
    return jobBuilderFactory.get("processJob")
            .incrementer(new RunIdIncrementer())
            .listener(jobExecutionListener)
            .start(step1Manager())
            .build();
}
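To make the link between the partitioner and the step-scoped reader more concrete, here is a plain-Java sketch of the kind of map a resource-based partitioner produces: one entry per file, each with its own small context that stores the file URL under the key fileName. This is a simplified model built on plain maps, not the Spring Batch classes, and the class PartitionDemo, its partition method and the file URLs are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class: models "one partition per file" with plain maps.
final class PartitionDemo {

    // Builds one context per file URL, keyed partition0, partition1, ...
    // Each context stores the URL under "fileName", the key the reader looks up.
    static Map<String, Map<String, String>> partition(List<String> fileUrls) {
        Map<String, Map<String, String>> partitions = new LinkedHashMap<>();
        int i = 0;
        for (String url : fileUrls) {
            Map<String, String> context = new LinkedHashMap<>();
            context.put("fileName", url);
            partitions.put("partition" + i, context);
            i++;
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Hypothetical file URLs, only for the demo
        List<String> files = List.of(
                "file:/data/sample-data1.csv",
                "file:/data/sample-data2.csv",
                "file:/data/sample-data3.csv",
                "file:/data/sample-data4.csv",
                "file:/data/sample-data5.csv");
        partition(files).forEach((name, ctx) -> System.out.println(name + " -> " + ctx));
    }
}
```

In this model the number of partitions follows the number of files, and each worker would read its own fileName value.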

So here is the result: our job processed all five files in parallel in a single execution.
