MuleSoft — Decoding Batch Processing

Process it all, execute it in batches

Arun Dutta
The Mule Blog
13 min read · Jun 8, 2020

Perhaps the most powerful core component in Mule, the Batch component has been part of Mule for a long time. The rationale behind this processor is to process large amounts of data in segregated batches so that failures are minimized and accounted for, and can be retried later if required. We will see in this article how all this happens (based on the Batch component for Mule Runtime 4.x), but let's first try to understand the underlying principle of batch processing.

P.S.: I will mainly highlight the core concepts and structural building blocks of Batch; for configuration-related information, you can refer to the MuleSoft documentation in the Reference section.

Batch Process — Concept

Processing data in batches is not a new concept in the software industry, especially when it comes to ETL (Extract, Transform, Load), where data is processed in stages and quite often in fixed-size chunks. Even in other industries and in factories, batch processing of materials is essential. Let's take the example of a manufacturing plant where raw materials are converted into finished products. There are two ways they can be produced:

  • Mass Processing: all the materials are processed together sequentially on a conveyor belt.
  • Batch Processing: the raw materials are segregated into fixed-size chunks, or batches, and processed in parallel on the conveyor belt.

While being processed, the materials go through various mechanical parts where they are converted from one form to another; the last stage creates the actual product, which then gets packaged.

Batch (upper): each stage processes bulk-sized items, vs. mass production (lower): each stage processes one item at a time

It has been found that processing in batches is more efficient because it takes less time (considering the concurrent nature and the setup time of each machine part processing the materials), and it also gives the flexibility to reject or re-process a particular batch. Also, be it mass production or batch, the entire manufacturing process may involve multiple stages, and there is often some waiting time when moving batches of materials from one stage to another, for various production-related reasons.

Batch operations in Mule

Now that we understand what batching is, let us see how batch is implemented in the Mule runtime. Before jumping to the Batch component, let me highlight that there is one more component, or rather a scope, in Mule which also supports batching: For Each.

For Each Component

The For Each scope of Mule is used to run a loop over an array of data elements (a collection). By default it processes 1 element at a time (which is also the default batch size), but Mule allows the batch size to be configured through the Batch Size setting (the batchSize attribute).

With a batch size of 10, we can achieve a very simplified batching operation: if a collection of 80 elements comes in, the data is split into 8 batches of 10 elements each, and the loop runs 8 times, once per batch, processing all 10 elements of a batch as one single Mule message.

One caveat with the above batching technique is that the batches are only processed serially, one after the other.
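As a minimal sketch of the For Each batching described above (namespace declarations are omitted, and the flow name and logger message are illustrative assumptions, not taken from the original configuration):

```xml
<!-- Sketch only: flow name and log message are hypothetical. -->
<flow name="forEachBatchingFlow">
    <!-- payload is assumed to be an array of 80 elements -->
    <foreach collection="#[payload]" batchSize="10">
        <!-- Inside the scope, payload is a sub-collection of up to 10 elements,
             so this logger would fire once per batch, one batch after the other -->
        <logger level="INFO"
                message="#['Elements in this batch: ' ++ (sizeOf(payload) as String)]"/>
    </foreach>
</flow>
```

Because the scope is sequential, raising batchSize reduces the number of iterations but does not add any parallelism.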

The Batch Component

As I mentioned before, the batch component is literally the 'go-to' processor when it comes to processing large chunks of data in an ETL fashion; in fact, the batch component allows Mule to act as an ETL tool (Mule is widely regarded as an ESB tool, and there are subtle differences between ETL and ESB, see reference).

The Batch component is available on Mule EE editions only.

The above picture shows the logical structure of the batch component. The component has the ability to split large chunks of messages into records that are processed asynchronously in a batch job. A batch job typically consists of a number of batch steps, which wrap the processors that apply the business logic to the records.

At a high level, the batch component assesses the data and creates batch job instances with fixed-size data chunks (batch sizes), which are then passed on to the batch steps. When the batch job instance becomes executable, the runtime engine submits a task for each record in the batch, and the records are placed in a queue, waiting to be executed asynchronously across the processors.

Barebone skeletal structure of Batch component
<flow name="flowOne">
    <batch:job jobName="batchJob">
        <batch:process-records>
            <batch:step name="step1">
                <!-- event processor -->
                <!-- event processor -->
            </batch:step>
            <batch:step name="step2">
                <!-- event processor -->
                <!-- event processor -->
            </batch:step>
        </batch:process-records>
    </batch:job>
</flow>

A batch processing setup has three important logical stages: Load & Dispatch, Process, and On Complete.

Logical stages of a batch job

Let us now try to understand in detail how each stage plays a crucial role and enables the processing of large chunks of data. We will analyze the entire process with a simple scenario, where 1000 records are fetched from Salesforce and loaded into a database in batches.

Load & Dispatch: This is the implicit starting stage, executed under the hood by the runtime. The data is retrieved from the SFDC Customer object, transformed from its serialized form into a collection of records, and then assigned to a batch job. When a batch job instance is created, it is assigned a unique instance id in the form of a UUID. This id can be overridden with a user-defined format by using an expression (Job Instance Id attribute), and it can be retrieved through the variable batchJobInstanceId.
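A minimal sketch of overriding the instance id might look like the following. The job name, the step name and the 'customer-load-' prefix are hypothetical, and namespace declarations are omitted; the expression only needs to produce a value that is unique per job instance.

```xml
<!-- Sketch only: jobName, step name and the id prefix are hypothetical. -->
<batch:job jobName="customerLoadBatch"
           jobInstanceId="#['customer-load-' ++ uuid()]">
    <batch:process-records>
        <batch:step name="logInstanceIdStep">
            <!-- The id of the running instance is exposed as a variable -->
            <logger level="INFO" message="#[vars.batchJobInstanceId]"/>
        </batch:step>
    </batch:process-records>
</batch:job>
```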

Configuring the flow to retrieve 1000 customer records from SFDC and process in batches

The above diagram shows a basic configuration of a batch job; the data pulled from SFDC will be split into batches, and batch job instances will be created. The default batch size, that is, the number of records assigned to a batch block, is 100, and it can be overridden (Batch Block Size attribute). For the use case defined above, since the batch block size is 100 and the number of incoming records is 1000, there will be 10 batch job instances created in memory, each containing 100 records. The diagram below logically depicts the Load & Dispatch phase.

Load & Dispatch phase creating multiple batch job instances each containing 100-record blocks.

For each batch job instance, the records land on a queue, ready to be processed in the batch steps. The Mule runtime ensures that all the records are placed properly on the queue; if it cannot, it fails the entire message during this phase.

The records are kept in queue for batch job instances

Process: In this phase, the records in a batch instance are processed asynchronously by the Mule runtime. This phase consists of the different batch steps; the records stored on the queue are processed concurrently by the processors of each step and are then placed back on the queue.

Simplified Mule flow showing processors in different batch steps

The batch job instance ensures that all the records are processed by all the batch steps, and each record internally keeps track of the steps it has been processed through while it sits on the queue. In the above example, the first step applies a transformation to the incoming Salesforce records and step 2 inserts the records into a SQL database.

As mentioned, processing in this stage is performed asynchronously. Suppose that, out of the 100 records in the block, record number 1 is stuck in the DataWeave transformation in step 1, while record number 2 has completed the transformation and has been placed back on the queue. The batch engine may then pick up record number 2 and process it in batch step 2 while record 1 is still being processed in step 1. Thus the order of execution of the records is not guaranteed within the process phase of a batch job instance.

The queues used by batch job operations are persistent (on the server disk). If batch processing is interrupted due to runtime issues, it resumes from where it stopped when the runtime recovers.
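The two steps described above could be sketched roughly as follows. The field names (Id, Name), the table and column names, and the Database_Config reference are assumptions made for illustration; namespace declarations and the Salesforce query are omitted.

```xml
<!-- Sketch only: field, table and config names are hypothetical. -->
<batch:job jobName="customerLoadBatch">
    <batch:process-records>
        <batch:step name="transformStep">
            <!-- Step 1: reshape each Salesforce record into the database row shape -->
            <ee:transform>
                <ee:message>
                    <ee:set-payload><![CDATA[%dw 2.0
output application/java
---
{
    id: payload.Id,
    name: payload.Name
}]]></ee:set-payload>
                </ee:message>
            </ee:transform>
        </batch:step>
        <batch:step name="insertStep">
            <!-- Step 2: insert the transformed record, one row per record -->
            <db:insert config-ref="Database_Config">
                <db:sql>INSERT INTO customers (id, name) VALUES (:id, :name)</db:sql>
                <db:input-parameters><![CDATA[#[payload]]]></db:input-parameters>
            </db:insert>
        </batch:step>
    </batch:process-records>
</batch:job>
```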

Concurrent execution of records may make them out of order in a batch job

By default, Mule persists a list of all records as they succeed or fail to process through each batch step. If a record fails to be processed by an event processor in a batch step, the runtime continues processing the batch, skipping over the failed record in each subsequent batch step. This behavior can be overridden by configuring the accept expression and accept policy of the batch step.

The batch job has an attribute, Max Failed Records, which stops the job once the number of failed records exceeds the configured value (the default is 0, so the first failure stops the job). It should be set to -1 for no limit.

As you can see, the default behavior of a batch step is to reject records that failed in a previous step, but it can be changed to accept all records, or even only the failed ones. The acceptance condition can also be specified with a DataWeave expression in the accept expression section. For example, the check #[payload.isEnabled == 'true'] can be used to ensure that the batch step only accepts customer records from the SFDC system that are enabled. More details can be found here.
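As a rough sketch of these settings together, the following combines a no-limit failure threshold, an accept expression, and a step that only receives failed records. The job and step names and the log messages are hypothetical, and namespace declarations are omitted.

```xml
<!-- Sketch only: job/step names and log messages are hypothetical. -->
<batch:job jobName="customerLoadBatch" maxFailedRecords="-1">
    <batch:process-records>
        <!-- Default accept policy (NO_FAILURES), narrowed further by an expression -->
        <batch:step name="enabledCustomersStep"
                    acceptExpression="#[payload.isEnabled == 'true']">
            <logger level="INFO" message="Processing an enabled customer record"/>
        </batch:step>
        <!-- Only records that failed in an earlier step reach this step -->
        <batch:step name="failedRecordsStep" acceptPolicy="ONLY_FAILURES">
            <logger level="WARN" message="Record failed in an earlier step"/>
        </batch:step>
    </batch:process-records>
</batch:job>
```

A dedicated ONLY_FAILURES step is one way to collect failed records for a later retry without stopping the rest of the batch.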

At the end of this phase, the batch job instance completes and, therefore, ceases to exist.

On Complete: This is the last phase and is mainly used for reporting on the batch job. It encapsulates the result of the entire job in a result object called BatchJobResult. The report covers all the records across all the batch job instances that have been executed, and their status, that is, whether they have succeeded or failed. Below is a sample error log.

************************************************************************************************************************
*             - - + Exception Type + - -             *         - - + Step + - -        *       - - + Count + - -       *
************************************************************************************************************************
* com.mulesoft.mule.runtime.module.batch.exception.B *            batchStep1           *                10             *
* com.mulesoft.mule.runtime.module.batch.exception.B *            batchStep2           *                9              *
************************************************************************************************************************

More on error handling and batch job reporting can be found here.
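A minimal sketch of reading the result object in the On Complete phase could look like this. It assumes the payload in this phase is the BatchJobResult and reads its totalRecords, successfulRecords and failedRecords counters; the job name, step name and message wording are illustrative, and namespace declarations are omitted.

```xml
<!-- Sketch only: job/step names and the message text are hypothetical. -->
<batch:job jobName="customerLoadBatch">
    <batch:process-records>
        <batch:step name="step1">
            <logger level="DEBUG" message="Processing record"/>
        </batch:step>
    </batch:process-records>
    <batch:on-complete>
        <!-- payload here is assumed to be the BatchJobResult of the finished instance -->
        <logger level="INFO"
                message="#['Total: ' ++ (payload.totalRecords as String) ++ ', succeeded: ' ++ (payload.successfulRecords as String) ++ ', failed: ' ++ (payload.failedRecords as String)]"/>
    </batch:on-complete>
</batch:job>
```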

A note on variable propagation: Mule 3.x had a special type of variable for batch called recordVars, but Mule 4.x does not have any such variables. Variables created in a batch step are record-specific, and the runtime ensures that variables created or modified for one record are not visible to other records. Even the On Complete phase does not have any access to the variables of the records.
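To illustrate the record-scoped variables described above, here is a small sketch in which a variable set in one step is read in the next step for the same record. The variable name customerKey and the Id field are hypothetical, and namespace declarations are omitted.

```xml
<!-- Sketch only: variable and field names are hypothetical. -->
<batch:job jobName="customerLoadBatch">
    <batch:process-records>
        <batch:step name="step1">
            <!-- Each record gets its own copy of this variable -->
            <set-variable variableName="customerKey" value="#[payload.Id]"/>
        </batch:step>
        <batch:step name="step2">
            <!-- Sees the value set for this record only, never another record's -->
            <logger level="INFO" message="#[vars.customerKey]"/>
        </batch:step>
    </batch:process-records>
</batch:job>
```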

Performance Tuning in Batch Jobs

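Now that we have gone through all the building blocks, let us take a look at some scenarios and ways to tune batch jobs so that we can get more out of them.

Threading Profile: In Mule 3.x, the batch operation ran with 16 threads by default, which could be further tuned and customized by tweaking the threading profile for the flow. Such capabilities are no longer available in Mule 4.x. As per the documentation, the runtime automatically figures out how many threads to allocate for the entire batch flow execution. When a job instance becomes executable, the batch engine submits a task for each record block to the I/O thread pool to process each record. Concurrency occurs automatically, at the record block level.

For the example that we worked on previously (retrieving data from SFDC and loading it into a database), the runtime may decide to use 10 threads (the number depends on the size of the records and the CPU/memory resources available), and each of those threads will be allocated to a 100-record block, creating 10 batch job instances which are ready to be executed in a fixed order (or in no particular order, depending upon certain configurations).

The above scenario is one of the possibilities, but it is up to the Mule runtime to decide how many threads to assign to the job instances.

Concurrency Control: This is one of the parameters on which the runtime bases the thread management strategy mentioned above. The batch job provides two ways to control how batch instances are executed. We saw earlier that the Load & Dispatch stage is responsible for creating the batch job instances. Once they are created, the instances remain in a ready state, and by default they are executed sequentially. That means that for the 10 batch job instances (refer to the example discussed earlier in the article), the 100-record blocks will be executed one after the other, but the records inside each batch job instance will still be processed asynchronously in the batch steps, as discussed earlier. This is configured by selecting one of the following Batch Scheduling Strategies.

  • Ordered Sequential: This is the default setting. As the name suggests, if several job instances are in an executable state at the same time, the instances execute one at a time (sequentially) based on their creation timestamp. This is useful when the order of execution of the batch job instances is important.
  • Round Robin: This setting allows the runtime to try to execute all the batch job instances concurrently in a round-robin manner. The benefit of this strategy is maximum throughput, since all the threads run concurrently to process all the batch instances. But if the order of execution of batch instances is important, this strategy may create chaos, especially in data synchronization scenarios. In our example it is not an issue, as we are inserting records fetched from SFDC into a database, so the order of insertion is not a factor. Having said that, the individual records in a batch job instance are not guaranteed to be processed in order, as discussed previously. The Max Concurrency flag can be used in conjunction with this strategy to control how many parallel job instances should be running. If left blank, the default is twice the number of vCores/CPU cores available.

Block Size Tweak: We talked about batch job instances and the batch size, which has a default value of 100. Mule gives us the flexibility to tweak this as we deem fit by overriding the Batch Block Size attribute of the batch job.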

  • Ordered Sequential : This is the default setting. As the name suggests, if several job instances are in an executable state at the same time, the instances execute one at a time (sequentially) based on their creation timestamp. This is useful when order of execution of the batch job instances are important.
  • Round Robin: This setting allows the runtime to try execute all the batch job instances concurrently in round robin manner. The benefit of this strategy is it leads to maximum throughput wherein all the threads are concurrently executed to process all the batch instances. But, if order of execution of batch instances is important then this strategy may create chaos, especially in scenarios of data synchronization. In our example, it is not an issue as we are inserting records to database fetched from SFDC, so order of insertion is not a factor. Having said that, the processing on individual records in batch job instance do not have any guarantee to be processed in order as discussed previously. The Max Concurrency flag can be used in conjunction with this strategy and it will allow you to control how many parallel job instances should be running. If left blank, the default is twice the number of vCores/CPU cores available.
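As a sketch of how the two settings named above might be set on the job (the job and step names and the value 10 are illustrative assumptions, and namespace declarations are omitted):

```xml
<!-- Sketch only: names and the concurrency value are hypothetical.
     Check how maxConcurrency is applied in your runtime version before relying on it. -->
<batch:job jobName="customerLoadBatch"
           schedulingStrategy="ROUND_ROBIN"
           maxConcurrency="10">
    <batch:process-records>
        <batch:step name="step1">
            <logger level="DEBUG" message="Processing record"/>
        </batch:step>
    </batch:process-records>
</batch:job>
```

Leaving out schedulingStrategy keeps the default, ORDERED_SEQUENTIAL, which is the safer choice when the execution order matters.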

Block Size Tweak — We talked about batch job instances and batch sizes, which has a default value of 100. Mule gives us the flexibility to tweak this as we deem fit by overriding the Batch Block Size attribute as shown below.
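A minimal sketch of overriding the block size follows; the value 500 and the job and step names are arbitrary illustrations, and namespace declarations are omitted.

```xml
<!-- Sketch only: the block size value and the names are hypothetical. -->
<batch:job jobName="customerLoadBatch" blockSize="500">
    <batch:process-records>
        <batch:step name="step1">
            <logger level="DEBUG" message="Processing record"/>
        </batch:step>
    </batch:process-records>
</batch:job>
```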

It is worth noting that a batch job runs on only one runtime instance, be it in an on-premise cluster or on CloudHub; hence the system/VM on which that Mule runtime is running should have enough memory and CPU to execute the business flow.

Depending upon the number of threads allocated and the amount of concurrency required, the heap memory requirements will vary. Mule recommends sticking to the default value of 100, which should be good in most scenarios, but there may be situations where we need to modify it to improve performance and throughput.

Let's say the working heap memory available is 8 GB. If the number of batch job instances allocated by the runtime is 10 and the batch size is kept at 100, then at most 1000 records can be processed in parallel across batch job instances (considering the Round Robin scheduling strategy with max concurrency overridden to 10). With this amount of memory, the maximum sustainable record size will be approximately 8 MB ((8 × 1024 MB) / (100 × 10)).

Based upon the concurrency and Scheduling Strategy the core runtime decides how many threads to allocate for the entire Batch Job.

If, let's say, the record sizes are small (~800 KB), we can make better use of the resources mentioned above by raising the batch size to 1000 ((8 × 1000 × 1000 KB) / (800 KB × 10)), allowing more throughput and higher levels of parallelism (in the case of the ROUND ROBIN scheduling strategy). As discussed earlier, parallel processing of batch instances may not always be a boon, so it is imperative that we assess the requirement before configuring the batch component.
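The two back-of-the-envelope estimates above can be written out as follows. This is only a rough sizing sketch: it assumes the whole 8 GB heap is available to records, and it follows the article's own rounding (1024 MB per GB in the first estimate, 1000 in the second).

```latex
\[
\text{max record size} \approx \frac{8 \times 1024\ \text{MB}}{100 \times 10} \approx 8.2\ \text{MB}
\]
\[
\text{block size} \approx \frac{8 \times 1000 \times 1000\ \text{KB}}{800\ \text{KB} \times 10} = 1000
\]
```

In practice the usable figure is lower, since the heap is shared with the rest of the application.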

Batch Job Instance assignments in concurrent scenario, processing 10 parallel instances.

Aggregation: Each batch step has a section called the aggregator, which allows us to load or upsert data in bulk into databases or external data sources.

Aggregator set to a value of 50

The above scenario will wait for 50 customer records to accumulate, which will then be inserted into the database using the bulk insert SQL component. The batch aggregator is mutable; the records can be modified and accessed sequentially or randomly (using the For Each component and the records variable).
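A sketch of the fixed-size aggregator described above might look like this. The Database_Config reference and the table and column names are assumptions, and namespace declarations are omitted.

```xml
<!-- Sketch only: config, table and column names are hypothetical. -->
<batch:step name="insertStep">
    <batch:aggregator size="50">
        <!-- payload here is an array of up to 50 accumulated records,
             which bulk-insert uses as its input parameters by default -->
        <db:bulk-insert config-ref="Database_Config">
            <db:sql>INSERT INTO customers (id, name) VALUES (:id, :name)</db:sql>
        </db:bulk-insert>
    </batch:aggregator>
</batch:step>
```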

The records variable discussed above is not the same as recordVars which was available in Mule 3.x

The batch aggregator also gives an option to stream the data instead of fixed sized values, but the records cannot be accessed randomly in this mode (records are not mutable during streaming). Streaming should be used carefully, especially when loading data to external SaaS providers like SFDC/NetSuite.
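For comparison, a streaming aggregator could be sketched as follows. The step name and the logger are placeholders for whatever sequential consumer is needed, and namespace declarations are omitted.

```xml
<!-- Sketch only: the step name and logger are placeholders. -->
<batch:step name="exportStep">
    <batch:aggregator streaming="true">
        <!-- payload is a forward-only stream of the records reaching this step;
             it can be iterated once, not accessed by index -->
        <foreach>
            <logger level="INFO" message="#[payload]"/>
        </foreach>
    </batch:aggregator>
</batch:step>
```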

Batch streaming may actually slow your transactions down, depending upon the bandwidth of the consumer destination, so as developers we need to assess all the parameters when deciding between fixed-size and streaming aggregation. Batch aggregators do not support job-instance-wide transactions. We can, however, define a transaction inside a batch step so that each record is processed in a separate transaction.
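One way to sketch a per-record transaction inside a step is to wrap the processor in a Try scope that begins a transaction. The Database_Config reference and the table and column names are assumptions, and namespace declarations are omitted.

```xml
<!-- Sketch only: config, table and column names are hypothetical. -->
<batch:step name="insertStep">
    <!-- A new transaction is started for each record entering the scope -->
    <try transactionalAction="ALWAYS_BEGIN">
        <db:insert config-ref="Database_Config">
            <db:sql>INSERT INTO customers (id, name) VALUES (:id, :name)</db:sql>
            <db:input-parameters><![CDATA[#[payload]]]></db:input-parameters>
        </db:insert>
    </try>
</batch:step>
```

If the insert fails, only that record's transaction is rolled back; the other records in the block are unaffected.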

Mule 4.1/4.2 Batch processing had issues wherein the original mimeTypes of the payloads were not preserved. This is fixed in the latest Mule 4.3 release, by passing an attribute preserveMimeTypes in the batch aggregator as shown below.

<batch:aggregator size="50" preserveMimeTypes="true">
    <db:bulk-insert doc:name="Bulk Insert 50 records to database"/>
</batch:aggregator>

That’s it for now. You can find out more on the configuration side using links in the references section. I will come back with more implementation examples and real world scenarios involving batch.

Arun Dutta
The Mule Blog

3x Microsoft Certified (Azure Solutions Architect)| 2x MuleSoft Certified (Integration Architect/Developer)| Java/Microservices/NodeJS Certified