Java + Thinking > Frameworks

How we used plain old Java and a few clever design patterns to conquer a large-scale data processing performance problem.

The product I spend my days improving ensures business operations run smoothly and effectively during incidents such as IT failures, product recalls, natural disasters, dynamic staffing, service outages, medical emergencies and supply-chain disruption.

The Re-design

We process hundreds of thousands of events per day. Those events generate millions of potential notifications to the people who use our products. We filter those millions of potential notifications through our customer-configured group membership, site membership, shift management, device management, escalation, and a host of other rules to determine exactly who to notify. This complex process of determining the right person to contact at the right time is part of the magic of our product and key to the value we provide our customers.

A little over a year ago our engineering team determined we needed to optimize the engine that powered the processing of potential notifications. The existing engine was built on aging Java EE frameworks and relied heavily on a message bus (ActiveMQ), multi-threading and just-in-time database (RDBMS) access.

The optimized engine needed to reduce the load on our database systems. It needed to be faster, simpler to test and maintain, and enable a significant increase in total processing capacity without requiring more hardware.

Events by customer

Batched Stream Processing

We determined that a form of batched stream processing fit the problem domain very well. Batched because initial event processing has a large, known input data set (potential recipients), a beginning (the first potential recipient), and an end (the last potential recipient). Stream processing because we want to send the first notification to the first recipient as soon as possible (time is critical in emergency notification), and we want to handle events of any size and complexity with the same modest resource consumption footprint.

After researching many off-the-shelf Java-based solutions for batch and stream processing (Spring Batch, Camel, etc…) we determined they all added far too much overhead and complexity. It wasn’t that these tools didn’t do the job. They just included the complexity required to do all sorts of jobs we didn’t need to do. We needed a lightweight solution customized to our problem domain. It had to have streaming at its core rather than batch-iteration. So we got to work designing a system that would provide exactly the kind of throughput and performance characteristics we desired.

Discarded potential notifications by type

Abandoning Dogma

We started by abandoning what wasn’t working for us.

We abandoned multi-threading as a means to make processing of an event faster. A significant amount of resources was being expended on context switching between threads.

We abandoned message passing and asynchronous processing of potential notifications as messages. A significant amount of resources was being expended moving messages on and off a message bus. The asynchronous nature of processing individual potential notifications in isolation made caching and testing exceedingly complex, and the message broker infrastructure required a lot of maintenance.

We abandoned iterative database access. The latency involved in communicating with the database hundreds of thousands of times over the network was adding up to a significant delay.

What we were left with was a single thread processing all the potential notifications for an event, in one pass.

Sent notifications by customer

The Data Stream

In order to process the potential notifications efficiently, we needed to retrieve them all from the database as rapidly as possible. To do this, we wrote a query that produced a normalized row set with all the potential notifications and the associated group hierarchy, delivered as a JDBC cursor result set ordered as a depth-first tree traversal.

Why a JDBC cursor result set?

Network latency can add up. We want to make a single network call, move the data across the network as efficiently as possible, and avoid going back to the database. We pull all the data out of a single query via a JDBC cursor that we process row by row like a stream. This has the convenient effect of allowing us to send the first notification as soon as the first row arrives from the database and is processed. In practice the first notifications are being sent before we have finished retrieving all the rows from the database.
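As a rough illustration, here is a minimal sketch of what that row-by-row pull looks like in plain JDBC. The query text, table, and column names are hypothetical stand-ins for our real depth-first-ordered row set:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class PotentialNotificationStream {

    // Streams the entire potential-notification row set from a single query.
    public void stream(Connection conn) throws SQLException {
        try (PreparedStatement stmt = conn.prepareStatement(
                "SELECT recipient_id, group_id, depth, is_group "
                        + "FROM potential_notification_rows ORDER BY depth_first_order",
                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            stmt.setFetchSize(500); // hint the driver to buffer rows in modest chunks
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    // Each row can be filtered (and a notification potentially sent)
                    // before the remaining rows have even left the database.
                    process(rs.getLong("recipient_id"), rs.getLong("group_id"));
                }
            }
        }
    }

    private void process(long recipientId, long groupId) {
        // hand the row to the filter chain
    }
}
```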

Why a depth-first tree traversal?

In our system potential recipients exist within groups, and those groups can be within groups, and so on. It’s a recursive hierarchy.

We return the rows in order such that we always receive the parent row before we receive the child row. In this way we receive and cache all the information pertaining to a potential notification before we process the potential notification.

The recursive hierarchy used to determine recipients

There are two common ways to traverse a tree (hierarchy): breadth first and depth first.

Breadth first would have us reading all the top-level objects (groups), then all their children, and so on. The problem with that approach is that if we try to cache information we will cache almost the entire tree, because until we reach the last leaf node in the tree we can’t prune any information out of the cache.

Depth first has us working our way down the tree to a leaf node as quickly as possible. From there we process sibling nodes until we have exhausted all the leaf nodes and move on to the parent’s sibling node. In this way we can rapidly prune out information as we exhaust leaf nodes in each section of the tree. This ability to throw away information allows us to cache all the information from parent objects in the thread context without the threat of storing the entire data tree in memory. We can hold on to all the information we need for exactly as long as we need it, then discard it.
This makes it possible for us to efficiently process events with hundreds of thousands of potential recipients while using remarkably little memory. Depth first was our winner.
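To make the pruning concrete, here is a minimal sketch of the idea, with hypothetical Row and GroupContext types standing in for our real objects. Because rows arrive parents-first in depth-first order, a simple stack of ancestor contexts is all the cache we ever need:

```java
import java.util.ArrayDeque;
import java.util.Deque;

record Row(long id, int depth, boolean isGroup) {}

record GroupContext(Row group) {
    int depth() { return group.depth(); }
}

class DepthFirstContextCache {
    private final Deque<GroupContext> ancestors = new ArrayDeque<>();

    void onRow(Row row) {
        // Any cached group at the same depth or deeper has an exhausted subtree,
        // so its data can be discarded before we touch the current row.
        while (!ancestors.isEmpty() && ancestors.peek().depth() >= row.depth()) {
            ancestors.pop();
        }
        if (row.isGroup()) {
            ancestors.push(new GroupContext(row)); // cache parent info for its descendants
        } else {
            // A leaf (potential recipient): everything it inherits is on the stack.
            processRecipient(row, ancestors);
        }
    }

    private void processRecipient(Row recipient, Deque<GroupContext> context) { /* ... */ }
}
```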

Processing

We needed a lean processing system for the stream of data represented by the cursor result set rows.

We wrote our own set of interlocking filters in Java to achieve this. The principle is simple. At one end is the reader/writer. It’s kind of the engine of the whole process. It reads notifications out of the filter chain and writes them to the protocol engines that have the job of sending the notifications to recipients (people). 
The filter chain is a set of interlocking objects, each with a read method that calls the read method of the next filter. Each filter is responsible for discarding, delaying, or forwarding on potential notifications. If a notification works its way through all the filters, it is no longer a potential notification; it is a notification winging its way to a recipient (person).
At the head of the filter chain is the source of all the potential notifications: the JDBC cursor result set. As the engine pulls notifications through the filter chain, rows are read from the JDBC cursor result set. JDBC does a great job of buffering the rows coming over the network, so the next row is ready to go when the filter asks for it.
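A minimal sketch of the shape of that chain, using hypothetical types (our real filters carry considerably more context):

```java
record Notification(long recipientId, boolean hasActiveDevice) {}

interface NotificationSource {
    Notification read(); // next notification to forward, or null when exhausted
}

class DeviceFilter implements NotificationSource {
    private final NotificationSource upstream;

    DeviceFilter(NotificationSource upstream) {
        this.upstream = upstream;
    }

    @Override
    public Notification read() {
        Notification n;
        while ((n = upstream.read()) != null) {
            if (n.hasActiveDevice()) {
                return n;              // forward it down the chain
            }
            audit(n, "NO_DEVICE");     // discard and keep pulling from upstream
        }
        return null; // upstream (ultimately the JDBC cursor) is exhausted
    }

    private void audit(Notification n, String reason) { /* buffered audit write */ }
}
```

The reader/writer at the tail simply keeps calling read() on the last filter until it returns null, handing each surviving notification to the protocol engines.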

The filter chain is a set of interlocking objects

Buffered Auditing and Batched Writes

As each potential notification is delayed, discarded, or sent, an audit record is created. These records are batched up and written to the audit data store in blocks of 1000. This reduces the number of write requests by a factor of 1000, which reduces the impact of network latency and enables very efficient writes to permanent storage media.
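A minimal sketch of that buffering, assuming a hypothetical audit table; JDBC batch statements mean the database sees one round trip per block rather than one per record:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

class BufferedAuditWriter implements AutoCloseable {
    private static final int BATCH_SIZE = 1000;
    private final PreparedStatement insert;
    private int pending = 0;

    BufferedAuditWriter(Connection conn) throws SQLException {
        insert = conn.prepareStatement(
            "INSERT INTO notification_audit (recipient_id, outcome, reason) VALUES (?, ?, ?)");
    }

    void record(long recipientId, String outcome, String reason) throws SQLException {
        insert.setLong(1, recipientId);
        insert.setString(2, outcome);
        insert.setString(3, reason);
        insert.addBatch();
        if (++pending >= BATCH_SIZE) {
            flush();
        }
    }

    void flush() throws SQLException {
        if (pending > 0) {
            insert.executeBatch(); // one round trip for up to 1000 audit records
            pending = 0;
        }
    }

    @Override
    public void close() throws SQLException {
        flush(); // make sure a partial final block is not lost
        insert.close();
    }
}
```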

A 1000x reduction in database requests

Testing

This single-threaded, batched stream processing approach is simple to test because the processing is sequential and deterministic (the same input always produces the same output). It’s hard to overstate how impactful this change was. We were able to build advanced test rigs to verify the system behaviour with varying inputs, and measure the performance characteristics under varying load.
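For illustration, a deterministic test can stand an in-memory source in for the JDBC cursor and assert on exactly what emerges from the chain. This sketch reuses the hypothetical Notification, NotificationSource, and DeviceFilter types from the filter-chain example above, with JUnit 5 purely as an example harness:

```java
import java.util.Iterator;
import java.util.List;

import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

class DeviceFilterTest {

    @Test
    void discardsRecipientsWithoutActiveDevices() {
        // An in-memory "cursor": the same input always yields the same output.
        Iterator<Notification> rows = List.of(
                new Notification(1L, true),
                new Notification(2L, false),
                new Notification(3L, true)).iterator();

        NotificationSource chain = new DeviceFilter(() -> rows.hasNext() ? rows.next() : null);

        assertEquals(1L, chain.read().recipientId());
        assertEquals(3L, chain.read().recipientId());
        assertNull(chain.read()); // recipient 2 was discarded, and the stream is exhausted
    }
}
```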

Delayed notifications by reason

Scale

Because each execution is contained to a single thread, we were able to measure the average throughput of each thread and accurately tune the number of threads and the CPU and RAM resources, enabling us to process thousands of events simultaneously for hundreds of customers without breaking a sweat. Our new potential notification processing system is faster, uses less CPU, less memory, fewer network resources, and fewer database resources. It supports enormous recipient counts in events, and huge numbers of concurrent events. It is simple to measure, simple to monitor, and simple to reason about at scale and under load.

The result of this re-design was a resounding success.

Sent notifications by cluster version during a new version rollout

DIY Considerations

Any system that processes a stream of data is only as fast as the slowest component in the chain. It is very important when building a system such as this to measure and monitor the speed and resource consumption of each component of the system. Know where your bottlenecks are and optimize them. We put every filter under the microscope and constantly sought to optimize the weakest link. The expression “when you solve your number one problem, your number two problem gets a promotion” is never more true than when developing stream processing systems. A streaming design will punish the unobservant but will reward the diligent with outstanding performance and scale.
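One way to keep every filter under that microscope is a measuring decorator that can wrap any stage of the chain; this sketch reuses the hypothetical NotificationSource and Notification types from earlier:

```java
class MeasuredFilter implements NotificationSource {
    private final NotificationSource delegate;
    private final String name;
    private long nanos; // time spent inside this stage and everything upstream of it
    private long rows;  // notifications that made it out of this stage

    MeasuredFilter(String name, NotificationSource delegate) {
        this.name = name;
        this.delegate = delegate;
    }

    @Override
    public Notification read() {
        long start = System.nanoTime();
        Notification n = delegate.read();
        nanos += System.nanoTime() - start;
        if (n != null) rows++;
        return n;
    }

    // Published to monitoring in practice; a simple summary for illustration.
    String report() {
        double avgMicros = rows == 0 ? 0 : (nanos / 1_000.0) / rows;
        return name + ": " + rows + " rows, " + String.format("%.1f", avgMicros) + " µs/row";
    }
}
```

The time recorded by a stage includes everything upstream of it, so subtracting the upstream stage’s figure gives a per-filter number; the point is simply that a plain decorator makes the bottleneck visible without disturbing the chain.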