Challenges of Realtime Pipelines: Part II
In this article we will look at some other aspects of our Realtime Data Pipeline. We will start by evaluating different deployment configurations and then cover a brief summary of points to consider for the much-needed performance testing. We will then analyze how we scale horizontally, give some insight into messaging optimizations and, finally, review the influence of parallel processing.
Just FYI, in the previous article we covered the choice of an asynchronous architecture, database batching, the use (or not) of an ORM framework and, importantly, caching.
Deployment
Equally important as which components are used is their proximity to each other. We can guess that the closer the Realtime processes are to the cache instances, the better. But let us imagine End User Data needs to be located on specific servers. What would be the price to pay?
We have deployed our Realtime app and its cache, as well as some other Strands components and tools, in our Kubernetes cluster in Oracle Cloud. Of the different options we covered in the previous article, we will use only Hazelcast as the cache, with the database as failover. We have targeted two different deployments. In the first one, the rte nodes are the same as the cache nodes (node affinity).
The second configuration is the opposite: cache nodes are not the same as the rte processing nodes, even though they still speak to each other (node anti-affinity):
We went ahead and sent 20k messages to Strands Realtime: think of messages that create a new operation or update an account balance, the kind you can easily review in the-name-of-your-bank-here app.
The first configuration took 220 seconds, while the second one took 280 seconds, making the co-located deployment roughly 27% faster. Again, the numbers may differ in another scenario or with different hardware or network, but what we know is that, if we do not have any placement restrictions, a different deployment can take us from 37k messages per minute to 50k messages per minute.
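Since we use Hazelcast as the cache with the database as failover, here is a minimal sketch of what that read path can look like from the application side. It assumes the Hazelcast 4+ client API; the map name and the loadOperationFromDatabase helper are illustrative placeholders, not the actual Strands implementation.

```java
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;

public class OperationCache {

    private final IMap<String, String> operations;

    public OperationCache() {
        // Connects to the Hazelcast members declared in the client configuration
        HazelcastInstance client = HazelcastClient.newHazelcastClient();
        this.operations = client.getMap("operations"); // illustrative map name
    }

    public String getOperation(String operationId) {
        // Try the distributed cache first...
        String cached = operations.get(operationId);
        if (cached != null) {
            return cached;
        }
        // ...and fall back to the database on a cache miss
        String fromDb = loadOperationFromDatabase(operationId); // hypothetical DAO call
        if (fromDb != null) {
            operations.set(operationId, fromDb); // repopulate the cache for the next reader
        }
        return fromDb;
    }

    private String loadOperationFromDatabase(String operationId) {
        // Placeholder for the JDBC/ORM lookup used as failover
        return null;
    }
}
```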
Performance test
It would be unfair to omit the effort spent on performance testing; quite the opposite, it deserves an article of its own to go through the use of ActiveMQ, a custom benchmark tool, JMeter, Docker, Elasticsearch, Kibana, Logstash and, more recently, Prometheus and Grafana.
As an example, the overview of one of our performance environments below shows the scale of its complexity. In this particular environment we use JMeter to randomly call our benchmark tool, which then hammers ActiveMQ with messages that are processed by Realtime and end up reflected in our database. Filebeat ships the logs to our Elasticsearch cluster so we can visualize the system's behaviour in Kibana.
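For illustration, a stripped-down version of such a benchmark producer could look like the sketch below, using the plain JMS API against ActiveMQ. The broker URL, queue name and message payload are made up for the example; this is not the actual Strands benchmark tool.

```java
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class BenchmarkProducer {

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory("tcp://localhost:61616"); // illustrative broker URL

        Connection connection = factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("realtime.input");        // illustrative queue name
            MessageProducer producer = session.createProducer(queue);

            long start = System.currentTimeMillis();
            for (int i = 0; i < 20_000; i++) {
                // Payload standing in for an "operation created" / "balance updated" event
                TextMessage message = session.createTextMessage(
                        "{\"operationId\":" + i + ",\"amount\":" + (i % 100) + "}");
                producer.send(message);
            }
            System.out.println("Sent 20k messages in "
                    + (System.currentTimeMillis() - start) + " ms");
        } finally {
            connection.close();
        }
    }
}
```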
Preparing and running tests takes time and resources. And, very importantly, do not forget the test data: it needs to be close to production volumes and replicate similar scenarios. In summary, plan ahead and make room for all of this.
Horizontal Scaling
Let’s now see how increasing the number of Realtime pods improves the throughput. In a different environment we have worked with batches of 50 messages. When using only one instance (a single pod), the throughput was nearly 16k messages/minute, taking 250 seconds to process 40k messages.
If we increase to 3 replicas, in order to have a layout like the one below, it takes an average of 67 seconds to process the same 40k messages, with each replica processing between 11k and 12k messages/minute:
Ideally we would see linear scalability, but in reality the pods compete for resources, both to reach the cache and to reach the database. Even so, the overall throughput is 44k messages/minute, still 115% faster.
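To give an idea of what the batch-of-50 consumption mentioned above could look like inside each replica, here is a minimal, hypothetical sketch using a transacted JMS session. Every pod runs the same loop, so adding replicas simply means more competing consumers on the same queue; the writeBatchToDatabase helper is a placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class BatchConsumer {

    private static final int BATCH_SIZE = 50;

    public static void main(String[] args) throws Exception {
        Connection connection =
                new ActiveMQConnectionFactory("tcp://localhost:61616").createConnection();
        connection.start();
        // Transacted session: messages are only acknowledged when the batch commits
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = session.createQueue("realtime.input");
        MessageConsumer consumer = session.createConsumer(queue);

        while (true) {
            List<Message> batch = new ArrayList<>(BATCH_SIZE);
            Message message;
            // Drain up to BATCH_SIZE messages, waiting at most 100 ms for each one
            while (batch.size() < BATCH_SIZE && (message = consumer.receive(100)) != null) {
                batch.add(message);
            }
            if (!batch.isEmpty()) {
                writeBatchToDatabase(batch); // hypothetical batched insert/update
                session.commit();            // acknowledge the whole batch at once
            }
        }
    }

    private static void writeBatchToDatabase(List<Message> batch) {
        // Placeholder for the batched JDBC write covered in the previous article
    }
}
```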
Optimizations in Messaging
It is important not to lose sight of the performance of messaging itself, which includes the level of caching (at the very least, caching the connection) as well as the serialization cost.
Even the message format can be a factor to consider. When choosing between JSON, XML, Thrift, Avro, etc., performance is one of the key aspects, but so are others, such as cross-language support. At Strands, Hugo Jacquelin led an implementation where a proprietary fixed-length format helped lower the size of messages. Although it added complexity to development and testing, if I/O to and from the messaging infrastructure proves to be a bottleneck, then message size efficiency is a must.
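Purely as an illustration of the idea (not the actual proprietary format), a fixed-length layout trades self-describing field names for predictable offsets and a much smaller payload. The field layout below is invented for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FixedLengthCodec {

    // Illustrative layout: 8-byte operation id, 8-byte amount in cents, 10-byte account code
    private static final int ACCOUNT_LEN = 10;
    private static final int RECORD_LEN = Long.BYTES + Long.BYTES + ACCOUNT_LEN;

    public static byte[] encode(long operationId, long amountCents, String account) {
        ByteBuffer buffer = ByteBuffer.allocate(RECORD_LEN);
        buffer.putLong(operationId);
        buffer.putLong(amountCents);
        byte[] acc = account.getBytes(StandardCharsets.US_ASCII);
        buffer.put(acc, 0, Math.min(acc.length, ACCOUNT_LEN)); // truncated or zero-padded to 10 bytes
        return buffer.array(); // unwritten bytes remain as zero padding
    }

    public static void main(String[] args) {
        byte[] record = encode(42L, 1999L, "ES12345678");
        // 26 bytes versus roughly 60 bytes for an equivalent JSON document with field names
        System.out.println("Fixed-length record size: " + record.length + " bytes");
    }
}
```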
Parallel processing
Strands Realtime makes use of parallel processing at different stages, such as reading from the messaging system, the actual processing, and writing to the database. Even within the same stage there is parallelization at different levels. While we are not going to discuss this in detail, we do want to show its impact on throughput. The first graph shows a performance test where the actual processing does not do any parallel execution:
We hit maximum speed somewhere halfway through the test, and from that point the speed does not improve, no matter how much more load is put into the system. If we now process in parallel with a factor of 10, and increase the load so that the duration of the test remains meaningful:
We reach the peak much more quickly, and this time it sits at 33k messages/minute.
If on top of that we now use two nodes:
The throughput does increase again, this time up to 44k messages per minute.
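As a rough sketch of what a parallel factor of 10 can mean at the processing stage (the real Realtime internals are more involved), a fixed pool of worker threads processes messages concurrently instead of a single thread doing so sequentially. The process method and the message payloads are placeholders.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelProcessingSketch {

    private static final int PARALLEL_FACTOR = 10;

    public static void main(String[] args) throws InterruptedException {
        // Ten workers process messages concurrently; with a single worker the
        // throughput plateaus as soon as that one thread is saturated.
        ExecutorService workers = Executors.newFixedThreadPool(PARALLEL_FACTOR);

        for (int i = 0; i < 1_000; i++) {
            final String message = "message-" + i; // illustrative load
            workers.submit(() -> process(message));
        }

        workers.shutdown();
        workers.awaitTermination(1, TimeUnit.MINUTES);
    }

    private static void process(String message) {
        // Placeholder for enrichment, aggregate calculation, database writes, etc.
    }
}
```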
Summary
We have covered the main difficulties we came across on the way to reaching our milestones.
Since the conception of Realtime, new scenarios have come up and the product has evolved to support them, becoming one more component of our Data Pipeline.
Composite messages, which include historical data, have made us reconsider how and where we calculate aggregates, and have substantially improved the I/O both at the message queues and at the database.
We left out other points, such as the different alternatives for message acknowledgement and message persistence.
In future articles we will cover the details of deploying in a Kubernetes cluster, as well as how to monitor it using Prometheus and Grafana. We will also look at StreamSets and Kafka Connect, along with other aspects of building our data pipelines.