Optimizing Batch Processing Jobs with RxJS

Ravi Mehra
24 min readApr 7, 2019

--

Batch processing may not appear too exciting. Retrieve some data, do some operations on that data, repeat. They do their job silently and take minimal input params to run with no fancy UIs. But finding ways to optimize a batch processing job can be a very rewarding process. You can feel like an algorithmic genius when you squeeze out more and more efficiency, reducing the execution time from days→hours or hours→minutes.

The optimization technique is always the same: maximize concurrency without overloading our resources. Balancing the two can be a challenge. We need the right toolset and processing model to achieve it. One such candidate is RxJS.

In this article, we’ll approach a common batch processing pattern in a reactive way using RxJS. We’ll compare the reactive approach with more traditional approaches and appraise each with visuals and benchmarks. While the reactive approach will introduce more complexity, the improvements in performance and flexibility should make RxJS a compelling tool when designing your next batch jobs.

The below graph illustrates the efficiency improvements we’ll make to the sample batch processing task. We end up achieving a 5.3x speedup!

tl;dr

Observables are awesome and can be used to really ramp up efficiency of a batch processing job. The tradeoff is some additional complexity to think about the problem in a reactive way but it would be much more complex (i.e. impossible) to achieve the same level of efficiency and flexibility with other approaches.

If you’re already familiar with RxJS, I would suggest reading this article in reverse. Start looking at the code for the optimal observable approach and reverse engineer it enough to get a cursory understanding. Then work backwards through the article to figure out how the approach works in detail and why it’s better than the Promise.all() approach. If you’re not familiar with RxJS, you can do the same but it’s gonna be rough. In that case, it’s better to read through normally.

Background on Batch Processing

Some examples of batch processing jobs include:

  • Traverse an API’s paginated list of data, fetch each record’s details, and index it in ElasticSearch.
  • Fetch data from a database, aggregate with additional data, generate HTML reports, and email them out.
  • Read database and generate sitemap.xml for search crawlers.
  • Initialize a database with randomized seed data while satisfying all DB relations.

Batch jobs are initiated in one of two ways: manually by a developer or automatically by a scheduled task runner. The jobs that run manually ideally take a few seconds or, at most, a few minutes to complete. Every second counts. The developer running it is usually blocked until the task is complete. For me, the more time I expect to sit at a terminal and wait, the more likely I’ll get distracted. Focus wanes, flow is lost, and onto the internet I go. It’s easy to justify “I have to wait a couple minutes so I’ll just check my email/stocks/news”. Those two minutes easily turn into 15 or more.

For jobs that are scheduled to run automatically, a slow job isn’t as noticeable. It’s usually a s̶e̶t̶ ̶i̶t̶ ̶a̶n̶d̶ ̶f̶o̶r̶g̶e̶t̶ ̶i̶t̶ schedule it and forget it affair. However, a poorly performing one means we can’t run it as frequently. We may be limited to run it every two hours instead of every hour because it takes >60 minutes to complete. Its also possible that the job’s working dataset gets stale by the time the job gets around to processing it. If the job’s mission is to gather statistics every hour and it takes 40 minutes to complete, which hour are we really reporting on?

The good news is that almost all batch processing jobs can benefit from concurrency and usually in obvious ways. Since batch jobs loop over a dataset and perform some operations, the obvious and most dramatic improvement is always adding concurrency. However, implementing concurrency in an efficient way is not trivial.

JavaScript has many tools available to help take advantage of concurrency with asynchronous operations including Promises, ES2017 async/await, ES2018 async iteration, NodeJS streams, and third-party packages such as bluebird, scramjet, or async. One more to add to the list is a popular library used in frontend applications: RxJS. Not all of these options are equivalent and we’ll compare RxJS to a couple of them.

RxJS

RxJS has gained traction in frontend development since it’s a perfect match for that environment. Think of the end-user as an Observable that interacts with the web page and emits values over time. The user clicks a button, navigates to a page, enters text. All of these events are observed by the application in a structured way. Each event propagates through the app’s code leading to state modifications, network requests, and/or DOM alterations. Doing all this in a structured and composable way is where RxJS shines and is made possible through the plethora of RxJS observable operators made available. These operators give developers the building blocks to design their own custom processing chain.

What’s lesser known is that RxJS is also a great tool for batch processing. In this case, think of the data source (e.g. database) as an Observable that emits batches of our desired data over time. We can then use Observable operators such as mergeMap or concatMap to process the data as it’s made available, taking advantage of concurrency and flow control.

This article won’t get into details of what RxJS or Observables are or how they work. There’s just too much to cover and there’s already great articles to learn from. Instead, we’ll look at examples of batch processing jobs that use RxJS effectively and break them down to see how they work. If you’re brand new to reactive programming or RxJS, I recommend getting formally introduced by reading through The introduction to Reactive Programming you’ve been missing and Learn to combine RxJs sequences with super intuitive interactive diagrams.

Note that converting a batch job to Observables and RxJS won’t automatically improve its efficiency. It’s up to us to design a chain of observable operators that maximizes overall efficiency. This design is dependent on the task at hand. We’ll walk through a couple examples that should be applicable to many tasks.

Back-pressure

This section is more for experienced users of RxJS. Feel free to skip this section if this isn’t you.

Two of the reasons, among others, that I believe RxJS isn’t more widely adopted for batch processing is due to a) the additional complexity of learning RxJS and b) the lack of examples in handling lossless back-pressure. Back-pressure is a complex topic but at its core, it involves a data producer providing data faster than a consumer can receive and process it. Think of a running faucet with a half-clogged drain. Water from the faucet (data) will slowly be drained (processed) but eventually the sink (heap) will be backed up and overflow.

Batch processing jobs often work with giant data sets that need to be handled in a controlled manner (in batches/pages/chunks at a time). If we don’t, the job will fail due to:

  • Blowing up our heap size.
  • Exhausting our database connections.
  • Hitting rate limiting of an API server
  • Killing the performance of the API or database.

Backpressure management is therefore critical. RxJS offers lossy and lossless options but for batch processing jobs, lossy options are a non-starter since we can’t skip over any data.

Almost all lossless back-pressure examples in RxJS that I’ve found need to have have the entire working dataset available at the declaration of the observable, e.g. from(entireDatasetArray). This often isn’t possible for the aforementioned reasons. Instead, we need a “reactive pull” approach where we work in batches of data at a time and have control when the next batch is pulled, thereby restricting the max amount of data at any one time to the size of the batch.

The examples I provide in this article introduce a novel approach to cleanly manage back-pressure using an RxJs Subject. Put simply, after one batch of data has been processed, it will call the Subject.next() method and pull/fetch the next batch of data. This puts us in control of how much data is flowing through the stream at any time. This approach is based on the ideas from the following sources:

Example — Scenario and Setup

We need a somewhat realistic batch processing scenario to evaluate and compare the implementation options. We’ll put some constraints on it to make it more interesting and realistic. Here’s the scenario we’ll be working with:

  1. Retrieve a list of companies from a database in a paginated way (first 10, next 10, etc.). We should fetch batchSize records at a time and shouldn’t have more than maxQueueSize records loaded in memory at any one time. We also shouldn’t be querying the database more than retrieveCompaniesConcurrency at a time.
  2. For each company, retrieve the list of orders that company has placed. Assume this requires an API request and we want to avoid hitting a rate limit so we have to limit the amount of concurrent network requests made with retrieveOrdersConcurrency.
  3. Email each company a report of the collected orders it placed. Assume we have an email API service that can take emails individually but performs better when it receives multiple emails in one call. We should limit the number of emails being sent in one API request to bulkEmailCount. Also, we should prevent rate limiting restrictions similar to retrieving orders with bulkEmailConcurrency.

To demonstrate this scenario, we will mock all three asynchronous operations.

  1. retrieveCompanies(limit: number, offset: number): Promise<Company[]> Uses faker.js to generate totalCompanyCount companies. It supports limit and offset params to retrieve the data in a paginated way.
  2. retrieveCompanyOrders(company: Company): Promise<Order[]> Uses faker.js again to generate up to ORDERS_PER_COMPANY orders for each company.
  3. sendBulkEmails(emailData: Company[]): Promise<void> A no-op call since we don’t expect anything back.

For each operation, we’ll artificially generate delays using setTimeout() so we simulate real asynchronous calls. All of the delays are parameterized and can be adjusted.

For the purposes of this article, we’ll set all of the aforementioned variables to the following values:

export const defaultBatchProcessingOptions: BatchProcessingOptions = {
batchSize: 5,
maxQueueSize: 15,
retrieveCompaniesConcurrency: 1,
retrieveOrdersConcurrency: 5,
bulkEmailConcurrency: 5,
maxBulkEmailCount: 5,
};
const TOTAL_COMPANY_COUNT = 100;
const RETRIEVE_ONE_COMPANY_DELAY = 6;
const RETRIEVE_ONE_COMPANY_ORDER_DELAY = 5;
const ORDERS_PER_COMPANY = 6;
const SEND_BULK_EMAILS_DELAY = 60;

With these options set, it means:

  • retrieveCompanies() will take 30ms (batchSize * RETRIEVE_ONE_COMPANY_DELAY === 30)
  • retrieveCompanyOrders() will also take 30ms (ORDERS_PER_COMPANY * RETRIEVE_ONE_COMPANY_DELAY === 30)
  • sendBulkEmails() will take 60ms (SEND_BULK_EMAILS_DELAY).

It is very interesting to see how adjusting the delays and adding randomness affects the run-times of each approach. The results are not always linear. There are too many permutations to go through for this article but feel free to adjust the options on your own. As we’ll see later, the optimized observable approach can actually take advantage of the randomness which would be difficult to do in another approach.

Not all of these options will be used in every approach. Some are only applicable to the advanced scenarios that ramp up the concurrency and optimizations. For the simpler scenarios, we won’t violate any constraint simply because the approach is too inefficient to do so.

View the utils.ts file for the full implementation of the aforementioned asynchronous operations that generate fake data and the batch processing options.

With the util functions out of the way, the index.ts file is then our main starting point that kicks off each approach and benchmarks them. Each approach will be benchmarked 3 times and the average will be printed to console. Comment out any you aren’t interested in running.

Approach 1 — Sequential with Async/Wait

Our first approach is the most naive. It uses async/await to procedurally go through each company, retrieve the company’s orders, and send the emails one at a time. It iterates indefinitely until retrieveCompanies() returns no data back. The only parameter used in the BatchProcessingOptions is batchSize which is used to limit how many companies to retrieve in one call.

Performance: ~5313ms

This is our baseline performance that the other approaches will be compared to.

Visualization

In the diagram, we can see the three operations: retrieveCompanies(), retrieveCompanyOrders(), and sendBulkEmails(). Each horizontal tick is 10ms. Remember that retrieveCompanies() and retrieveCompanyOrders() take 30ms to complete and sendBulkEmails() takes 60ms. Each arrow therefore represents the time to complete.

The numbers above each arrow correspond to the index number(s) of the company(ies) it is processing. The 0–4 above retrieveCompanies() means that it is fetching and returning the first five companies [0–4]. The 0, 1, 2, etc. above the arrows in retrieveCompanyOrders() means it is fetching the company’s orders for company [0], [1], [2], etc, respectively.

The takeaway here is that there are no overlapping arrows. Each one starts after the last one completes. And that means there is no concurrency on any of the operations, leading to a long execution time.

Areas for improvement

One obvious bottleneck here is how it synchronously and sequentially retrieves orders for each company. If we could make those order retrievals happen concurrently, it would give a dramatic improvement.

Approach 2 — Adding concurrency with Promise.all()

Promise.all() allows us to run multiple asynchronous operations concurrently. It wraps all of the inner promises into one outer promise that won’t resolve until all of the inner ones do. This is a perfect tool for us to concurrently execute all of the retrieveCompanyOrders() for each batch of data.

Performance: ~2631ms

2.44x speedup over sequential

Visualization

When comparing this graph to the last, we can see that the arrows of retrieveCompanyOrders() now happens concurrently. In the same time window of 320ms, we are processing companies 10-14 instead of only 5–9 from the first approach.

Areas for improvement

One problem is that Promise.all() is susceptible to the weakest link. Let’s say we put ten retrieveCompanyOrders() calls in a Promise.all(). Nine of the those retrieveCompanyOrders() calls resolve in 20ms but for some reason, one of them takes 200ms (e.g. network issue). Due to the nature of Promise.all(), all ten won’t resolve until 200ms. What if we could proceed with the 9 that went fast and handle the slow one later. Hint: we could if we think of data as a stream (observables).

Another problem is that even though we’ve made retrieveCompanyOrders() concurrent, we haven’t improved the efficiency of the other two asynchronous operations: retrieveCompanies() and sendBulkEmails().

Thinking bigger, our pipeline waits until the current batch of orders is completely processed before starting on the next batch. What if we could fetch the next batch of companies before we’re finished with the first set. That way, we’re pre-loading the next batch of data to be processed as soon as the the pipeline is ready to process it. We’re then queueing data for processing. For example, if I have 20 emails ready to be sent queued up and my email API restricts me to only send 5 at a time, I could immediately run two concurrent sendBulkEmails() to send emails 0–9 and then two more after that to send emails 10–19.

For all of these improvements, we need to have some constraints or things will get out of hand. Without constraints, we’ll be too aggressive and the heap size will blow up, we’ll hit rate limiting issues, we’ll exhaust our DB connections, etc. This is where the aforementioned BatchProcessingOptions provides us a set of constraints.

To actually accomplish all these proposed improvements now gets difficult with our limited toolkit of async/await and Promise.*(). Time to introduce observables!

To summarize our improvement ideas:

  1. Avoid being limited by the weakest link that Promise.all() suffers from.
  2. Make more asynchronous tasks concurrent.
  3. Pre-load and queue data to optimize the pipeline.
  4. Restrict the concurrency and data fetching to a set of constraints.

Approach 3 — Converting approach 2 to observables

Now that we’ve set the stage on the limitations of the previous approach, let’s see what observables and RxJS can do. But before we can really flex the power of observables, let’s do a simple 1-to-1 conversion of the last approach.

The goal with this approach is not to improve efficiency or performance. Instead, this approach serves as a learning opportunity to get our feet wet with observables. We’ll also learn some interesting RxJS operators such as mergeMap() and mergeScan(). And we’ll see how to handle lossless back-pressure with paginated data in RxJS through the power of Subjects.

Thinking in Observables

To understand how this approach works, we need to first start thinking about processing a stream of data. Or the analogy I like to use is a factory assembly line.

We feed the assembly line with some raw data. As the data moves down the line, the data gets processed through multiple different processing machines. Some of these machines take time to process and can’t keep up with the speed of data getting fed in. To avoid an I Love Lucy type situation, we can pause the assembly line at any point until the processor (Lucy) is ready to accept more data (chocolates). Eventually, all raw data moves through the entire assembly line, fully processed by all the machines along the way, and the job is marked complete (until we want to feed more data in).

Translating back to observables, the assembly line is the entire observable. The processing machines are the RxJS operators such as mergeMap and mergeScan. These operators are arranged in the order as given to pipe().

Now let’s break down what the mergeMap and mergeScan operators do.

mergeMap
mergeMap is our heavy lifter here. As per the mergeMap documentation, it projects each source value to an Observable which is merged in the output Observable. What does this mean? Well for our purposes, it allows us to get data passed in, process it, and pass the modified data downstream, very similar to what Array.map() does.

But in addition to mapping input values to output values, it also supports merge/flatten which is not similar to Array.map() and is only applicable to streams. The difference here is that mergeMap can output more or less values than it receives, which will be merged into the output stream. This is why mergeMap was previously known as flatMap due to it’s flattening support.

To explain mergeMap and the idea of merging/flattening, let’s say I have an assembly line that gets initially fed with boxes of donuts. On this assembly line, I have two machines, one to unpack each box and put each individual donut on the assembly line. The second machine then takes a bite out of each donut for quality control.

const boxes$ = from([ [d1, d2, d3], [d4, d5, d6] ]);boxes$.pipe(
// Unpack each box and place each donut onto the stream.
mergeMap(box => from(box)),
// For each donut, place the output of `bite()` onto the stream.
mergeMap(async donut => {
return await bite(donut);
})
)

Note that I use mergeMap to unpack the donuts using RxJS’s from(), which creates a new observable that individually emits each object in an array. Combining from() with mergeMap, it emits each donut into the stream. Also note in the second mergeMap call, I don’t have to return an Observable from mergeMap; I can also return a Promise. When returning promises, mergeMap will wait until the promise is resolved and the resolved data will be sent downstream.

With this knowledge of mergeMap, this approach should make more sense.

// Fetch the next batch of data (the batchSize number of records after the current offset).
mergeMap(
curOffset => retrieveCompanies(options.batchSize, curOffset),
),
// Flatten the array of fetched companies into individual company records.
mergeMap(companies => {
return from(companies);
}),
// Now we work with individual companies.
mergeMap(async company => {
company.orders = await retrieveCompanyOrders(company);
return company;
}),

There’s one feature I haven’t mentioned about mergeMap and that feature is what makes mergeMap so attractive for batch processing jobs. This is the ability to run things concurrently. By default, mergeMap concurrently processes all input values it receives. If 10 values are inputted at the same time, mergeMap will process all 10 of them concurrently. We can also control how concurrent mergeMap is with the concurrent parameter. Passing a value of 5 will restrict mergeMap to only process 5 total concurrent tasks at the same time, even if more than 5 are queued up. Sometimes, we’ll want to disable concurrency and can do so by setting concurrent = 1.

mergeMap vs Promise.all
So how is mergeMap different than Promise.all() since they both process a set of asynchronous tasks concurrently? Well, one difference is mergeMap’s ability to easily control the amount of concurrency with the concurrent param. Remember we often don’t want to make everything concurrent because we might bring down a system, hit rate limiting, blow up our heap, etc. Controlling concurrency with Promise.all() is much harder; we have to break up the input array into chunks that corresponds to how much concurrency we want.

The second difference is that when mergeMap finishes one task, it can immediately send the value downstream. It doesn’t have to wait for all tasks to finish like Promise.all() does. This is possible since we are working in observables, i.e. a stream of data, instead of discrete arrays with Promise.all().

The ability to process data individually as it comes is extremely powerful when we have anomalous events like network latency that spike only a few of our asynchronous operations. With Promise.all(), each anomalous event puts the entire assembly line on pause, suffering from the weakest link. But with mergeMap, the assembly line keeps moving. This feature can give a huge boon to performance as a batch processing job’s chain of operations get longer and more complex. It effectively mitigates the weakest links.

BehaviorSubject and mergeScan

Both BehaviorSubject and mergeScan are RxJS tools that allow us to control the iteration and pagination of the batch processing job.

BehaviorSubject is a type of RxJS Subject that simply creates an observable that we can subscribe to. The interesting feature of Subject’s over raw Observables is the ability to control when new values are emitted using next() (similar to JavaScript Generators). BehaviorSubject also adds the ability, unlike a regular Subject, to provide the initial emitted value.

mergeScan is similar to the mergeMap operator but also allows us to keep a local state that we can reference and update at every execution. It has some similarities with Array.reduce() if you’re familiar with that in how the state is initialized and passed around. Similar to mergeMap, mergeScan also allows us to merge a returned Observable or Promise.

As stated, combining BehaviorSubject and mergeScan together allows us to control iteration and pagination over the data source. In the previous approaches, we did this using a simple for loop: for (let curOffset = 0; curOffset < Infinity; curOffset += options.batchSize). Now we need to do something similar in Observables. At a high level, we’ll subscribe to the BehaviorSubject as it emits new curOffset values. Every time a new curOffset value is emitted, we’ll go and fetch a batch of data at that curOffset. Initializing with BehaviorSubject(0) means the first emitted curOffset value will be 0, just like it is in the for loop. Then we make mergeScan the last operator in the stream, which will aggregate the individual results for the current batch of data. When all items from the current batch have been processed, we issue a BehaviorSubject.next() call with an incremented curOffset, which will fetch and process the next batch of data. Therefore, mergeScan is responsible for determining when the current batch is finished and when to start the next.

Note that this approach to iteration with mergeScan and waiting for all items from the current batch to be processed isn’t performant. It’s a contrived approach to demonstrate a 1-to-1 mapping between Promise.all() and Observables. We’ll see in the next approach how we can use scan to more reactively tackle iteration.

Also note that the combination of BehaviorSubject and scan / mergeScan effectively give us a novel approach to lossless back-pressure with paginated data. Read the Back-Pressure section of this article for more info.

Performance: ~2633ms

0x speedup over the Promise.all() approach. As this approach is a 1–to-1 conversion from Promise.all(), we should expect the performance to be the same.

Visualization

See the Promise.all() visualization graph as it is identical.

Areas for improvement

We haven’t accomplished any of the problems and improvement ideas that we listed in the in the Promise.all() approach. Even though we use mergeMap to add concurrency, we’ve purposely restricted it with mergeScan to emulate Promise.all() so we still suffer from the weakest link problem. Also, we still wait until the entire batch is processed before fetching the next batch. Translating to observable terms, we wait until the entire stream is drained before emitting more values into the stream.

Approach 4 — Optimizing observable concurrency

We finally get to flex the power of observables. Let’s review our improvement ideas from the Promise.all() approach:

  1. Avoid being limited by the weakest link that Promise.all() suffers from.
  2. Make more asynchronous tasks concurrent.
  3. Pre-load and queue data to optimize the pipeline.
  4. Restrict the concurrency and data fetching to a set of constraints.

mergeMap is our answer to ideas 1, 2, and 4. We already saw how mergeMap can solve idea 1 since it will immediately send individual processed tasks downstream; it doesn’t wait until all tasks complete like Promise.all() does. Idea 2 can be solved by simply wrapping more asynchronous calls in mergeMap. Idea 4 can be partially solved by the concurrent param to mergeMap, which allows us to limit how many concurrent operations are done at one time.

To solve idea 3, we need to introduce a queue concept.

Queuing

If we could fetch multiple batches of items and put them in a queue, it would avoid delays throughout the entire pipeline. Each operator in the stream wouldn’t sit around idle until the next batch of data arrives. In an ideal scenario, each operator would have the next batch of data already waiting at its doorstep before its currently operating batch is processed.

To implement a queue, we just need to take advantage of observables and feed more data into the stream before the current batch finishes. No other changes are required since the operators in the stream will automatically consume the queued data at their desired rates (see the mergeMap concurrent param).

The queue’s benefits are also compounded when we move to processing each item individually instead of in batches (see idea 1). The combination of both improvements increases the likelihood that each operator will have input data to consume.

We also need to monitor the size of the queue to make sure it is always filled up to our desired size of queueSize. If the queue is not filled up, we simply inject more records into the queue. We don’t want an unlimited queue size since then it suffers from the same problems as loading all items from the datasource upfront with no pagination.

To further explain how the queue works, let’s say we have a batchSize of 5 records and our queueSize is currently 15. If we process an entire batch, then queueSize is decremented by 5 and now becomes 10. Therefore, we detect this and advance our controller$ Subject to fetch an additional batch of 5 records, thereby maintaining the queueSize at 15.

You can think of the queue concept as a hopper or feeder in an assembly line. Its job is to feeds raw materials at the start of the assembly line. In our case, the hopper/feeder is smart in the sense that we have control on when to advance/feed more items onto the line.

Implementing Our Improvement Ideas

Alright, now we know how to solve all of our proposed ideas, let’s put it together and look at the exact improvements made in this approach when compared to the last observable approach.

  1. We introduce the queue concept that we just described, where we ensure the pipeline has enough data flowing through it to avoid idle operators. Note that there is no variable to hold the queue’s items. The observable itself keeps track of the items.
  2. We adjust the iteration logic to use the scan operator instead of mergeScan. With scan, we can simply keep a state of how many records have been processed, which is used to detect if our queue is ready for more items. This is different than mergeScan where we had to track whether the current batch was fully processed. The change here is due to the fundamental difference between the two approaches. In the previous, we worked entirely in batches and wouldn’t move on to the next batch until the previous batch completed. In this approach, the order of when items in each batch complete aren’t important. For example, we may have advanced to the 10th batch but still have some items from the 1st batch in the queue if those items are taking an abnormally long time to process.
  3. In this approach, we leverage mergeMap’s ability to emit data as it’s processed. The data emitted may not be in the same order it started in but that doesn’t matter for our task at hand. (If order did matter, concatMap would be a better operator but we wouldn’t be able to leverage concurrency as much.) Since our stream now passes around individual records instead of the entire batch as an array, we can also introduce the RxJS bufferTime operator if needed to accumulate a certain amount of records before continuing. In our case, we want to accumulate some companies so we don’t invoke sendBulkEmails() with one email at a time. bufferTime thus converts a stream of individual items into a stream of arrays with each array containing the buffered items.
  4. We enable concurrency on bulk email sending task. This was practically impossible in the previous approach due to the nature of how the data flow was structured.

Performance: ~1055ms

5.04x speedup over sequential, 2.49x speedup over Promise.all().

Visualization

The graph helps visualize our efficiency improvements. Note the cascading retrieveCompanies() calls after the first batch completes which represents how the scan operator detects the queue isn’t full and advances the iterator multiple times to inject more items into the queue. The queue is also beneficial to retrieveCompanyOrders() which also shows the cascading effects. Lastly, note there is now concurrency in the sendBulkEmails() calls.

In the same time window of 320ms, we are processing companies 30–34 instead of 10–14 from the Promise.all() approach.

Interestingly, the efficiency improvements don’t kick in until the first batch is fully processed. That is because of the way we are monitoring and managing our queue. Since that logic happens in scan and since scan is the last operator in our stream, we don’t execute scan's logic until the first batch is processed. We could workaround this but it complicates the design and isn’t worth it when we are processing thousands/millions of batches.

Introducing Randomness and Anomalies

So far, we’ve seen some nice efficiency improvements, but we haven’t yet actualized all the performance benefits of our RxJS solution. Our test setups so far have been too well-behaved, which isn’t indicative of the real world where things like network latency/timeouts, performance, and rate limiting become a thing. And that’s where the observable approach will really shine. So let’s get random! We’ll get into the how/why it is more performant in the Visualization section.

Setup

Enable USE_RANDOMNESS and USE_ANOMALIES in utils.ts. With the following settings, all the static delays will now slightly deviate within a random window, e.g. retrieving one company will now randomly be within 4–8 ms range instead of the hardcoded 6ms. Also, every 10th company will take 10x longer to retrieve its orders which will simulate anomalies.

/**
* Enable to introduce anomalies. This will multiply the delay of
* `retrieveCompanyOrders()` by `ANOMALY_MULTIPLIER` for every
* `ANOMALY_FREQUENCY` companies.
*/
const USE_ANOMALIES = true;
const ANOMALY_FREQUENCY = 10;
const ANOMALY_MULTIPLIER = 10;
// Enable to get random delays and order counts.
const USE_RANDOMNESS = true;
const RETRIEVE_ONE_COMPANY_DELAY = () =>
USE_RANDOMNESS ? random.number({ min: 4, max: 8 }) : 6;
const RETRIEVE_ONE_COMPANY_ORDER_DELAY = () =>
USE_RANDOMNESS ? random.number({ min: 3, max: 7 }) : 5;
const ORDERS_PER_COMPANY = () =>
USE_RANDOMNESS ? random.number({ min: 4, max: 8 }) : 6;
const SEND_BULK_EMAILS_DELAY = () =>
USE_RANDOMNESS ? random.number({ min: 40, max: 80 }) : 60;

Performance

We need to re-run all of the approaches with the anomalies enabled.

  1. Sequential: 7827ms
  2. Promise.all(): 5206ms
  3. Naive Observable: 5039ms
  4. Optimized Observable: 1472ms

Although this demonstrates a 5.30x improvement between Sequential and Optimized Observable approaches, the more interesting comparison is between the Promise.all() and Optimized Observable approaches. The speedup jumped from 2.49x without randomness+anomalies to a whopping 3.53x with them!

Visualizations

Promise.all()

Observable

The visualizations clearly show the Observable approach being far superior. Although it may be hard to follow, the observable approach compacts all the operations down to maximize efficiency.

A few things of note:

  • The companies at index 9 and 19 are the anomalies and take much longer to retrieve their orders. The Promise.all() approach never gets to company 19. This is due to the weakest link problem which is extremely apparent with anomalies.
  • All operations now take a random amount of time in a particular deviation range. Super interestingly, the randomness helps smooth everything out. Since the stream can consume and produce individual entities instead of working in batches, a fast processing entity and a slow one can cancel each other out. This is visualized by the cascading compactness of retrieveCompanyOrders().
  • In both approaches, we never break our desired constraints such as max five concurrent retrieveCompanyOrders() calls, one concurrent retreieveCompanies(), and a max of file emails sent in one bulk operation.
  • The observable approach shows how we queue companies 10–14 and 15–19 before we finish processing companies 5–9. This helps prevent the stream’s subsequent operations from being idle.
  • The order of companies received in sendBulkEmails() may be out of order. As mentioned previously, this isn’t a problem for our particular use case.

Future

One hard part we haven’t yet solved is how to optimize the constraints to create a perfect balance between efficiency and resource consumption. We’ve seen how to easily set higher-limit constraints in RxJS but we have no guarantee that the observable pipeline we craft will maximize the data flow and come close to those limits. It’s likely we, as developers, will create bottlenecks and limit the performance. We’re essentially trying to solve a differential equation. (This shouldn’t scare you off using RxJS though. Even an RxJS approach with bottlenecks will likely be tons better than another approach.)

This is an area is where the code could self-diagnose and suggest constraint modifications or even automatically adjust itself on the fly. To expand, it would be amazing if we could run the algorithm on a subset of data and have it print out the average time taken for each operation along with suggested constraint values. It would be even more amazing if the algorithm could self-optimize, meaning it would adjust the constraints on its own in real-time. Somewhat similar to the TCP congestion control algorithms. This would be no small feat but we might be able to leverage tools like rxjs-spy to accomplish it.

Summary

We’ve seen some amazing improvements in our benchmarks. While some may think the Promise.all() approach is an optimal solution, our Observable approach surpassed it in efficiency by a factor of 3x+.

Keep in mind that RxJS doesn’t magically give us an optimal solution. Instead, it provides us a toolset and a reactive processing model for us to craft an optimal solution. Achieving the same level of efficiency may be possible with other libraries/techniques but those solutions would likely be far more complex.

Using RxJS for batch processing is certainly worth the investment if efficiency and control are of high importance for your particular batch processing job.

--

--