Multi-clustered architectures to scale big data systems

juanjolainez
CreditorWatch
Published in
9 min readJul 11, 2021

In CreditorWatch, we use Hadoop clusters in combination with Spark and R to produce hundreds of billions of data points every month.

This is the actual story of the alternatives that were considered and how our architecture evolved until reach a final multi-clustered architecture that we happily use in production to the day.

Photo by Kelvin Ang on Unsplash

Scaling systems is one of the trickiest disciplines in software engineering and software architecture. It involves not only thinking about the code, but also about how to work with multi-node databases and choosing the database architecture, choosing the right file system replication system, working with load balancers, reverse proxies, and several servers among many others. There are copious amounts of literature about it (I’d strongly recommend Artur Ejsmont’s “Web Scalability for Startup Engineer”) since it’s been a must-know for most IT corporations for decades now.

The irruption of BigData, though, is much more recent, and there is not that much literature about how to scale such systems. Hadoop clusters that intend to process as much information as we do (hundreds of billions of data points) are usually quite expensive but, with the right tools, making elastic Big Data systems is now much more achievable.

First approach: Simple pipeline

Our first approach was simple. We followed a simple design in which a data pipeline with several stages runs using a Hadoop cluster.

Simple pipeline approach

In this case, we only have 1 data pipeline, so the implementation was quite straightforward. Our Hadoop cluster was scalable both horizontally (by adding more nodes) and vertically (make nodes bigger, adding more memory and/or CPU).

This approach is quite classic and it’s been documented multiple times. A very popular tool for that would be Airflow but, in our case, we use a “Pipeline as Code” approach using Jenkins with the Blue Ocean extension.

This approach allowed us to get our first experience with Big Data in the company and this alone was generating more than 150 billion data points every single month. The first pipeline wrapped the data transformation (the T in the ETL process) to achieve an outcome for a specific project and, having succeeded in achieving this, more projects that needed Big Data capabilities rapidly entered our pipeline.

At that moment, and with a successful proof of concept, we needed a way to scale these processes having in mind that there would be multiple projects down the line using big data capabilities.

A very strong design principle was that every project’s code should NOT be coupled in any way with another project so, at this point, we decided that every project would have its own pipeline. This is important because each pipeline might be developed and maintained by different teams, and we didn’t want to create code coupling in any way between different projects.

Second approach: Chained pipelines

The first approach we thought of was to trigger the next pipeline once a pipeline finishes, essentially implementing the chain of responsibility pattern (https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern) on a pipeline level. In this case, the last node of each pipeline would be to trigger the next pipeline.

Chained pipelines approach

This approach allowed us to run the whole system (several pipelines) in a single Hadoop cluster, which keeps the cost constant (we still have 1 cluster, which is the expensive part of the system). The obvious bad outcome of this approach is that, if one node of one pipeline fails, there will be several projects that, even though they are correctly coded and configured, will never run until the failing node is retried or fixed.

This creates undesired coupling between projects (project C won’t run if project A fails) that has been created for a resource issue. Another adverse consequence is that, even though pipeline N might take 1 hour to complete (Tn), its real completion time since scheduled is T0 + T1 + … + Tn-1 + Tn, being T the time of completion of each pipeline.

The last flaw in this design is that the pipelines are coupled together. Pipeline 1 needs to know about the existence of pipeline 2 and how to trigger it. If the pipelines were in different networks and in different technologies, it’d mean that not only the code (last node) would be coupled with the next step, but also the underlying infrastructure (we might need to configure firewalls, routing rules, DNS records, VPNs, ….) so one pipeline is able to trigger the following one.

Third approach: Concurrent pipelines

The next logical step would be trying a concurrent approach, that would see us triggering all the pipelines concurrently. It would look like this:

Concurrent pipelines approach

In this approach, we remove some of the undesired effects, so it looks like a bit better solution.

With this architecture, no pipeline needs to know about any other pipeline anywhere in the system, which removes the code coupling and the possible infrastructure headache.

We also remove the chained failure (if a node in pipeline 1 fails, pipeline N won’t be executed) since executions are co-dependant in any way.

And, at first look, we also remove the time constraint (the execution of pipeline N will take T0 + T1 + … + Tn-1 + Tn). This, unfortunately, it’s not true.

The resources on the Hadoop clusters are limited, and tasks scheduled will get as many resources as possible. This will bring the cluster to try to accommodate as many tasks as possible by giving fewer resources to every task. This will increment the execution time of all the tasks running in the cluster, taking the same time (probably even more) as if we were running the processes in a chain.

Since the tasks will get as many resources as they can, many tasks in the pipeline will inevitably fail due to a lack of resources (because they are being used by other processes running at that time). Tasks can (and should!) implement retries on failure, but handling different types of failures and retries will inevitably increase the complexity of the pipelines.

Fourth approach: Queued pipelines

The next approach that we considered was aimed to get all the good things from the previous approach but trying to remove the just increased complexity of the pipelines. In this case, the next iteration would be to implement a process queue right in front of the Hadoop cluster.

Queued pipelines approach

This approach has all the benefits from the previous one but removes having to increment the complexity of the pipeline by handling different types of failures and to have several processes competing for resources that will inevitably make each node run slower than if it ran by itself with the totality of the resources.

I have seen this approach implemented on “on-demand” clusters on big companies where, as a user (being a data scientist or a data engineer) you can submit a query to the Hadoop cluster (in that company we were using Hive), it would be queued and it would be executed after the queries before you were satisfied.

This approach seems to work just fine and solve most of the issues we had except one: the execution time for one pipeline is still affected by the other pipelines. Since we have a few pipelines (3 by the time I’m writing this post) and we expect to have many more, this approach doesn’t scale in the terms that we’d like (even though the cluster itself can scale both vertically and horizontally as explained at the beginning of the article).

Final approach: Multi-clustered architecture

The final approach, which is the one that we currently have in production is a bit more complex, but it’s able to satisfy all the cons that we found along the way, but producing some other much smaller ones.

In this case, we would use 1 cluster per pipeline BUT that cluster would spin up at the beginning of the pipeline and spin down at the end of it.

The architecture would look like this:

Multi-clustered approach

The final pipeline would change to

Modified pipeline

In this case, we have all the benefits of the previous approaches (decoupling of code and execution, simplicity of the pipeline nodes, … ) and we don’t have the execution time issue we had previously.

Now, each pipeline will take the same time regardless of how many pipelines are in the system running at the same time. Also, now we are able to scale up and down every single pipeline if desired (some pipelines produce more data points than others) and to change their topology if needed to satisfy different use cases.

On the other hand, now each pipeline is responsible to handle their own resources, so they need to be able how to spin up and down their own Hadoop cluster and we need to make sure that the machine where the pipeline runs has the permissions to do so.

In our case, that was a solved problem, since we have adopted Infrastructure as Code for a long time. More specifically, we use a Cloudformation template (that each pipeline configures with its desired topology) to spin up several EMR clusters with an edge Rstudio instance.

With this architecture, we are able to produce and process as much data as we need and the processing time, instead of being the sum of all the processes, it’s just the slowest of all the processes max(T0, T1, …, Tn-1, Tn).

In terms of cost (and to be fair, assuming that we spin up and down the cluster at the beginning and the end in the other architectures) is the exact same.

Since we pay for usage (time the cluster is up), we find that the cost is the exact same, which is the sum of T0 + T1 + .. + Tn-1 + Tn * (cost per unit of time) in every approach (assuming processes don’t fail). The only difference is that in the last approach the cost per unit of time is higher and the time is shorter and in the others, we have longer execution time and lower cost per unit of time.

Conclusion

The multi-clustered architecture allowed us to more than double the data processed initially taking the same amount of time as we did before (our heaviest process happened to be the initial one) and seems to satisfy all our scalability requirements at this point.

Its only significant drawback is having to implement the spin-up and tear-down of the cluster in each pipeline. Other approaches (such as the queued pipeline approach) have other legit use cases but in our case, its drawbacks didn’t allow us to scale properly.

In terms of cost, all approaches have the exact same one since we have an on-demand billing configuration so it doesn’t play a significant role in the way we evaluate our architecture options.

Remember to follow me and/or CreditorWatch on medium for more cool content and feel free to clap the story if you like it; therefore, it will help somebody else in the future! Thank you so much for your attention and participation.

If you have any questions, please leave a comment or ask me on my LinkedIn (https://www.linkedin.com/in/juanjo-lainez-reche/) and I’ll get back to you asap.

--

--

juanjolainez
CreditorWatch

Software Architect, tech enthusiast, probably average writer