Scaling a Mature Data Pipeline — Managing Overhead
Complex data pipelines often carry a hidden performance cost: overhead. In this post, we introduce the concept and examine the techniques we use to reduce it in our data pipelines.
Author: Zachary Ennenga
There is often a natural evolution in the tooling, organization, and technical underpinning of data pipelines. Most data teams and data pipelines are born from a monolithic collection of queries. As the pipeline grows in complexity, it becomes sensible to leverage the Java or Python Spark libraries and implement your map-reduce logic in code, rather than in raw queries. The monolith is broken down, and you trade complexity in orchestration for simplicity in logic. Your one monolithic job becomes a dozen beautiful, tightly scoped steps structured into some sort of dependency graph.
However, orchestration complexity has a cost: overhead. Broadly, overhead is everything your pipeline is doing other than computing: performing IO, waiting for resources to be allocated to your job, waiting for your job to get scheduled, and so on. Overhead is insidious: It grows slowly, with your pipeline, and generally only becomes problematic when you have dozens of tasks to manage. At that point, there are many variables affecting pipeline performance, and observing and isolating sources of overhead can be extremely difficult, especially when your pipeline legitimately spends a large amount of time computing.
The Payments team operates a number of time-sensitive Spark pipelines, and central to our team’s goals is the timely delivery of data from these pipelines. However, as Airbnb has matured and grown, our pipeline has had to continually rise to meet the challenges presented by the scale and scope of our operations.
There was a time when running our pipeline once a day was sufficient both to meet our delivery SLA and to manage the risk of data integrity issues occurring over the following 24 hours. Increasingly, however, processing data daily was not meeting our needs. So, we began to investigate the technical feasibility of running hourly data pipelines.
And so we became aware of our overhead problem: Due to a number of choices we had made regarding how we structured our business and orchestration logic, we discovered that we were totally blocked from implementing hourly pipelines until we got our overhead under control.
Before delving into our specifics, I want to take a moment to discuss the technical stack backing our pipeline.
Our platform uses a mixture of Spark and Hive jobs. Our core pipeline is primarily implemented in Scala. However, we leverage Spark SQL in certain contexts.
We leverage YARN for job scheduling and resource management, and execute our jobs on Amazon EMR.
We use Airflow as our task orchestration system, which takes care of the orchestration logic. For a data pipeline, we define the orchestration logic as the logic that facilitates the execution of your tasks. It includes the logic that you use to define your dependency graph, your configuration system, your Spark job runner, and so on. In other words, anything required to run your pipeline that is not a map-reduce job or other business logic tends to be orchestration logic. In total, our pipelines are made up of a little over a thousand tasks.
A Case Study: The Integration Test Pipeline
Our journey of discovery began in an unlikely place, in one of the integration test pipelines. This is a special set of pipelines managed by the Payments team, which take events emitted from unit tests and run them through our entire pipeline to detect issues in the Payments team’s code before it even gets merged to the master branch.
In terms of structure, scope, and configuration, the integration test pipeline is identical to our production pipeline. The only difference is the integration tests handle data volume on the order of a few hundred records per run, which is an incredibly small amount of data: even an hourly pipeline in production will process several orders of magnitude more data than this. That said, the performance of an hourly pipeline is likely closer to the integration test pipeline than the current daily pipeline.
The core segment of our pipeline, running at full scale, is expected to take 6 hours to complete each day. Assuming the same resource allocation, an hourly pipeline, in theory, should take about 1/24th of 6 hours, or 15 minutes. The integration test pipeline, in contrast, should take close to no time at all, due to the tiny data load.
However, the execution time is roughly 2 hours. Adjusting the Spark configuration to account for the smaller data load has nearly no effect. When we investigated further, we found that the time spent doing ETL operations, or any sort of Spark compute, was close to 0. Clearly, the pipeline was spending its time doing something else.
This was undesirable for our team. Because accounting logic is inherently stateful, for a new pipeline run to start, it requires the previous run to complete. Obviously, we were blocked from even starting our experiments with an hourly pipeline, unless we could get our execution time under an hour.
The Overhead: A Silent Killer
We expected that our pipeline had some overhead, but we did not know how much. When we test our jobs, we generally don’t use the same orchestration tools as we use in our full production pipeline — we use unit tests, or run jobs individually. In addition, we run tests with different resource allocations, and on a different map-reduce cluster than we do for our production pipelines. All of this served to obfuscate the impact the overhead was causing in our pipelines.
So we took a step back, and analyzed our pipeline holistically. While exact sources of the overhead will vary wildly based on the pipeline structure and stack, we identified some common sources:
- Scheduler delay: Be it a crontab, Airflow, a task queue, or something else, every complex data pipeline has some system that manages dependencies and job scheduling. Generally, there is some delay between a job completing and the next job starting.
- Pre-execution delay: Great, the job has been scheduled. Now what? In many cases, before the job is started, there is some pre-work to perform. For example, we have to push our JAR onto the machine that is executing our task, and perform some sanity checks to guarantee that data from previous tasks has landed. Lastly, there is the initialization of the application code, loading any configuration or system library dependencies, and so on.
- Spark Session instantiation and resource allocation: Spark Sessions take time to set up, so the more often you start and stop your session, the longer everything will take. In addition, your job has to acquire all necessary resources from your cluster. Spark features such as dynamic allocation can make this faster, but there will always be some ramp-up time. This can also impact your pipeline at the cluster level. When using an autoscaling cluster, such as one on Amazon EMR, you may theoretically have access to a large pool of resources, but it may take significant time to assign them to your particular cluster via a scaling operation.
- Saving and loading data: In a complex pipeline, you often have intermediate results you need to persist for other pipelines or jobs to consume. It’s often a good idea to break your pipeline into discrete steps, both to manage logical complexity and to limit the impact of job failures on your pipeline’s run time. However, if you make your steps too granular, you end up spending a lot of unnecessary time serializing and deserializing records to HDFS.
However, all of these impacts were small, on the order of minutes — they did not explain a 2-hour delay.
Sizing Up Your Pipeline
What we realized was that the impact of the overhead depends on the size and shape of the pipeline. The relationships between dependent tasks in data pipelines tend to be structured as a Directed Acyclic Graph, or DAG. A helpful exercise here is to physically draw the structure of your pipeline, if you don’t have a tool with a UI (like Airflow) that does it for you. Make each distinct task a node in your DAG, and connect each task to the tasks that depend on it.
The shape and size of a DAG can be measured in two factors: the depth and the width.
The depth of a task is the number of links between that task and its closest edge node. The width is the number of nodes at a given depth.
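To make these two measures concrete, here is a minimal Python sketch over a made-up five-task DAG. It assumes that "edge node" means a root task with no upstream dependencies, and the task names are purely illustrative rather than taken from our pipeline.

```python
from collections import Counter, deque

# Hypothetical DAG: each task maps to the tasks that depend on it.
dag = {
    "extract": ["clean_a", "clean_b"],
    "clean_a": ["join"],
    "clean_b": ["join"],
    "join": ["report"],
    "report": [],
}


def task_depths(dag):
    """Return each task's depth: the number of links to its closest root."""
    downstream = {task for children in dag.values() for task in children}
    roots = [task for task in dag if task not in downstream]
    depths = {root: 0 for root in roots}
    queue = deque(roots)
    while queue:  # breadth-first search reaches each task by its shortest path
        task = queue.popleft()
        for child in dag[task]:
            if child not in depths:
                depths[child] = depths[task] + 1
                queue.append(child)
    return depths


def widths(depths):
    """Return the number of tasks found at each depth."""
    return dict(Counter(depths.values()))


depths = task_depths(dag)
print(depths)
print(widths(depths))
```

In this toy graph the two cleaning tasks share a depth, so the graph is two tasks wide at that level and one task wide everywhere else.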
The overhead tends to scale with the depth of your graph, particularly in “linear” segments of the graph, where tasks execute in series. However, wide graphs are not immune to the overhead — you must be cautious about the time spent when saving and loading records from HDFS. IO has fixed overhead, so while the landing time of your tasks may not suffer, the total compute time will, and this can lead to increased compute costs. (Managing cost is outside the scope of this post, so I’ll leave things there, but try to avoid trading a time problem for a cost problem.)
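The difference between landing time and total compute time can be sketched with a simple model. The fixed per-task overhead below is a hypothetical figure, not a measurement from our pipeline, and the two tiny DAGs are invented to contrast a deep shape with a wide one.

```python
from functools import lru_cache

PER_TASK_OVERHEAD_MIN = 3  # hypothetical fixed overhead paid by every task

# Each task maps to the tasks that depend on it.
deep = {"a": ["b"], "b": ["c"], "c": ["d"], "d": []}       # four tasks in series
wide = {"a": ["b", "c", "d"], "b": [], "c": [], "d": []}   # same tasks, two levels


def longest_chain(dag):
    """Return the number of tasks on the longest path through the DAG."""

    @lru_cache(maxsize=None)
    def chain_from(task):
        return 1 + max((chain_from(child) for child in dag[task]), default=0)

    return max(chain_from(task) for task in dag)


def overhead(dag):
    """Return (overhead on the landing time, overhead summed over all tasks)."""
    landing = PER_TASK_OVERHEAD_MIN * longest_chain(dag)
    total = PER_TASK_OVERHEAD_MIN * len(dag)
    return landing, total


print("deep:", overhead(deep))
print("wide:", overhead(wide))
```

Under this model, both shapes pay the same total overhead because they have the same number of tasks, but only the deep graph pays all of it on the path that determines when the data lands.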
So, with this framework, we realized the structure of our pipeline was the root cause of our overhead problem. So what did we decide to do?
Simple — scale it down!
Phenomenal Data Processing, Itty Bitty Pipeline
Ideally, you would simply remove or reduce sources of the overhead directly, but often this is infeasible or would cost an unreasonable amount of development time. So, the only remaining solution is to shrink the size of your pipeline.
This does not mean cutting the scope of your pipeline or, really, changing it at all. It comes down to the difference between business logic and orchestration logic.
Often, data pipelines reflect the structure of the underlying application. It is really easy to end up in this situation, because the common way to submit jobs to a cluster is via the spark-submit script provided by Spark. The script asks you to provide a class, so you tend to make a class that maps to a single task in your pipeline. Classes are generally designed by traditional software engineering principles, and thus perform a single, tightly scoped transformation operation.
This is problematic for a bunch of reasons — often orchestration logic lives separately from the business logic, so you get into a situation where making a change to your pipeline requires you to synchronize the deploy of changes to multiple systems, which is never a good place to be. And of course, doing this can cause massive overhead if you don’t consider the orchestration implications of your logical abstractions.
We came to the understanding that our business logic should be designed in a maintainable and scalable way, and our orchestration logic should be designed to maximize performance, visibility, and reliability of our pipeline.
While this is a fairly obvious statement, what was less clear is that when business and orchestration constructs, i.e., the actual classes that define your business and orchestration logic, are coupled together, often the aims of the systems are in opposition to each other. As we have discussed, tightly scoped logical abstractions, when mapped directly to a DAG, lead to a pipeline that is wide or deep enough to cause significant overhead.
What we decided to do was build a layer in our application between our business logic and orchestration logic that would allow us to meet the needs of each system.
Consider the following data pipeline:
Traditionally, your application code would look something like:
In this setup, each class maps to a single logical transformative step.
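As a rough illustration of that one-class-per-task layout, consider the sketch below. It is written in Python rather than Scala for brevity, the class names are invented, and `submit` is only a stand-in for a real job submission; none of this is our actual application code.

```python
# Hypothetical business-logic classes; each one is its own pipeline task.
class ParseEvents:
    def run(self, records):
        return [r.strip().lower() for r in records]


class FilterInvalid:
    def run(self, records):
        return [r for r in records if r]


class CountEvents:
    def run(self, records):
        counts = {}
        for r in records:
            counts[r] = counts.get(r, 0) + 1
        return counts


CLASSES = {cls.__name__: cls for cls in (ParseEvents, FilterInvalid, CountEvents)}

# The orchestrator must know every class name: one task, and one job, per class.
ORCHESTRATOR_TASKS = ["ParseEvents", "FilterInvalid", "CountEvents"]


def submit(class_name, records):
    """Stand-in for one job submission: look up the class by name and run it."""
    return CLASSES[class_name]().run(records)


data = [" Charge ", "", "refund", "CHARGE"]
for class_name in ORCHESTRATOR_TASKS:
    data = submit(class_name, data)
print(data)
```

Renaming or splitting any of these classes would force a matching change in the orchestrator's task list.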
However, we have architected a Single Entry Point that defines a contract between the orchestration system and our business logic that has no inherent correlation to our application logic.
The basic usage of it looks something like:
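A minimal sketch of such an entry point might look like the following. It is in Python rather than Scala, and `TASKS`, `entry_point`, and the class names are hypothetical names chosen for illustration, not the actual contract we use.

```python
class ParseEvents:
    def run(self, records):
        return [r.strip().lower() for r in records]


class FilterInvalid:
    def run(self, records):
        return [r for r in records if r]


# The contract: the orchestrator only ever sees these task names.
TASKS = {
    "parse_events": [ParseEvents],
    "filter_invalid": [FilterInvalid],
}


def entry_point(task_name, records):
    """Single entry point: run the classes registered under one task name."""
    for step in TASKS[task_name]:
        records = step().run(records)
    return records


# The orchestrator submits one job per task name, always through entry_point.
data = [" Charge ", "", "refund"]
for task_name in ("parse_events", "filter_invalid"):
    data = entry_point(task_name, data)
print(data)
```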
This example, functionally, is pretty similar to the initial setup — we are still mapping tasks in our pipeline to classes in our code on a 1:1 basis.
However, this gives us a few benefits. For example, we can restructure our underlying classes however we want without making any changes to the orchestration system. We could do something like:
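For instance, in the hypothetical Python sketch below, a parsing task is backed by two smaller classes, yet the orchestrator still asks for the same task name. All names here are illustrative assumptions.

```python
# The former single parsing class, split into two smaller classes.
class StripWhitespace:
    def run(self, records):
        return [r.strip() for r in records]


class Lowercase:
    def run(self, records):
        return [r.lower() for r in records]


# The task name the orchestrator knows is unchanged; only its contents moved.
TASKS = {"parse_events": [StripWhitespace, Lowercase]}


def entry_point(task_name, records):
    """Single entry point: run the classes registered under one task name."""
    for step in TASKS[task_name]:
        records = step().run(records)
    return records


result = entry_point("parse_events", [" Charge ", "REFUND"])
print(result)
```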
The critical goal to achieve here is to completely decouple the orchestration of your pipeline from the structure of your business logic, so that your orchestration-related components can be architected to minimize overhead, and your business logic can be structured to minimize its complexity. Put another way, you want to architect your application so the structure of your DAG is totally independent from the structure of your business logic.
Leveraging the system above, we could easily restructure our pipeline into:
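One way to picture this kind of restructuring is the Python sketch below, where three orchestrated tasks collapse into one that runs the same classes in the same order. The task names, class names, and the `run_pipeline` helper are hypothetical, and counting submissions is only a proxy for the per-task overhead.

```python
class ParseEvents:
    def run(self, records):
        return [r.strip().lower() for r in records]


class FilterInvalid:
    def run(self, records):
        return [r for r in records if r]


class CountEvents:
    def run(self, records):
        counts = {}
        for r in records:
            counts[r] = counts.get(r, 0) + 1
        return counts


# Before: three orchestrated tasks, each paying scheduling and startup overhead.
TASKS_BEFORE = {
    "parse_events": [ParseEvents],
    "filter_invalid": [FilterInvalid],
    "count_events": [CountEvents],
}

# After: one orchestrated task that runs the same classes in the same order.
TASKS_AFTER = {"core": [ParseEvents, FilterInvalid, CountEvents]}


def run_pipeline(tasks, records):
    """Run every task in order; return the result and the number of jobs submitted."""
    submissions = 0
    for steps in tasks.values():
        submissions += 1  # one job submission, and its overhead, per task
        for step in steps:
            records = step().run(records)
    return records, submissions


raw = [" Charge ", "", "refund", "CHARGE"]
print(run_pipeline(TASKS_BEFORE, raw))
print(run_pipeline(TASKS_AFTER, raw))
```

Both layouts produce the same result; only the number of job submissions changes.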
In doing so, we have significantly reduced the overhead, without any changes to the structure of our business logic, or the delivery order of our tasks.
An Aside on Fault Tolerance
A final consideration here is fault tolerance. Broadly, the longer a job runs in a distributed computing environment, the more likely it will fail.
Generally, data jobs are easy to retry in an idempotent manner, so the cost of a job that fails is generally just the time spent up to the failure, plus the time spent to retry. This is something to consider when trying to fight the overhead — theoretically, combining all your jobs into one huge task would get rid of all the overhead, but massively increase the risk of failure, which could eat up all the time savings you get by removing sources of overhead.
Overall, this is a balance. In our case, for large segments of our pipeline, the cost and risk of retries were much, much lower than the overhead incurred by splitting the tasks up, but this won’t always be true. A good rule of thumb is that if a job’s overhead is equal to or greater than 10% of its execution time, merging it with its neighboring jobs is likely a safe bet.
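The rule of thumb is easy to encode as a quick check. The sketch below assumes that overhead and execution time are measured separately, in the same unit, and the function name and sample numbers are made up for illustration.

```python
def should_merge(overhead_minutes, execution_minutes, threshold=0.10):
    """Apply the rule of thumb: merge when overhead is at least 10% of execution time."""
    if execution_minutes <= 0:
        raise ValueError("execution_minutes must be positive")
    return overhead_minutes / execution_minutes >= threshold


print(should_merge(3, 20))  # overhead is 15% of execution time
print(should_merge(3, 60))  # overhead is 5% of execution time
```

The check says nothing about failure risk, so a job that fails often may still be better left on its own.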
So what did we learn?
- The natural evolution of data pipelines, from monolithic collections of scripts to Spark applications, pushes you to encode your application structure in your pipeline.
- The overhead is everything your pipeline is doing other than computation. It’s caused by orchestration complexity, and scales with the depth of your pipeline.
- Encoding your application structure in your pipeline means you intrinsically couple your application logic to your orchestration logic. This means you are often inviting overhead by making your map-reduce tasks too granular.
- By decoupling your orchestration logic from your application logic you gain tools to fight the overhead, without compromising the quality of your application.
- When attempting to reduce the run time of a data pipeline, be careful not to miss the forest for the trees. Analyze your whole pipeline’s execution time, not just the obvious factors, like map-reduce computation time.
- You can’t neglect fault tolerance considerations. Make sure you don’t lose all the time you saved lowering overhead by spending it retrying tasks that fail frequently.
While the rollout of our overhead reduction measures is still ongoing, early tests show that we will be able to decrease our overhead from 2 hours to 15–30 minutes. Not only will this improve the delivery time of our pipeline, it will also allow us to pursue an hourly pipeline in the future.