An Infinite Fleet of Plumbers

Brandon Willett
Published in ActionIQ Tech Blog · Mar 7, 2019 · 8 min read
(an artist’s rendition of Luigi running on a cluster of EC2 instances)

Here at ActionIQ, we have to ingest a lot of data — thousands of terabytes, across dozens of data sources, for many clients, in several formats, multiple times per day. Dealing with enterprise-scale data processing could get overwhelming if we didn’t have a rock-solid, scalable solution! In order to manage all of these pipelines into our system and make sure each of them goes off without a hitch, we use a Python library called Luigi, originally developed at Spotify. Luigi helps us handle dependency management and uses a master-worker system of task orchestration to make sure the right jobs get done at the right time. At the beginning, a few workers living on the same machine as the master were more than good enough, but there’s only so much “scaling up” of that one machine you can do. So, this article is about how we taught Luigi to dynamically scale itself out instead.

Luigi can be best thought of as a DAG, or a directed acyclic graph, of tasks that need to be done. These tasks are just simple Python functions (“submit a request to endpoint X to kick off process Y”, or “write the parquet file at location Z to the specified Amazon S3 bucket”), but the bread and butter of Luigi is its dependency system, which is where the directed graph analogy comes from. Each task can define a requires function, which in our system can look like this:
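
(Simplified sketch; the parameter names and task bodies are illustrative, not our production code.)

    import luigi

    class IngestJobDone(luigi.ExternalTask):
        """Marker task: the customer's raw data has landed in the platform."""
        customer_id = luigi.Parameter()

    class MaterializeCollectionTask(luigi.Task):
        """Processes the ingested data into queryable collections."""
        customer_id = luigi.Parameter()

        def run(self):
            ...  # processing logic omitted

    class FlightPlanExportTask(luigi.Task):
        customer_id = luigi.Parameter()

        def requires(self):
            # The export can only run once the customer's data has been
            # ingested and processed.
            return [
                IngestJobDone(customer_id=self.customer_id),
                MaterializeCollectionTask(customer_id=self.customer_id),
            ]

        def run(self):
            ...  # kick off the export itself (outputs omitted for brevity)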

In the simplified snippet above, we can see that our FlightPlanExportTask requires a few other tasks to have finished before it can run: namely, that the customer’s data is already in the platform (represented by IngestJobDone) and has been processed correctly. Similarly, a MaterializeCollectionTask will have other tasks in its own requires function, and so at runtime the Luigi master can analyze each task in its queue, determine which ones have all of their dependencies satisfied (or no dependencies at all), and start workers on those tasks, maintaining internally a linearization of this tree of task requirements.

This sort of simple-to-express but powerful logic allows us to be confident not only that every task we need to run will run, but also that the ordering of those tasks will never be incorrect. This, combined with Luigi’s ability to intelligently choose whether to retry failed tasks, is crucial in presenting a robust and fault-tolerant service to the customers who use our platform every day.

As mentioned above, Luigi uses a coordinator/worker system of distributing work. This means that there’s one process, the Luigi master, that does all of the task orchestration and DAG-building, but it never actually executes any of the tasks itself. That, it leaves to the Luigi worker processes, which are simple Python threads that are continuously either working on a task given to them by the master process, or pinging that master looking for more work to do.

You might see where this is going. When we had only a few customers, this issue wasn’t even on our radar, but as we’ve grown, the resource usage of that Luigi machine is getting a little… tight. More customers means more worker processes, which means more thread usage and more RAM usage. While we could try to stave off the problem by continually increasing the size of the EC2 instance that Luigi runs on, we decided to be more proactive and instead build an EC2 cluster of remote worker instances that could connect to (and thus, take work from) a centralized master as needed. This way, whenever we run low on Luigi workers, we can just increase the size of the cluster and redeploy — easy, right?
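
On the Luigi side, pointing a remote worker at the centralized master is just a matter of telling it where the scheduler lives. A minimal sketch, with a placeholder module, hostname, port, and task parameters:

    import luigi

    from flight_tasks import FlightPlanExportTask  # placeholder module for the tasks above

    # Runs on a remote worker instance: instead of spinning up a local scheduler,
    # attach to the central luigid master and take work from the shared queue.
    luigi.build(
        [FlightPlanExportTask(customer_id="acme")],
        workers=4,                               # worker threads on this instance
        scheduler_host="luigi-master.internal",  # the centralized Luigi master
        scheduler_port=8082,                     # luigid's default port
    )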

(image taken from the AWS Blog)

But this solution isn’t as nice as it could be: for one thing, it would require a redeploy each time we needed to upsize or downsize the cluster of worker machines (resizing manually through the AWS Console is possible, but it would leave our production infrastructure out of sync with our IaC, which is a dangerous situation to be in). More importantly, it means the cluster wouldn’t be able to respond to the demands of Luigi. We’d need to constantly run with a large overhead of workers in order to handle a spike in usage without buckling or incurring long queue times, which is costly and wasteful.

We needed autoscaling.

Amazon’s EC2 service, in fact, already has support for an auto-scaling cluster of instances. These clusters size up or down when certain CloudWatch metrics meet certain thresholds for certain periods of time (all configurable), so a large part of the work was done for us — all we needed to do was set up the CloudWatch alarms and make sure we were posting the metrics. And here we encounter the first question: how do we set up the “scale up” and “scale down” alarms? It seems straightforward (“scale up when there’s a lot of work we need to do”), but there’s more nuance than you might expect.

The main question here is: what constitutes “a lot” of work? Is it when there’s 1 pending task sitting in the Luigi work queue without being executed? 10? 50? Or maybe putting a specific number on it is futile, since the average number of tasks today might be vastly different from the average number of tasks by this time next year?

In the end, this question is actually moot, because in Luigi a task with status PENDING can mean either of two things:

  • This task cannot be worked on because it has unmet dependencies
  • This task can be worked on but isn’t because all of the workers are busy

Clearly, we’d want to scale up in the second case, but if all PENDING tasks are of the first kind, scaling up would be a waste! Luigi does make the distinction between these two types of PENDING tasks, but only internally; the information isn’t exposed to users of Luigi, so we wouldn’t be able to scale up or down on it without forking Luigi and writing our own Scheduler RPC method to make it accessible.

The scaling policies in AWS that are used to drive autoscaling are very flexible, almost to a fault! It’s easy to get lost in the configuration. As I was writing the rules, I made up this little note to help me digest the wealth of knobs:

If the $statistic of the metric is $comparison $threshold,
over a series of $num_evaluation_periods, each $period_secs long,
then change the desired size of the ASG by $adjustment.
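
To make that concrete, here is roughly how one such rule translates into API calls. This is a hedged boto3 sketch rather than our actual IaC, and the ASG name, metric, namespace, and numbers are all placeholders:

    import boto3

    autoscaling = boto3.client("autoscaling")
    cloudwatch = boto3.client("cloudwatch")

    # $adjustment: add two instances to the worker ASG when the alarm fires.
    policy = autoscaling.put_scaling_policy(
        AutoScalingGroupName="luigi-workers",
        PolicyName="luigi-scale-up",
        AdjustmentType="ChangeInCapacity",
        ScalingAdjustment=2,
        Cooldown=300,
    )

    # $statistic of the metric $comparison $threshold, over $num_evaluation_periods
    # periods of $period_secs each: here, the average number of idle workers has
    # been stuck at zero for three consecutive minutes.
    cloudwatch.put_metric_alarm(
        AlarmName="luigi-no-idle-workers",
        Namespace="Luigi",                    # custom namespace (illustrative)
        MetricName="IdleWorkers",             # custom metric we post ourselves
        Statistic="Average",
        Period=60,
        EvaluationPeriods=3,
        Threshold=0,
        ComparisonOperator="LessThanOrEqualToThreshold",
        AlarmActions=[policy["PolicyARN"]],
    )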

In the end, here’s a look at what we landed on:

It’s something pretty simple, because it has to be: if every single worker in our pool is working on a task and none are idle, then we need a bigger pool. It’s not a perfect metric; for example, if we have 20 workers and exactly 20 tasks that need doing, the alarm will still fire and we’ll end up with a few extra workers that never get any tasks. But given the limited information available to us through Luigi, this option is definitely good enough.
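
Publishing the metric itself is the easy part. A minimal sketch, assuming a hypothetical count_idle_workers() helper that stands in for however the idle count is derived, and the same illustrative namespace and metric name as above:

    import boto3

    cloudwatch = boto3.client("cloudwatch")

    def count_idle_workers() -> int:
        # Hypothetical stand-in for however the number of idle workers is
        # determined; the only important part is that the number reaches CloudWatch.
        return 0

    # Posted periodically (e.g. from a cron job) so the scale-up and
    # scale-down alarms have something to evaluate.
    cloudwatch.put_metric_data(
        Namespace="Luigi",
        MetricData=[{
            "MetricName": "IdleWorkers",
            "Value": float(count_idle_workers()),
            "Unit": "Count",
        }],
    )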

Another of the many questions for us to answer throughout this endeavor was, “how powerful should we make the instances in the Luigi worker cluster?”, and this time a solution didn’t reveal itself so easily.

Our first instinct was that it didn’t really matter — maybe we use relatively small instances and put 5 or 10 workers on each, which would allow the cluster to be sized in a more granular way. Or, we could use larger instances instead, which would slightly reduce the frequency of machine failures since we’d have a smaller fleet. Either way, the benefits and downsides didn’t seem very significant.

The real issue can be made clear in the following hypothetical situation:

One Tuesday morning at 8:00 AM, there is a large amount of Luigi activity, and so your cluster scales up to meet the demand. At its peak, it has 30 instances, each with 30 active Luigi worker threads, for a total of 900 tasks being executed at once. You’re thrilled. By 9:00 AM, things have calmed down considerably — nearly all of the tasks have finished and only 25 running tasks remain. All of these could theoretically fit on one instance, so you’re hoping to see your autoscaling group drop to nearly its minimum size… but when you check in the AWS Console, the news is worse: 25 of the 30 instances are still up. It turns out that each of the 25 remaining tasks was started on a different EC2 instance, and so none of those instances can be scaled down without prematurely killing a task.

It would’ve been interesting to solve this problem by writing some kind of custom resource allocator that could “pack” tasks onto machines in a smart way, giving us the flexibility to use whatever machine size was convenient while keeping the amount of wasted compute small. But time is always shorter than you’d like it to be, so we settled for a solution that’s a bit less elegant but every bit as functional: we used very small EC2 instances, with one Luigi worker per instance.

This way, there’s no chance of an in-between state: any worker is either fully utilized, or idle and safe to kill. All we had to do was write a simple cron job to make sure the instance’s scale-in protection was set whenever it was actively executing a task (and cleared whenever it wasn’t), and let the AWS autoscaling group handle the rest. We were off to the races!
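
The cron job amounts to toggling scale-in protection based on whether the box’s lone worker is busy. A minimal sketch, assuming a hypothetical instance_is_busy() check and a placeholder ASG name:

    import urllib.request

    import boto3

    def instance_is_busy() -> bool:
        # Hypothetical stand-in for however the cron decides the single Luigi
        # worker on this instance is currently mid-task.
        return True

    # Runs from cron on each worker instance. The instance metadata endpoint
    # tells us our own instance ID (IMDSv1 shown for brevity).
    instance_id = urllib.request.urlopen(
        "http://169.254.169.254/latest/meta-data/instance-id", timeout=2
    ).read().decode()

    boto3.client("autoscaling").set_instance_protection(
        InstanceIds=[instance_id],
        AutoScalingGroupName="luigi-workers",     # placeholder ASG name
        ProtectedFromScaleIn=instance_is_busy(),  # protect while busy, release when idle
    )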

So, that’s how we put together an auto-scaling cluster of Luigi workers to better support our growing customer base at ActionIQ. Not all solutions have to be complicated, and in this case we found that a simple, one-worker-per-instance group of workers allowed us to never have to worry about having enough Luigi horsepower again, while simultaneously not spiraling into a 6-month project that prevented us from working on making the platform more robust and capable in other, more exciting ways.

Have you needed to set up a scaling solution for a cluster of workers in a similar way, Luigi or otherwise? What was your solution for this — a simple one like we listed, or a more involved solution like Mesos? Or have you gone with a Luigi alternative, like the more comprehensive Airflow, instead? Leave a comment down below!

Brandon is a backend developer at ActionIQ, where he herds data into buckets and has arguments with stack traces about what that Python function was really up to when it decided to bail at 2019–02–13T18:27:02.264Z. He also enjoys riding bicycles, buying things for his bicycle, and looking at pictures of other people’s bicycles online.
