Don’t orchestrate when you can compose.

A cautionary tale about a data pipeline

Jeff Smith
Data Engineering
11 min read · Nov 24, 2015

Da Da Da Taaaaa!

Imagine you walk into a new job and find the following data processing system.

Job GetTheData needs to extract all of the necessary data from all of the various application databases. Then job CrunchTheNumbers needs to run to aggregate all of the data from the previous step. It just dumps its output to S3, so there’s still another job called LoadTheData that picks up the data and puts it into the relational database that the dashboard application uses. Then, there’s this other job called TotalUpTheBill that picks up that data from S3 and figures out which customers need to be billed and for how much. The TotalUpTheBill job saves its results in a temporary flat file on the node that runs the job. Another job called SendTheBills takes that file and sends off a bunch of emails to bill all of the customers, using some data from another relational database that holds all of the customers’ account information.

This workflow is a nightmare to keep running. Anytime anyone makes a change in any intermediate representation of the data, bugs pop out all over. Add a field here, remove a field there, and who knows what will happen?

Jay Kreps and others in the Kafka community have likened systems with this dataflow to spaghetti.

Mmmm…derived representations of raw data.

Using a distributed commit log like Kafka certainly helps with this problem but sort of via indirection. Most of the problems in a system like this are going to come out of a complete failure to manage schemas and their evolution. So most of the benefits of using a system like Kafka are going to come from actually explicitly managing schemas and their evolution. This is similar to the scenario that occurred in the early days of continuous integration where the biggest benefit of adopting CI for most organizations was actually just using source control at all.

There’s another variant of this problem, though, and it doesn’t require a global commit log or externalized schemas to solve. This version of the problem comes from the power duo of big data and data science. Thanks to advances in data engineering, there are now massive amounts of data for data scientists to do what they do best: conceive of massive Rube Goldbergian contraptions that perform black magic with data.

So data scientists and engineers have collaborated to build systems that extract features from raw data, then normalize those features, then select amongst those features, then learn a model from those features, then use that model to make predictions for another model, then use that model in an ensemble of other models, that will be applied to a validation set that will allow us to set model hyperparameters, then apply that model and parameters to a test set that will tell us if the model can be trusted before deploying it to production to predict whether or not a user will click on an ad.

Spoiler alert: they’re not going to click on the ad. ¯\_(ツ)_/¯

To people working on simpler applications than machine learning systems, the example above probably sounds like an exaggeration. I assure you it is not.

Studies have shown that a full 20% of a working data scientist’s time is spent on conceiving of data systems that are nearly impossible to construct.*

* Studies may in fact show no such thing. Who has the time to read studies?

This labyrinthine control flow is the norm in machine learning. One could even argue that it’s a necessary aspect of building useful machine learning technology.

Machine learning is the perfect example of “more is different.” Incredible advances in our capability to collect, persist, and transform data were necessary before modern machine learning could achieve even a small fraction of what’s been done in the past decade. Data engineering has made it possible for us to build incredibly powerful systems that predict the future. But most systems I’ve seen fall short of this ideal.

One of the reasons for this disconnect is the Rube Goldberg machine discussed above. At first glance, it sounds like the spaghetti problem again, but it’s not. A key feature of the systems that exhibit the spaghetti problem is that they are heterogeneous. The different systems are separate, because they serve distinct purposes and thus have entirely separate implementations. Their code is likely kept in separate repos and may even be written in different languages.

The machine learning Rube Goldberg machine is actually just one system, though. It may have incredibly complex semantics. We are, after all, talking about building a machine that will predict the future. But the feature generation functionality is intimately connected to the model learning functionality which is intimately connected to the ensembling functionality.

It’s all one machine learning system.

Whether or not the code for all of these components is kept in the same repo or written in the same language, it all needs to honor the exact same semantics. A feature, a model, and a class label all have to mean exactly the same thing in each component of this system.

As Uri Laserson has chronicled, bioinformatics as a field has a major version of this problem. Incompatible file formats are used for each phase in the analysis, all of them encoding data in different ways. No knowledge is directly shared in code across pipeline phases, even though obviously all aspects of analyzing genomic data are deeply intertwined. I applaud his efforts to modernize the bioinformatics workflow and get scientists to use tools like Spark and Parquet to reduce some of their data handling pain.

Most machine learning systems aren’t implemented by biologists, but many real world machine learning systems still have this problem. Far too often, a machine learning system won’t be implemented as a single system. Instead, it will be composed of a bunch of smaller systems using the database or even flat files to communicate between separate jobs that only have implicit knowledge of each other.

People will often duct tape these pieces together using a tool like Luigi or, worse, Bash and cron. This usually results in something that runs, most of the time, and does what it’s designed to do, if you don’t screw up. But the connection points between all of the little subsystems always seem to end up causing a disproportionate number of problems.

Schemas will evolve. Some developer of some system upstream of the machine learning system will decide to change the shape of the data consumed by the machine learning system. At best, this will result in the system blowing up, firing alerts, and making everyone scramble. Of course, things don’t always go your way. An equally likely scenario is that your machine learning system starts to fail in some largely silent and hard-to-diagnose way. Failing at machine learning is easy; there are lots of ways to do it.

This isn’t really the fault of tools like Luigi. These sorts of pipeline management tools are effectively orchestrators. They view pipelines in very similar terms to how pipelined operations were conceived of in Unix: the user could send any thing to any other thing. If that doesn’t work out, then something will blow up.

Fun fact: New York data pipelines are populated by 20% pizza rats, 30% alligators, and 50% fintech startups.

Luigi and similar tools go even further than sending one thing to another thing. Often, they take on responsibilities like dependency resolution, and this is where things get out of hand.

When Luigi says that ConsumerJob depends on ProducerJob to produce the data necessary for ConsumerJob to run, that’s a dependency. But Luigi, like all other job orchestration tools, doesn’t really know if ConsumerJob can actually ingest the data produced by ProducerJob before those jobs are executed. How could it? It has no knowledge of the actual objects being produced by either job beyond what exists in the code that orchestrates the job interactions. Since that code is dynamically-typed Python code, you would need to actually run some of that code (potentially in a test) to see if the two jobs are still compatible.

You might be reaching for the same solution used in the spaghetti problem at this point. Don’t do it! This problem is not that hard, so we can use far simpler tools to solve it. There’s no need for a global commit log like Kafka or an externalized schema and registry using Avro.

You can just compose your pipeline in application code.

It sounds so simple; it can’t actually be a solution to a big data/machine learning problem, can it? There should at least be some blog post trending on Hacker News that justifies this approach using a Greek letter or two, right? Well, here’s a Ξ to get you started. I’ll have to rely on you for the upvotes. On to the engineering.

If you find yourself in this situation and you’re using a statically-typed language like Scala or Java, you can rely on the type system to verify composition for you.
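As a minimal sketch of that idea in Java (Scala’s `andThen` on functions behaves the same way), here are two pipeline steps joined with `Function.andThen`. The `Purchase` type, the `parse` and `totalUp` steps, and the `"customerId,amount"` line format are all hypothetical names made up for this illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

class ComposedPipeline {
    // Hypothetical intermediate representation shared by both steps.
    static final class Purchase {
        final String customerId;
        final double amount;

        Purchase(String customerId, double amount) {
            this.customerId = customerId;
            this.amount = amount;
        }
    }

    // Producer step: parses raw lines of the (assumed) form "customerId,amount".
    static final Function<List<String>, List<Purchase>> parse = lines ->
        lines.stream()
             .map(line -> line.split(","))
             .map(parts -> new Purchase(parts[0], Double.parseDouble(parts[1])))
             .collect(Collectors.toList());

    // Consumer step: totals the purchases per customer, sorted by customer id.
    static final Function<List<Purchase>, Map<String, Double>> totalUp = purchases ->
        purchases.stream()
                 .collect(Collectors.groupingBy(
                     p -> p.customerId, TreeMap::new, Collectors.summingDouble(p -> p.amount)));

    // The composition type-checks only because parse produces exactly what totalUp consumes.
    // If parse were changed to return, say, List<String[]>, this line would stop compiling.
    static final Function<List<String>, Map<String, Double>> pipeline = parse.andThen(totalUp);

    public static void main(String[] args) {
        // prints {alice=15.0, bob=2.5}
        System.out.println(pipeline.apply(List.of("alice,10.0", "bob,2.5", "alice,5.0")));
    }
}
```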

The code above will never compile unless those functions do in fact compose. Instead of just hoping that the output of the production function is consumable by the consumption function, the compiler will tell you. Right now. On your laptop. Before you spend a cent on EC2 costs. ƪ(˘⌣˘)ʃ

That is incredibly useful stuff, especially if you, like me, are at a company that would prefer to spend its money on beanbags and beer instead of on large batch jobs that didn’t finish successfully.

I feel so strongly about this style of pipeline construction that I worked with a colleague to implement a library around exactly these ideas. It’s called Mario, and it came out of our dissatisfaction with job orchestration solutions when they were used for pipeline composition.

In Mario, each phase of a pipeline is conceived of as an immutable pipeline object, encapsulating a function. Pipeline steps can never get out of sync without that being directly visible in an IDE as a type mismatch. The validation of the pipeline’s composition is done by the all-powerful Scala type system, which gives it stronger guarantees than mere schema-level compatibility. Pipelines composed in this way are fully typesafe, with all of the reassurances typesafety brings.
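To make the “immutable pipeline object encapsulating a function” idea concrete, here is a rough sketch in Java. To be clear, this is not Mario’s API; the `Stage` class and its `of`, `then`, and `apply` methods are hypothetical names invented for this example, and a real library would offer much more.

```java
import java.util.function.Function;

// Hypothetical illustration only: this is not Mario's API.
final class Stage<A, B> {
    private final String name;
    private final Function<A, B> run;

    private Stage(String name, Function<A, B> run) {
        this.name = name;
        this.run = run;
    }

    static <A, B> Stage<A, B> of(String name, Function<A, B> run) {
        return new Stage<>(name, run);
    }

    // Returns a brand-new Stage; neither operand is modified.
    // The parameter type Stage<B, C> forces the next stage to accept this stage's output.
    <C> Stage<A, C> then(Stage<B, C> next) {
        return new Stage<>(name + " -> " + next.name, run.andThen(next.run));
    }

    B apply(A input) {
        return run.apply(input);
    }

    String name() {
        return name;
    }

    public static void main(String[] args) {
        Stage<String, Integer> extract = Stage.of("extract", String::length);
        Stage<Integer, Double> normalize = Stage.of("normalize", n -> n / 10.0);

        Stage<String, Double> pipeline = extract.then(normalize);
        // prints extract -> normalize: 0.6
        System.out.println(pipeline.name() + ": " + pipeline.apply("clicks"));

        // normalize.then(extract) is rejected by the compiler:
        // extract expects a String, but normalize produces a Double.
    }
}
```

Because every `then` call returns a new object, a stage can be reused in several pipelines without one pipeline affecting another.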

Mario and other pipelining technologies are concerned with a lot more than just typesafety. They’re concerned with helping you elegantly model the flow of data through your program in ways that help you understand your pipeline and have it execute exactly as you would expect. As you start to dig into pipeline composition technologies, you’ll come across some very useful abstractions that should help you tame the complexity of your pipeline (even if many of them abuse plumbing terminology in ways that would make both Mario brothers cringe).

To be clear: Mario is only one of several tools you could use to compose your pipelines. I have no personal interest in getting you to use it. It’s very likely that some other tool suits your needs better, or even no tool at all beyond the language itself. Every workload is different.

So, let me introduce you to some options. I’ve been working on Hadoop and Spark for a while now, and so I’ve been primarily focused on how to make Hadoop and Spark pipelines work the way that I want them to.

If you’re working on Hadoop, I recommend checking out Cascading or Scalding, depending on whether you’re working in Java or Scala, respectively. Both have some well-conceived abstractions for building pipelines.

If you’re working on Spark pipelines, you have a few choices, depending on what you decide your needs are:

  • For typesafe operations on datasets, you can just use RDDs, since they preserve the types of the objects in the RDD.
  • If you want a more elegant pipeline idiom and are willing to sacrifice some typesafety, you can use Spark’s Pipelines. They use DataFrames instead of RDDs, so they offer a weaker guarantee than composition using RDDs.
  • For the best of both worlds, take a look at KeystoneML. Built by some of the same folks who worked on the pipeline functionality in spark.ml, it takes these efforts to their logical conclusion, marrying pipeline composition with strong typing guarantees.

If you find yourself outside of a Hadoop or Spark workload, you may need to go with something less elegant. In particular, if you’re using third-party libraries or command line tools that you have no control over, I would recommend implementing typesafe facades that wrap your interaction with those third-party tools. Since these are third-party tools, you are not exposed to the same risks of schema evolution. You can choose to lock to a particular version and not be exposed to any schema evolution in future versions until you want to be. If you’re going to use any third-party library as a core part of your system, such as the model learning algorithm in a machine learning system, I would argue that you need to implement facades just to have a reasonable interface to develop and test against. On the JVM, JNI can be used to mediate these interactions with non-JVM components.
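Here is one way such a facade might look in Java. Everything about the wrapped tool is an assumption made for the sake of the example: it is imagined to read one comma-separated feature row per line and to answer with one `label<TAB>score` line per row. The raw tool is passed in as a plain function so the sketch runs without any external binary; in a real system that function would shell out to the tool or call it through a JNI binding.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical facade over an imagined third-party scoring tool.
final class ScorerFacade {
    static final class Prediction {
        final String label;
        final double score;

        Prediction(String label, double score) {
            this.label = label;
            this.score = score;
        }
    }

    // The untyped boundary: lines of text in, lines of text out.
    private final Function<List<String>, List<String>> rawTool;

    ScorerFacade(Function<List<String>, List<String>> rawTool) {
        this.rawTool = rawTool;
    }

    // The typed interface that the rest of the pipeline composes against.
    List<Prediction> score(List<double[]> featureRows) {
        List<String> input = featureRows.stream()
            .map(row -> Arrays.stream(row)
                              .mapToObj(d -> Double.toString(d))
                              .collect(Collectors.joining(",")))
            .collect(Collectors.toList());

        List<String> output = rawTool.apply(input);
        if (output.size() != input.size()) {
            throw new IllegalStateException(
                "expected " + input.size() + " lines, got " + output.size());
        }

        List<Prediction> predictions = new ArrayList<>();
        for (String line : output) {
            String[] parts = line.split("\t");
            if (parts.length != 2) {
                // Fail loudly at the boundary instead of passing malformed data downstream.
                throw new IllegalStateException("unexpected tool output: " + line);
            }
            predictions.add(new Prediction(parts[0], Double.parseDouble(parts[1])));
        }
        return predictions;
    }

    public static void main(String[] args) {
        // Stand-in for the real tool: labels every row "click" with a score of 0.5.
        ScorerFacade scorer = new ScorerFacade(lines ->
            lines.stream().map(l -> "click\t0.5").collect(Collectors.toList()));

        List<Prediction> result =
            scorer.score(List.of(new double[] {1.0, 0.0}, new double[] {0.2, 0.8}));
        // prints 2 predictions, first label: click
        System.out.println(result.size() + " predictions, first label: " + result.get(0).label);
    }
}
```

The stand-in function also makes the facade easy to test: you can feed it malformed output and check that the failure happens at the boundary, not three steps later.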

Notice that I didn’t suggest that you string together a bunch of batch jobs with no shared knowledge and just hope for the best. That would make me so sad. (ಥ_ಥ)

Orchestration has its place. In even the tiniest of startups, you will have multiple jobs that have some sort of relationship to each other. When you find yourself in that situation, then sure, you should use a job orchestration tool like Luigi. Erik Bernhardsson, creator of Luigi, has a nice guide to alternatives to Luigi to get you started (even if he does miss the difference between Mario and Luigi). There are lots more than just the ones in that blog post. Pinterest just open sourced another one. I’m sure it’s fine. These things usually are.

Just don’t mistake all job construction problems for orchestration problems. Don’t reach for a sawzall when you really need a pry bar. Pipelines composed in typesafe application code are easier to work with and will save you time.

The biggest gain you will find in preferring pipeline composition in code is simplicity. A job orchestration tool is just one more thing, and that means that it needs to be deployed and monitored. Someone might have to upgrade it some day. All of this takes time and effort.

When you choose to compose your pipelines in application code, you can lean on the power of the language’s type system and often little else. If you’re running distributed jobs, you might need to rely on libraries like Spark, but you should choose which libraries to use, in part, by how quickly they can tell you that you made a mistake. The most powerful pipeline composition tools can reason about the pipeline that you’ve composed and give you strong guarantees. So you can keep building big data workloads even when the coffee shop’s wifi goes down for the afternoon.

Don’t orchestrate when you can compose.

“Someone should really write a program to do all of this tedious composition for me.”

Jeff Smith
Data Engineering

Author of Machine Learning Systems @ManningBooks. Building AIs for fun and profit. Friend of animals.