Spring Batch Unleashed: Transforming Data Processing Workflows

Krishna Murthy P Mirajkar
Engineering @ Upstox
13 min read · Feb 2, 2024

Introduction

In the fintech sector, effective data management is pivotal, typically approached through either event handling or batch processing. The latter, batch processing, is often the method of choice for its adeptness at managing interconnected data and providing comprehensive insights. This article delves into Spring Batch, spotlighting its prowess in parallel processing. We explore how Upstox leverages Spring Batch’s abstractions to streamline core business operations and minimize dependencies. Moreover, we unveil strategies and tools that simplify batch data processing, elevating performance to new heights.

Prerequisite Knowledge

This discussion presumes a baseline understanding of the Spring Batch Framework. While we aim to keep our examination focused, detailed explorations of some concepts have been omitted for brevity. However, we warmly invite your queries and reflections in the comments section below, fostering a rich exchange of ideas.

High-Level Spring Batch Parallel Processing Architecture

The diagram below lays out the high-level architecture of a pure Spring Batch parallel processing system.

High-level representation of Spring Batch parallel processing

In the configuration depicted above, the Manager System typically operates within a single JVM instance, with the N-workers running as separate JVM instances across various machines. These components communicate through a Messaging System, such as ActiveMQ, facilitating seamless interaction. The Manager primarily serves to prepare and store data, while the Workers are tasked with executing complex algorithms in parallel. This setup proves highly efficient for processing vast volumes of transaction records on a large scale.

Typical Spring Batch job implementations

The diagram illustrates the versatility and complexity of domain jobs, showcasing a range of forms and solutions that often intertwine with library dependencies. Such intricacy, while powerful, can introduce challenges in maintenance over time.

Our Batch Processing Framework

Recognizing the complexities and recurring motifs inherent in managing numerous batch processes, Upstox took a different path. Our first implementations of these processes in the conventional Spring Batch style revealed a set of recurring abstractions. Building on that insight, we defined a uniform set of interfaces that capture them.

Subsequently, we developed an interpreter for these abstractions, capable of dynamically generating Spring Batch jobs upon system startup. These jobs are readily executable through the Spring Batch Job Launcher, tailored to the specific requirements of each job. This innovation culminated in the creation of a Batch Processing Framework library, which now underpins the vast majority of batch processes at Upstox.

Design Principle

Embracing abstractions over concrete dependencies unveils the inherent order and elegance of a well-designed system — Donald Knuth

In software development, reliance on a specific library often leads to implementations that are tightly coupled with that library, risking entanglement of business logic with library-specific dependencies. To circumvent this, we embraced a pivotal design principle at Upstox:

Dependency On Abstractions — Implementations should not depend on concrete components but should instead rely on abstractions.

Adopting this philosophy, we introduced the concept of DataService abstractions, establishing a structured hierarchy that facilitates the creation of more sophisticated abstractions. This includes data partitioners that support various strategies and connectors that facilitate communication from Manager to Worker. As a result, our business logic became library-agnostic, promoting a uniform set of interfaces that developers must implement to define a job. This approach not only simplifies the development process but also enhances the scalability and maintainability of our systems.

Decoupling and standard structure of job implementation

In this setup, Spring Batch is coupled only to the Batch Processing Framework. The domain code is kept separate, which gives job implementations a clean and straightforward structure.

Abstractions

Below are the abstractions defined in the Batch Processing Framework:

Class diagram of abstractions

A job definition provider only needs to implement a set of DataService(s) and wrap them in their execution order defined in the AbstractBatchJobConfiguration.

The data services are defined as below -

SingleDataService
- Represents any simple abstract data being fetched and stored

ListDataService
- Represents a list of records of a specific type fetched and stored

AbstractPartitionInfoDataService
- Defines the partition size to be used by the job

AbstractPartitionInfoWithRangeDataService
- Defines the partition size using the primary key range

AbstractPartitionedListDataService
- Defines abstraction to fetch the partition data based on the info provided by AbstractPartitionInfoDataService

AbstractCountBasedPartitionListDataService
- Provides similar capabilities as AbstractPartitionedListDataService with additional abstraction to process post-partition created

AbstractPrimarKeyRangeCountBasedPartitionListDataService
- Defines abstraction to fetch data based on information provided by AbstractPartitionInfoWithRangeDataService

PartitionDataService
- Represents the manager to worker communication details

CleanupDataService
- The clean-up to execute when the job has been completed or failed.

AbstractWorkerConfiguration
- The worker abstraction is defined as the counterpart of PartitionDataService where partitioned data is processed

A PartitionDataService is used to communicate with the worker VMs; it encapsulates the details of the messaging input and output channels.

A worker must implement AbstractWorkerConfiguration, aligning it with the PartitionDataService. The channels are mirrored: the manager’s output channel is the worker’s input channel, and vice versa.

Once the PartitionDataService executes in a job instance, Spring Batch calls the connected workers with one partition at a time until all the partitions defined by the AbstractPartitionInfoDataService have been processed.

Partitioning Strategies

Navigating through the complexities of large dataset management, the utilization of a primary key for data segmentation and individual processing emerges as a fundamental strategy. Our framework introduces a range of versatile partitioning strategies to support this need effectively. Among these, two strategies stand out:

  1. Count-Based Partitioning: This strategy involves dividing the dataset based on the number of primary keys, ensuring each partition contains a specific count. This approach is highly recommended for its ability to scale horizontally, making it a robust solution for managing large datasets efficiently.
  2. Fixed-Number Partitioning: Alternatively, this strategy segments the dataset by distributing a fixed number of primary keys across each partition. While useful in certain contexts, it may not offer the same level of scalability as the count-based approach.

We strongly endorse the count-based partitioning method for its superior scalability and memory management benefits. Especially when working within known capacity limits, this strategy enhances the system’s ability to handle large volumes of data with greater efficiency and reliability.
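To make the count-based strategy concrete, here is a minimal sketch in plain Java. It assumes the rows to process carry consecutive indexes 1..N (for example, the row numbers of a temporary table) and that every partition holds at most a fixed count of them. The class, record, and method names are hypothetical and are not part of the framework's API.

```java
import java.util.ArrayList;
import java.util.List;

class CountBasedPartitioner {

    /** One partition: an inclusive range of row indexes (hypothetical type, for illustration only). */
    record PartitionRange(int partitionNumber, long minIndex, long maxIndex) {}

    /**
     * Splits row indexes 1..totalRows into consecutive ranges holding at most
     * partitionSize rows each, so the number of partitions grows with the data
     * while the memory needed per partition stays bounded.
     */
    static List<PartitionRange> partition(long totalRows, long partitionSize) {
        if (totalRows < 0 || partitionSize <= 0) {
            throw new IllegalArgumentException("totalRows must be >= 0 and partitionSize must be > 0");
        }
        List<PartitionRange> ranges = new ArrayList<>();
        int partitionNumber = 0;
        for (long min = 1; min <= totalRows; min += partitionSize) {
            long max = Math.min(min + partitionSize - 1, totalRows);
            ranges.add(new PartitionRange(partitionNumber++, min, max));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 10 rows with 4 rows per partition give the ranges 1-4, 5-8 and 9-10.
        partition(10, 4).forEach(System.out::println);
    }
}
```

Because the partition size is fixed, a worker's memory footprint is predictable, and a larger dataset simply produces more partitions to hand out to workers.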

Spring Batch Job Generation Interpreter

The image below shows where the job generation interpreter sits in the overall ecosystem.

Spring Batch job interpreter positioning

As you can see, the domain implementation becomes free of any library dependency and simply focuses on the problem at hand.
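The idea of interpreting library-free job definitions at startup can be sketched without Spring at all. The example below is only an illustration under stated assumptions: DataService, JobConfiguration, and ExecutableJob here are simplified, hypothetical stand-ins, not the framework's real interfaces, and the real interpreter produces Spring Batch jobs rather than plain lambdas.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class JobInterpreterSketch {

    /** Hypothetical stand-in for a DataService: one unit of work sharing a job context. */
    interface DataService {
        void execute(Map<String, Object> jobContext);
    }

    /** Hypothetical stand-in for a job configuration: data services in execution order. */
    record JobConfiguration(String jobName, List<DataService> dataServices) {}

    /** What the interpreter produces; in the real framework this would be a Spring Batch Job. */
    interface ExecutableJob {
        Map<String, Object> run();
    }

    /** Walks every configuration once (for example at startup) and registers one executable job per name. */
    static Map<String, ExecutableJob> interpret(List<JobConfiguration> configurations) {
        Map<String, ExecutableJob> registry = new LinkedHashMap<>();
        for (JobConfiguration configuration : configurations) {
            List<DataService> steps = List.copyOf(configuration.dataServices());
            registry.put(configuration.jobName(), () -> {
                Map<String, Object> context = new LinkedHashMap<>();
                for (DataService step : steps) {
                    step.execute(context); // each data service runs in its declared order
                }
                return context;
            });
        }
        return registry;
    }

    /** Defines a three-step job, interprets it, and runs it by name. */
    static Map<String, Object> runDemo() {
        JobConfiguration tradeJob = new JobConfiguration("trade-processor", List.of(
            context -> context.put("clients", List.of("C1", "C2")),
            context -> context.put("partitions", ((List<?>) context.get("clients")).size()),
            context -> context.put("status", "DONE")));
        return interpret(List.of(tradeJob)).get("trade-processor").run();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // {clients=[C1, C2], partitions=2, status=DONE}
    }
}
```

The domain side only supplies the ordered list of data services; everything that knows how a job is executed lives in the interpreter, which is the single place a library dependency would need to appear.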

Deployment Diagram for Spring Batch Manager — Worker Infrastructure

The deployment consists of two deployable artifacts, the Manager and the Worker, plus a Common module shared by both:

Manager module — This consists of the BatchJobConfiguration and its DataService(s) implementations. It guides the orderly execution of the data services.

Worker module — This consists of the AbstractWorkerConfiguration and processing logic on the partition it would receive from the messaging system. In principle, we ensure it has no interactions with any of the domain databases.

Common module — This consists of the shared artifacts between the Manager and Worker modules like common models and utilities.

Deployment of Manager — Worker(s) Spring Batch Application

Example

Problem definition
We will use a simplified version of the Delivery Marking Process that deals with identifying the actual delivery quantity of shares bought by the customer.

Here is sample data that explains the process, using a customer who made multiple trades in RELIANCE and TATASTEEL shares on a particular trade day:

Delivery Marking Process example

Here are the steps involved in the process -
- Gather all the trades conducted on the given day
- Organize these trades by client, creating distinct groups for each client
- Within each client group, further categorize the trades based on the shares involved
- Calculate the total quantity of shares traded separately for the buy and sell sides, then find the absolute difference between these quantities
- Sort the trades by their trade ID
- Distribute the delivery quantity to the corresponding shares of the customer, marking zero for other shares once the delivery quantity is fully utilized.
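The steps above can be sketched in plain Java as follows. This is not the repository's DeliveryMarkingProcessor; it is a simplified illustration with hypothetical Trade and MarkedTrade types and made-up quantities. One assumption is made loudly: the delivery quantity is allocated, in trade ID order, to trades on the net side (BUY when more was bought than sold, SELL otherwise), and every other trade is marked zero.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DeliveryMarkingSketch {

    enum Side { BUY, SELL }

    /** Hypothetical trade model; field names are illustrative. */
    record Trade(long tradeId, String clientId, String symbol, Side side, long quantity) {}

    record MarkedTrade(Trade trade, long deliveryQuantity) {}

    static List<MarkedTrade> mark(List<Trade> trades) {
        // Group by client, then by share within each client.
        Map<String, Map<String, List<Trade>>> grouped = new LinkedHashMap<>();
        for (Trade trade : trades) {
            grouped.computeIfAbsent(trade.clientId(), k -> new LinkedHashMap<>())
                   .computeIfAbsent(trade.symbol(), k -> new ArrayList<>())
                   .add(trade);
        }
        List<MarkedTrade> marked = new ArrayList<>();
        for (Map<String, List<Trade>> bySymbol : grouped.values()) {
            for (List<Trade> group : bySymbol.values()) {
                long bought = total(group, Side.BUY);
                long sold = total(group, Side.SELL);
                long remaining = Math.abs(bought - sold);                // delivery quantity
                Side netSide = bought >= sold ? Side.BUY : Side.SELL;   // assumption, see lead-in
                List<Trade> ordered = new ArrayList<>(group);
                ordered.sort(Comparator.comparingLong(Trade::tradeId));
                for (Trade trade : ordered) {
                    long allocated = trade.side() == netSide ? Math.min(trade.quantity(), remaining) : 0;
                    remaining -= allocated;                              // zero once fully used
                    marked.add(new MarkedTrade(trade, allocated));
                }
            }
        }
        return marked;
    }

    static long total(List<Trade> trades, Side side) {
        return trades.stream().filter(t -> t.side() == side).mapToLong(Trade::quantity).sum();
    }

    public static void main(String[] args) {
        List<Trade> trades = List.of(
            new Trade(1, "C1", "RELIANCE", Side.BUY, 100),
            new Trade(2, "C1", "RELIANCE", Side.SELL, 40),
            new Trade(3, "C1", "RELIANCE", Side.BUY, 50));
        // Bought 150, sold 40, so 110 is delivered: trade 1 gets 100, trade 2 gets 0, trade 3 gets 10.
        mark(trades).forEach(m -> System.out.println(m.trade().tradeId() + " -> " + m.deliveryQuantity()));
    }
}
```

Each client's groups are computed independently of every other client's, which is what makes the process a natural fit for partitioning by client.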

This task is computation-heavy, and running it sequentially across all the clients who traded on the day would take a long time. We will therefore build the solution on the abstractions provided by the Batch Processing Framework, which leads us to a Spring Batch job with its Manager and Worker components.

Solution

The sample implementation can be found in the GitHub repository here.

The following diagram provides an overview of the proposed solution we aim to implement.

Manager — Job Configuration And Data Services
Module — trade-dataservice

TradeProcessorConfiguration
1. Defines the steps involved in carrying out the job.
2. Associates any listeners that monitor the job

DumpUniqueClientsInTempTableDataService
1. Loads the trades
2. Identifies the unique clients who traded over the day
3. Stores them in a temporary table

TradeDataPartitionInfoWithRangeDataService
1. Based on the unique clients identified, defines the partitioning information to be used, such as the batch count of each partition and its min and max index range

TradeDataCodePartitionerDataService
1. Partitions the data based on the partition info defined by the partition info provider used.
2. Stores the data in storage (either local or S3) based on the profile passed in the execution environment.

TradePartitionDataService
1. Provides the wiring between the manager and worker(s) like the in-flow, out-flow, request and reply channels for the job.
2. After this step, the control goes to the n-workers connected to the manager for running the Delivery Marking Process on each partition.

TradeDataOutputDataService
1. Once the worker(s) have completed the Delivery Marking Process on each partition, the control comes back to the manager at this data service.
2. Pulls the processed partitions and stitches them together to generate the consolidated file. Note that each partition could instead have been written back to the database in batches; here we simply save it to a file.

Common Module — trade-common
Contains the common types like constants, models and shared utilities between manager and worker.

Worker — Job Configuration
Module — trade-worker

TradeWorker
1. Facilitates the communication between the worker and manager.
2. Following the execution of the TradePartitionDataService within the manager, control is transferred to the TradeWorker.tasklet() method for processing partitions on the workers connected to the manager.
3. Retrieves the respective partition information from the job context, reads the partition data that the manager wrote to storage, and feeds it to the DeliveryMarkingProcessor to calculate the delivery quantity. It then stores the computed quantities back into storage, keyed by the partition number it processed.

Upon completion of processing by all workers on the prepared partitions, control returns to the TradeDataOutputDataService in the manager.

Note: This is a specific instance where the framework introduces a dependency on Tasklet, a Spring Batch object within the domain space. While there may be room for improvement, we currently accept and work within this constraint. However, it’s crucial to note that the algorithm remains decoupled.

Optimisation: Since each client’s data is independent of every other client’s, you can split the partition by client on the worker and submit each piece to a thread pool. One such implementation can be seen in DeliveryMarkingProcessorOptimized. You can plug this processor into TradeWorker instead of DeliveryMarkingProcessor when the client set is large and compare the results.
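As a rough illustration of that optimisation, the sketch below fans per-client work out to a fixed thread pool and collects the results. It is not the code of DeliveryMarkingProcessorOptimized; the class name, the generic signature, and the summing processor used in main are assumptions chosen to keep the example self-contained.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

class ParallelClientProcessing {

    /** Runs perClientProcessor once per client on a fixed pool and returns the results keyed by client. */
    static <T, R> Map<String, R> processPerClient(Map<String, List<T>> dataByClient,
                                                  Function<List<T>, R> perClientProcessor,
                                                  int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Submit one task per client; clients do not share data, so no locking is needed.
            Map<String, Future<R>> pending = new LinkedHashMap<>();
            for (Map.Entry<String, List<T>> entry : dataByClient.entrySet()) {
                Callable<R> task = () -> perClientProcessor.apply(entry.getValue());
                pending.put(entry.getKey(), pool.submit(task));
            }
            // Wait for every client and keep the input iteration order.
            Map<String, R> results = new LinkedHashMap<>();
            for (Map.Entry<String, Future<R>> entry : pending.entrySet()) {
                results.put(entry.getKey(), entry.getValue().get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for client results", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A client task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, List<Long>> quantitiesByClient = new LinkedHashMap<>();
        quantitiesByClient.put("C1", List.of(100L, 50L));
        quantitiesByClient.put("C2", List.of(20L));
        Map<String, Long> totals = processPerClient(
            quantitiesByClient, q -> q.stream().mapToLong(Long::longValue).sum(), 4);
        System.out.println(totals); // {C1=150, C2=20}
    }
}
```

The pool size is a tuning knob: it should reflect the cores available to a worker, since the work is CPU-bound once the partition has been loaded.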

Configurations

ActiveMQConfiguration
Provides the ActiveMQ configuration that will be used to communicate between the Manager and Workers. The domain is expected to only supply the properties looked up here by both manager and worker.

Spring Batch Database Configuration
The domain is expected to supply this configuration class for the database configuration used by Spring Batch. The sample configuration used can be found in SampleBatchProcessDatabaseConfig.

Job Launch

Serialised Job Launch
Serialised job execution may be needed when a class of service hosts only data-intensive processing jobs and running jobs in parallel would push the infrastructure to its limits. For this case the framework provides a job input queue: it picks one job from the queue and waits until that job has finished processing before picking up the next request.

The SampleBatchMQConfiguration defines this job input queue configuration for serialised execution. The README.md explains how to launch a job in this case.
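The behaviour of a serialised input queue can be illustrated in-process with a blocking queue and a single consumer thread. This is only a sketch of the idea: the framework's queue is backed by the messaging system configured in SampleBatchMQConfiguration, and the class, record, and method names below are hypothetical.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

class SerialisedJobQueueSketch {

    /** Hypothetical job request: a name plus the work to run. */
    record JobRequest(String name, Runnable work) {}

    /** Marker request that tells the consumer to stop once earlier jobs are done. */
    private static final JobRequest STOP = new JobRequest("stop", () -> {});

    private final BlockingQueue<JobRequest> inputQueue = new LinkedBlockingQueue<>();
    private final List<String> completed = new CopyOnWriteArrayList<>();
    private final Thread consumer = new Thread(this::drain, "job-consumer");

    void start() { consumer.start(); }

    void submit(JobRequest request) { inputQueue.add(request); }

    /** Queues the stop marker and waits for the consumer to finish everything before it. */
    void stopAndWait() {
        inputQueue.add(STOP);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> completed() { return List.copyOf(completed); }

    private void drain() {
        try {
            while (true) {
                JobRequest request = inputQueue.take();   // wait for the next request
                if (request == STOP) {
                    return;
                }
                try {
                    request.work().run();                 // nothing else is picked until this returns
                    completed.add(request.name());
                } catch (RuntimeException e) {
                    System.err.println("Job failed: " + request.name() + " (" + e + ")");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        SerialisedJobQueueSketch queue = new SerialisedJobQueueSketch();
        queue.start();
        queue.submit(new JobRequest("delivery-marking", () -> {}));
        queue.submit(new JobRequest("report", () -> {}));
        queue.stopAndWait();
        System.out.println(queue.completed()); // [delivery-marking, report]
    }
}
```

Because there is exactly one consumer, jobs run strictly in arrival order and never overlap, which trades throughput for a bounded load on the infrastructure.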

Open-ended Job Launch

Where strict serial execution is not required, for example in a class of services whose jobs are less data-intensive and can run in parallel, the following launcher implementation can be used. The code below shows how a Spring Batch job created from an AbstractBatchJobConfiguration can be looked up and launched.

Note: This is another instance where you will have a Spring Batch dependency, though it only deals with launching the jobs defined as AbstractBatchJobConfiguration. All these configurations can be looked up through GenericJobConfigurationManager, as shown below.

    import java.util.Map;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.JobParametersInvalidException;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
    import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
    import org.springframework.batch.core.repository.JobRestartException;
    import org.springframework.stereotype.Component;
    import com.upstox.process.batch.manager.GenericJobConfigurationManager;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;

    // ProcessRequest and JobResponse are domain types, assumed to live in the same package.
    @Slf4j
    @RequiredArgsConstructor
    @Component
    public class GenericJobLauncher {

        private final JobLauncher jobLauncher;

        private final GenericJobConfigurationManager genericJobConfigurationManager;

        /** Looks up the job by process name and launches it; returns null if the launch fails. */
        public JobResponse launchJob(ProcessRequest processRequest) {
            JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
            // A unique launch time makes every request a new job instance.
            jobParametersBuilder.addLong("LaunchTime", System.currentTimeMillis());
            try {
                String processName = processRequest.getProcessName();
                Job job = genericJobConfigurationManager.getJob(processName);
                // Pass every request parameter to the job as a string parameter.
                Map<String, Object> parameters = processRequest.getParameters();
                parameters.forEach((k, v) -> jobParametersBuilder.addString(k, String.valueOf(v)));
                JobExecution jobExecution = jobLauncher.run(job, jobParametersBuilder.toJobParameters());
                return new JobResponse(String.valueOf(jobExecution.getJobId()));
            } catch (JobExecutionAlreadyRunningException | JobRestartException
                    | JobInstanceAlreadyCompleteException | JobParametersInvalidException
                    | IllegalArgumentException e) {
                log.error("Error while launching job", e);
            }
            return null;
        }
    }

Platform

Thus far, we’ve showcased how our framework adeptly defines jobs as collections of DataService within BatchJobConfiguration, with the automatic initiation of Spring Batch Jobs at runtime. While Spring Batch inherently supports auditing and job-level monitoring, a more granular view into the myriad job requests from various domains often becomes essential.

To bridge this gap, we propose the integration of a specialized request tracking database. This database is designed to meticulously log details about each incoming job request, ensuring comprehensive visibility within the service. Furthermore, these monitoring services should be seamlessly integrated with an Orchestration service. This pivotal connection furnishes a detailed schedule and categorization of all jobs, enhancing operational oversight.

Incorporating alerting mechanisms is the final piece of the puzzle, enabling swift detection and remediation of issues as they emerge. This proactive approach ensures the robustness and reliability of the job processing framework.

The diagram below illustrates the transition towards a more sophisticated, platform-oriented approach:

System Architecture

The diagram illustrates a meticulously organized structure, where services are categorized according to the job types they manage. At the heart of this system lies the Request DB, a pivotal entity that tracks all incoming job requests, effectively acting as the central point for job initiation. It is advisable to maintain a one-to-one relationship between each service and the Request DB to ensure clarity and efficiency in job handling.

Distinctively, each service is equipped with its own Spring Batch Configuration Database. This setup allows for the autonomous management of configurations, thereby facilitating tailored handling of batch jobs within each service’s unique context.

Job listeners play a crucial role in this architecture, enabling seamless orchestration by continually updating the central Orchestration Service. This service is ingeniously integrated with alerting systems, creating a cohesive platform that oversees job orchestration and monitoring. Such an integration not only ensures timely detection and resolution of issues but also significantly enhances the modularity and efficiency of job management across the system.

Caveats: Navigating ActiveMQ Connection Limits

As our framework scales and the number of jobs increases, a notable rise in ActiveMQ connections emerges. Each Spring Batch job traditionally initiates a new connection, potentially pushing us towards the upper limits of connections allowed by our cloud service provider. While expanding our infrastructure is a conceivable remedy, it’s important to acknowledge the associated increase in costs.

To address this challenge, a shift in our strategic approach is warranted. Rather than adhering to the conventional method of establishing a separate connection for each job, we propose a more unified strategy. Envision a scenario where job configurations are amalgamated under a singular orchestrating job. This method significantly enhances the efficiency of ActiveMQ connection usage and sparks an engaging discussion for future exploration.

Adopting such a streamlined approach not only mitigates the issue of hitting connection ceilings but also paves the way for more economical and scalable system designs. This innovative solution highlights our commitment to optimizing resources while maintaining high performance and reliability.

Conclusion: Transforming Process Automation with Our Approach

Our innovative approach to batch processing has significantly accelerated the automation of numerous processes, showcasing remarkable efficiency and performance in data handling. The key advantages of this strategy include:

  • Uniform Architecture and Implementation: By adopting a consistent structure and design across all batch processes, we ensure a cohesive and streamlined workflow.
  • Ease of Developer Integration: The uniformity of code across our Batch Processing Framework facilitates a smooth transition for developers, minimizing the learning curve. This consistency proves invaluable as developers navigate and contribute to various services within the framework.
  • Independence of Domain Logic: Our design principle of decoupling business logic from the processing framework enhances flexibility. This separation not only enables the potential for code reuse but also ensures that our system remains agile, ready to adapt to future changes or even a transition away from Spring Batch.

These benefits underscore the effectiveness of our approach, driving home the value of consistency, developer efficiency, and architectural flexibility in optimizing batch processing operations.
