Parallelising bulk DB queries with Spring batch : Part 2

Anmolpreet singh Brar
5 min readAug 6, 2021

--

Since for our problem of retrieving millions of rows in parallel and having fall back patterns on top of it, we couldn’t go with different approaches of SQL retrieval and wanted to have an out of box solution that can support these features and hence, Spring Batch.

Spring batch approach

Parallelising the DB operations

For parallelising the retrieval of rows from database in most optimal way in which we didn’t have to manage the offsets-limits and thread executions, we considered using Spring Batch Partitioner.

Basically what Spring batch partitioner does is that it can divide the spring batch job into multiple slave jobs which have their own context. These individual contexts can be used to pass on the start and limit ids for SQL operations to executed by ItemReader.

FallBack Patterns

We had to consider an approach that can provide us with options of restarting a failed SQL operation, which can happen do to any reason such as unavailability of SQL connection. We also wanted to restart the operation from the exact point which it failed so that we don’t reprocess the previous records again. Spring-batch provides all these options of restarting the whole jobs and individual partitions from last point of failure. Also, we can configure different routes for retries based on exception types as well. For example, not retrying or skipping records in the condition of Poison Pill.

Optimal Db Queries

Here we would be using out of box DB-fetchers(JdbcPagingItemReader) provided by the SpringBatch which internally fetch the data based pre-defined pageSize and using the ID of last retrieved record from previous page of a partition to fetch next page of that partition, which is faster than basic Offset and Limit queries

Approach Followed

  1. Create master step that uses the Spring batch partitioner to create slave steps.
  2. Use spring batch partitioner (ColumnRangePartitioner) to create slave job based on ids of data to be retrieved :-
  • call database and get the min and max id for a given query condition (in our case it was to get the minimum and maximum id with value for a specific column).
  • based on min and max ids, create ranges for the partition. For ex if min is 1 and max is 9 and we are using grid size as 3, the partitions will be 1–3, 3–6 and 6–9.

Once Slave steps are created, they will function in this way :-

  1. ItemReader :- read the data from database based on ranges passed by the partitioner. For ex retrieve the data among id 1 to 10.
  2. ItemProcessor:- process the data
  3. ItemWriter:- It works as out-sink for the job. In our case, it was publishing the data retrieved to Kafka (this can be done via KafkaWritter provided by the SpringBatch or you can write your custom one as well).

Some Code reference :-

Custom Spring Batch Partitioner

/**
*
@author abrar This class represents partitioner used to divide a springbatch job into multiple
* partitions based on range of min and max ids for a given where clause. Number of partitions
* will be equal to size of grid
*/
@Setter
public class ColumnRangePartitioner implements Partitioner {
private JdbcOperations jdbcTemplate;
private String table;
private String column;
private String whereCondition;

public void setDataSource(DataSource dataSource) {
jdbcTemplate = new JdbcTemplate(dataSource);
}

@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> result = new HashMap<>();
int size =
jdbcTemplate.queryForObject(
"SELECT COUNT(" + column + ") FROM " + table + " where " + whereCondition,
Integer.class);
// if the size is zero in DB, we skip the job with single partition
if (size <= 0) {
ExecutionContext value = new ExecutionContext();
result.put("partition" + 0, value);
value.putInt("minValue", 0);
value.putInt("maxValue", 0);
return result;
}
int min =
jdbcTemplate.queryForObject(
"SELECT MIN(" + column + ") FROM " + table + " where " + whereCondition, Integer.class);

int max =
jdbcTemplate.queryForObject(
"SELECT MAX(" + column + ") FROM " + table + " where " + whereCondition, Integer.class);

int targetSize = (max - min) / gridSize + 1;

int number = 0;
int start = min;
int end = start + targetSize - 1;

while (start <= max) {
ExecutionContext value = new ExecutionContext();
result.put("partition" + number, value);

if (end >= max) {
end = max;
}

value.putInt("minValue", start);
value.putInt("maxValue", end);

start += targetSize;
end += targetSize;

number++;
}
return result;
}
}

JobConfig class

@Configuration
@EnableBatchProcessing
public class JobConfiguration extends DefaultBatchConfigurer {
@Autowired private JobBuilderFactory jobBuilderFactory;

@Autowired private StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier("readerDataSource")
private DataSource dataSource;

@Value("${test.chunkSize:1000}")
private int chunkSize;

@Value("${test.gridSize:10}")
private int gridSize;

@Autowired BackOffPolicy batchBackOffPolicy;
@Autowired RetryPolicy batchRetryPolicy;

@Bean
@StepScope
public ColumnRangePartitioner partitioner(
@Value("#{jobParameters['tableName']}") String tableName,
@Value("#{jobParameters['columnName']}") String columnName,
@Value("#{jobParameters['whereCondition']}") String whereCondition) {
ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();
columnRangePartitioner.setColumn(columnName);
columnRangePartitioner.setDataSource(dataSource);
columnRangePartitioner.setWhereCondition(whereCondition);
columnRangePartitioner.setTable(tableName);
return columnRangePartitioner;
}

// Master step
@Bean
public Step step1( ItemReader<Edge> pagingItemReader,
ItemProcessor<Edge> itemProcessor,ItemWriter<Edge> customWriter,
ColumnRangePartitioner partitioner,
TaskExecutor asyncTaskExecutor) {
return stepBuilderFactory
.get("step1")
.partitioner(slaveStep(pagingItemReader, itemProcessor,customWriter).getName(), partitioner)
.step(slaveStep(pagingItemReader, edgeBatchDeleteKafkaWriter))
.gridSize(gridSize)
.taskExecutor(asyncTaskExecutor)
.build();
}

// slave step
@Bean
public Step slaveStep(
ItemReader<Edge> pagingItemReader, ItemProcessor<Edge> itemProcessor ,ItemWriter<Edge> customKafkaWriter) {
return stepBuilderFactory
.get("slaveStep")
.<Edge, Edge>chunk(chunkSize)
.reader(pagingItemReader).processor(itemProcessor). .writer(customKafkaWriter)
// .faultTolerant()
// .retryPolicy(batchRetryPolicy)
// .backOffPolicy(batchBackOffPolicy)
.build();
}

@Bean
public Job sampleJob(Step step1) {
return jobBuilderFactory
.get("jobName")
.incrementer(new RunIdIncrementer())
.flow(step1)
.end()
.build();
}
}

Launching the Job

//parameters to be passed which are read by spring-batch
JobParameters jobParameters =
new JobParametersBuilder()
.addString(
"JobId",
String.valueOf(System.currentTimeMillis()))
.addString(
"whereCondition",
"(name in ('xyz')")
.addString(
"tableName", "table_123")
.addString("columnName", "id")
.addDate("date", new Date())
.addLong("time", System.currentTimeMillis())
.toJobParameters();
//launching the job
JobExecution jobExecution = jobLauncher.run(job, jobParameters);

Spring Batch Tables:-

StepExecutions are created per partition, each partition executed per thread and state for each partition is preserved for easy restart

Pros

The spring batch approach can be considered due:-

  1. Added advantage of restart mechanism.
  2. Better segregation of responsibilities among steps for batch process such as each individual step for database retrieval (ItemReader), data modelling and processing (ItemProcessor) and publishing the kafka events(ItemWriter).
  3. Better handling for parallel approach of retrieval and publishing data using Spring Batch Partitioner model. (Ref :-High Performance Batch Processing Skip to 37 min) . In this approach only failed partition will be considered for restart.

Further Future improvements

we can do further improvement in this use case by moving to remote partitioning:-

  1. Use multiple slave steps distributed across multiple JVMs instead of using single JVM.
  2. each master ↔︎ slave step can be connected through queue (which can be Kafka as well). Ref :- https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#partitioning

--

--