Challenges of Realtime Pipelines

Marcos
Strands Tech Corner
Dec 12, 2019

In recent years at Strands we have seen an increasing demand to ingest data and gather analytics in near real time.

While there have been earlier projects with such a goal, this article focuses on some of the challenges we faced at Strands when building a reliable standard product that we could later customize for our clients’ different needs.

In this article we aim to cover the choice of an asynchronous architecture, the impact of accessing the database in bulk, the use (or not) of an ORM framework, and the evaluation of different caching solutions.

Synchronous vs asynchronous

Synchronous architectures offer a number of benefits that were very appealing at first: for example, the simplicity of knowing at execution time whether a call succeeded or failed and, in case of failure, being able to retry in the original context.

However, we opted for an asynchronous architecture. It allowed us to write to the database in bulk, which boosted our overall throughput. Decoupling the acknowledgement of a message from its actual processing made the solution more complex, but at the same time more scalable and reliable. Note that messages are persisted in an internal store before being acknowledged to the Messaging System.

At a high level, the asynchronous architecture works as follows:
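The persist-before-acknowledge ordering described above could be sketched in Java roughly as follows. This is only an illustration under assumptions: `InternalStore`, `Acknowledger` and `PersistThenAckReceiver` are hypothetical names, not the actual Realtime classes.

```java
/** Hypothetical internal store; stands in for whatever durable storage is used. */
interface InternalStore {
    void persist(String messageId, String payload);
}

/** Hypothetical handle used to acknowledge a message to the Messaging System. */
interface Acknowledger {
    void acknowledge(String messageId);
}

final class PersistThenAckReceiver {
    private final InternalStore store;
    private final Acknowledger acknowledger;

    PersistThenAckReceiver(InternalStore store, Acknowledger acknowledger) {
        this.store = store;
        this.acknowledger = acknowledger;
    }

    /** Persists first; the acknowledgement is sent only if persisting did not throw. */
    void onMessage(String messageId, String payload) {
        store.persist(messageId, payload);
        acknowledger.acknowledge(messageId);
    }
}
```

With this ordering, a failure while persisting leaves the message unacknowledged, so the Messaging System can typically deliver it again; the actual processing happens later, from the internal store.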

Realtime accepts messages from a Messaging System and processes them in batches, either once a minimum number of messages has been reached or, at the latest, after some interval. Processing means applying the business logic, mainly updating some of the database tables; the output can then be sent back to the Messaging System and/or forwarded to other components of the Strands Suite. Over the years we have worked with a number of different Messaging Systems: from the JMS spectrum (ActiveMQ, IBM MQ, ...) to RabbitMQ, and more recently Kafka and Google Pub/Sub.
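The "size or interval" trigger can be illustrated with a minimal Java sketch. Everything here is an assumption made for the example: the class name `MessageBatcher`, the injectable clock, and the idea that `tick()` is called periodically by some scheduler. It is not the Realtime implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical batcher: flushes when the buffer is full or the oldest message has waited too long. */
final class MessageBatcher<T> {
    private final int maxSize;
    private final long maxWaitMillis;
    private final LongSupplier clock;              // injected so the sketch is easy to test
    private final Consumer<List<T>> processor;     // receives each completed batch
    private final List<T> buffer = new ArrayList<>();
    private long oldestArrivalMillis;

    MessageBatcher(int maxSize, long maxWaitMillis, LongSupplier clock, Consumer<List<T>> processor) {
        this.maxSize = maxSize;
        this.maxWaitMillis = maxWaitMillis;
        this.clock = clock;
        this.processor = processor;
    }

    synchronized void add(T message) {
        if (buffer.isEmpty()) {
            oldestArrivalMillis = clock.getAsLong();
        }
        buffer.add(message);
        flushIfDue();
    }

    /** Assumed to be called periodically, for example from a scheduled task. */
    synchronized void tick() {
        flushIfDue();
    }

    private void flushIfDue() {
        if (buffer.isEmpty()) {
            return;
        }
        boolean full = buffer.size() >= maxSize;
        boolean waitedLongEnough = clock.getAsLong() - oldestArrivalMillis >= maxWaitMillis;
        if (full || waitedLongEnough) {
            List<T> batch = new ArrayList<>(buffer);
            buffer.clear();
            processor.accept(batch);
        }
    }
}
```

In a real deployment the clock would be something like `System::currentTimeMillis`, and the processor would hand the batch to the business logic.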

Internally, processing the messages involves distributing them to their respective flows according to their action and type. Each flow represents a piece of business logic and may have steps to validate the input, insert rows, update rows, update the cache, dispatch new messages, etc.
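As a rough illustration of this routing, the sketch below groups a batch by action and type and hands each group to the flow registered for that pair. The names `IncomingMessage`, `Flow` and `FlowRouter`, as well as the string key format, are hypothetical and chosen only for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical message shape: just enough fields to route on. */
final class IncomingMessage {
    final String action;
    final String type;
    final String payload;

    IncomingMessage(String action, String type, String payload) {
        this.action = action;
        this.type = type;
        this.payload = payload;
    }
}

/** A flow receives all messages of one (action, type) pair from a batch. */
interface Flow {
    void process(List<IncomingMessage> messages);
}

final class FlowRouter {
    private final Map<String, Flow> flows = new HashMap<>();

    void register(String action, String type, Flow flow) {
        flows.put(key(action, type), flow);
    }

    /** Groups the batch by (action, type), keeping first-seen order, then runs each flow once. */
    void dispatch(List<IncomingMessage> batch) {
        Map<String, List<IncomingMessage>> grouped = new LinkedHashMap<>();
        for (IncomingMessage m : batch) {
            grouped.computeIfAbsent(key(m.action, m.type), k -> new ArrayList<>()).add(m);
        }
        for (Map.Entry<String, List<IncomingMessage>> e : grouped.entrySet()) {
            Flow flow = flows.get(e.getKey());
            if (flow == null) {
                // Sketch only: a real system would probably reject or park such messages instead.
                throw new IllegalArgumentException("No flow registered for " + e.getKey());
            }
            flow.process(e.getValue());
        }
    }

    private static String key(String action, String type) {
        return action + ":" + type;
    }
}
```

Handing each flow its whole group at once, rather than one message at a time, is what lets the flow itself work in bulk against the database.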

Understanding the domain

This phase is a must no matter the application, so it is nothing new to a seasoned developer, but it is worth mentioning because at the very beginning we were facing scenarios we were not familiar with. Though we already had some integrations using Web Services, data ingestion was traditionally a responsibility that fell to the Database team. As Backend Developers, our role focused on fetching the information efficiently, applying some neat business logic and then handing the output to the Front End team in a well-defined format. At least that’s how I would explain my day to a neighbor in the lift.

Strands Suite encompasses many concepts that help grasp the domain. Some are used for presentation, some for business logic, some span both, etc. But only a few are really necessary when it comes to ingesting data.

Working with few concepts makes it manageable to understand new scenarios and workflows, which facilitates the development of our solution while keeping performance on the radar.

So, for example, if we think of a customer account, the most common flow would be updating its balance. This turns out to be a much smaller subset of what the whole FM suite supports, and it helps optimize this flow to reach the expected speed.

Batching in the Database

YMMV, but if there is one Commandment to keep in mind if you want your app to be performant, it is this: work in batches! Prepare your statements, open a transaction, do many things quickly, and get out of it.
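With plain JDBC, "prepare, open a transaction, do many things, get out" might look like the sketch below. The table and column names (`account`, `balance`, `id`) and the class name `BalanceBatchWriter` are assumptions for illustration; only the `java.sql` calls themselves are standard.

```java
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;

final class BalanceBatchWriter {
    // Hypothetical table and columns, for illustration only.
    static final String UPDATE_SQL = "UPDATE account SET balance = ? WHERE id = ?";

    /** Updates all balances in one transaction using a single prepared statement; returns the batch size. */
    static int updateBalances(Connection conn, Map<Long, BigDecimal> balancesById) throws SQLException {
        boolean previousAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false);                       // open the transaction
        try (PreparedStatement ps = conn.prepareStatement(UPDATE_SQL)) {
            for (Map.Entry<Long, BigDecimal> entry : balancesById.entrySet()) {
                ps.setBigDecimal(1, entry.getValue());
                ps.setLong(2, entry.getKey());
                ps.addBatch();                           // queue the row instead of executing it
            }
            int[] counts = ps.executeBatch();            // send the queued rows together
            conn.commit();                               // and get out of the transaction
            return counts.length;
        } catch (SQLException e) {
            conn.rollback();
            throw e;
        } finally {
            conn.setAutoCommit(previousAutoCommit);
        }
    }
}
```

How much a driver actually gains from `executeBatch()` depends on the database and driver settings, so it is worth measuring in your own environment.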

Not fully convinced? Let’s get some metrics from our environments to understand the impact.

If I fetch one row of a table filtering by one index, I see a speed of 62 ms/row. If I do the same for 100 rows, I get 5 ms/row. Before trying with a larger set, do I need all the columns? If I limit the fields I fetch, I get 4.4 ms/row. That’s an improvement of a bit over 10%. Now let’s give it a try with 1000 rows: 1.4 ms/row, or about 44 times faster than the original.
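One common way to fetch many rows in a single round trip, while selecting only the columns you need, is a parameterized `IN (...)` query. The helper below is a hedged sketch: `BulkSelect` and the table and column names in the usage are hypothetical, and the article does not say which query shape was actually used.

```java
import java.util.Collections;
import java.util.List;

final class BulkSelect {
    /**
     * Builds "SELECT <columns> FROM <table> WHERE <keyColumn> IN (?, ?, ...)".
     * Table and column names are concatenated as-is, so they must come from
     * trusted constants, never from user input; only the keys are bind parameters.
     */
    static String build(String table, List<String> columns, String keyColumn, int keyCount) {
        if (keyCount < 1) {
            throw new IllegalArgumentException("keyCount must be at least 1");
        }
        String placeholders = String.join(", ", Collections.nCopies(keyCount, "?"));
        return "SELECT " + String.join(", ", columns)
                + " FROM " + table
                + " WHERE " + keyColumn + " IN (" + placeholders + ")";
    }
}
```

For example, `BulkSelect.build("account", Arrays.asList("id", "balance"), "id", 3)` yields `SELECT id, balance FROM account WHERE id IN (?, ?, ?)`. Some databases cap the number of entries in an `IN` list, so very large key sets may need to be split into chunks.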

The same applies when it comes to Inserting or Updating, as our colleague Ferran Soler explained recently in a separate article.

I will not delve into details, but if there is a second Commandment, it is “Send several statements at once”.

Dropping backend dependencies: Database focus

Once we chose a bulk strategy, we had no option but to drop the existing Java code for fetching data and build a new batch-oriented one just for ingestion. We reached a point where we had dropped all dependencies on the former code, and as a result the software cycles became decoupled. Naturally, at this point the Database model became our focus, and the Realtime product became very sensitive to any development done by the Data team.

Hibernate vs JDBC

One of the first consequences of the last point is that our graph of dependencies between objects flattened. Not only are there fewer interactions, but there is also less data to link one object to another.

So, for example, a product may only need to know the Id of its customer, or a transaction may need to track its product and its customer. Hibernate offers plenty of flexibility when it comes to the life cycle of the data, but we evaluated the possibilities and concluded that in our scenarios it was not needed.

So we decided to keep track of and cache any necessary values ourselves, which is the topic of the next point.

Cache

After several attempts we decided to allow for different cache layouts. Using interfaces allowed us to abstract away from any specific vendor, while at the same time we followed a fail-over strategy for some of our more common scenarios. We did so to overcome the performance challenges we faced. The end result is that a call to the cache may, behind the scenes, imply a call to a first-level cache and, if necessary, to a second-level cache, and so on.

In practice we opted for layouts with two to three levels of caching. The last level was always the database itself, so that a database call happened only when every cache level before it had missed.
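A layered lookup of this kind can be sketched with one small interface and a chain of levels. The names `CacheLevel`, `MapCacheLevel` and `LayeredCache` are hypothetical; in the real layouts the levels would wrap Hazelcast, RocksDB or the database, whereas here a plain in-memory map stands in for each of them. Copying a hit back into the faster levels is also an assumption of this sketch, not something the text above states.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Vendor-neutral view of one cache level (or of the database, as the last level). */
interface CacheLevel<K, V> {
    Optional<V> get(K key);
    void put(K key, V value);
}

/** Stand-in implementation backed by a HashMap, used here in place of a real store. */
final class MapCacheLevel<K, V> implements CacheLevel<K, V> {
    private final Map<K, V> map = new HashMap<>();

    @Override
    public Optional<V> get(K key) {
        return Optional.ofNullable(map.get(key));
    }

    @Override
    public void put(K key, V value) {
        map.put(key, value);
    }
}

final class LayeredCache<K, V> {
    private final List<CacheLevel<K, V>> levels;   // ordered from fastest to slowest

    LayeredCache(List<CacheLevel<K, V>> levels) {
        this.levels = levels;
    }

    /** Tries each level in order; on a hit, copies the value into the levels that missed. */
    Optional<V> get(K key) {
        for (int i = 0; i < levels.size(); i++) {
            Optional<V> hit = levels.get(i).get(key);
            if (hit.isPresent()) {
                for (int j = 0; j < i; j++) {
                    levels.get(j).put(key, hit.get());   // assumption: backfill faster levels
                }
                return hit;
            }
        }
        return Optional.empty();
    }
}
```

Because every level sits behind the same interface, swapping the order of two levels or adding a new store only changes how the list is assembled, not the calling code.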

We considered a number of NoSQL alternatives for caching, a couple per category:

  • MongoDB: Distributed
  • Cassandra: Distributed
  • Hazelcast: Distributed, In-Memory
  • Redis: Distributed, In-Memory
  • RocksDB: Local store
  • LevelDB: Local store

If we want our pipeline to be easily scalable, a distributed cache seems a natural fit. Hazelcast can be deployed very easily, and is an in-memory solution, which should translate into great speed (at a memory cost, no doubt).

On the other hand, most data that needs to be cached may not change very often, making a local store more attractive given it consumes less memory. We chose RocksDB because reads seemed to perform better, which was our main concern.

Thanks to the use of interfaces we can always implement support for a new store, and the Database was used as a fail-over in case a record was not found. Let’s see the performance of the two stores we chose from the list above:

  • Hazelcast, and the Database as failover:
  • RocksDB, and the Database as failover:

Our performance testing showed that their overall throughput was at the same level.

We could even get the best of both worlds through a “fail-over” pipeline: a fast distributed in-memory cache to share new objects, and an on-disk cache for static objects to save memory. So we first used RocksDB with Hazelcast as failover, and then the other way around. In both configurations we still kept a third level, the Database:

  • RocksDB -> Hazelcast -> Database
  • Hazelcast -> RocksDB -> Database

The graphs show that the throughput of the four cache configurations was very similar, on the order of 1500 to 1700 messages per minute. The real throughput is obviously well above this number when run on decent infrastructure: our main purpose here was to compare the different layouts.

Coming soon

In the next article we will look at performance testing, specifically different deployment alternatives and their metrics, and a first assessment of scaling horizontally. Then we will see some points of improvement by making our messaging more efficient, as well as the impact of parallel processing.
