Loosely coupled tasks in Luigi workflows
*** PLEASE NOTE: This article is getting outdated: the presented ideas have since matured and eventually resulted in the Scientific Luigi library ***
Originally posted at bionics.it on Aug 5, 2014.
Some discussion on this post has happened in this thread.
In my work in the Pharmaceutical Bioinformatics group at Uppsala University, we are heavy users of Spotify's luigi workflow system for automating pharmaceutical bioinformatics workloads on the compute cluster at the UPPMAX supercomputing center.
I have blogged before about how we figured out a way to make workflows written in luigi more dynamic, letting the dependency graph definition reside outside of the actual tasks: a script builds up the dependency graph dynamically, sets up luigi workers, and then executes a requested task.
After looking closer at the intended way to re-use luigi tasks in other workflows (using subclassing), in a discussion on the mailing list, we realized this subclassing approach has some strong benefits. Since the task classes can be executed independently, we can use luigi's automatically generated command line interface when running tasks (luigi generates command line flags for all the parameters of a task for you), which avoids a lot of boilerplate just for getting the command line handling and other infrastructure wiring done.
Recap of the recommended way (subclassing)
So, in short, the “recommended way” of defining a workflow would go something like this: you have two classes with a dependency already defined between them:
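(A minimal sketch; TaskA, TaskB and the file names here are illustrative placeholders, not the original post's exact code.)

```python
import luigi

class TaskA(luigi.Task):
    def output(self):
        return luigi.LocalTarget('task_a_output.txt')

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('foo\n')

class TaskB(luigi.Task):
    def requires(self):
        # The dependency on TaskA is hard-coded inside TaskB
        return TaskA()

    def output(self):
        return luigi.LocalTarget('task_b_output.txt')

    def run(self):
        with self.input().open() as infile, self.output().open('w') as outfile:
            outfile.write(infile.read() + 'bar\n')
```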
Then, if you want to reuse TaskB in another workflow, with another upstream task, you would subclass TaskB and override its requires() method, like so:
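(Again a sketch; MyOwnTaskA is a hypothetical replacement upstream task, just for illustration.)

```python
class MyOwnTaskA(luigi.Task):
    # Some other upstream task, producing output compatible with what TaskB expects
    def output(self):
        return luigi.LocalTarget('my_own_task_a_output.txt')

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('baz\n')

class MyOwnTaskB(TaskB):
    # Re-use TaskB in a new workflow by overriding only its requires() method
    def requires(self):
        return MyOwnTaskA()
```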
This is all well and fine for most cases. We can now run MyOwnTaskB as usual, using luigi's automatic command line interface (we don't need to write our own), just as we could with TaskB previously.
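For example, assuming the classes above live in a file called my_workflow.py (a made-up file name here), luigi's standard entry point is all that is needed to get that command line interface:

```python
# At the bottom of my_workflow.py (hypothetical file name):
if __name__ == '__main__':
    luigi.run()

# This lets us start the task from the shell, something like:
#
#   python my_workflow.py MyOwnTaskB --local-scheduler
#
# luigi also generates a --<param-name> flag for every luigi.Parameter()
# defined on the task, so no hand-written argument parsing is needed.
```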
Remaining problem 1: Duplication of parameter definitions
It turned out that we were still facing issues, though. The fact that the dependency graph is implicitly defined in the requires() functions of each task in the workflow, and that the workflow is always started by calling the last (most downstream) task in the workflow, has some implications for how parameters are passed between tasks in the workflow.
Whenever a task in a workflow introduces a new parameter, that parameter has to be duplicated in all downstream tasks in the whole workflow (and passed along between all those tasks, up to the task that actually uses it). This basically means that a change in one task is not independent from the rest of the workflow. It also means the task will not work equally well in another workflow (with other downstream tasks), since those tasks would have to incorporate the new parameter too, in order for it to be passed along from the last task in the chain (the one that is called first upon execution).
The following is a very sketchy example of the effect (notice how the task classes keep growing further down the workflow chain).
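The code is a rough reconstruction rather than the original example; task and parameter names are made up for illustration:

```python
import luigi

class TaskA(luigi.Task):
    param_a = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('task_a.{p}.txt'.format(p=self.param_a))

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('a\n')

class TaskB(luigi.Task):
    # TaskB's own parameter ...
    param_b = luigi.Parameter()
    # ... plus TaskA's parameter, duplicated only so it can be passed upstream
    param_a = luigi.Parameter()

    def requires(self):
        return TaskA(param_a=self.param_a)

    def output(self):
        return luigi.LocalTarget('task_b.{pa}.{pb}.txt'.format(
            pa=self.param_a, pb=self.param_b))

    def run(self):
        with self.input().open() as infile, self.output().open('w') as outfile:
            outfile.write(infile.read() + 'b\n')

class TaskC(luigi.Task):
    # TaskC's own parameter ...
    param_c = luigi.Parameter()
    # ... plus ALL upstream parameters, duplicated yet again
    param_b = luigi.Parameter()
    param_a = luigi.Parameter()

    def requires(self):
        return TaskB(param_a=self.param_a, param_b=self.param_b)

    def output(self):
        return luigi.LocalTarget('task_c.{pa}.{pb}.{pc}.txt'.format(
            pa=self.param_a, pb=self.param_b, pc=self.param_c))

    def run(self):
        with self.input().open() as infile, self.output().open('w') as outfile:
            outfile.write(infile.read() + 'c\n')
```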
In real workflows this becomes quite problematic. See for example this screenshot of a real-world workflow of ours using this technique, and again notice how the parameter lists grow in size down the workflow chain.
Proposed solutions to problem 1
There are some proposed solutions to this. Joe Ennever shared a very nice pattern on the mailing list: a mixin that contains the needed parameters, plus a method that retrieves those parameters as a dictionary, which can then be passed as keyword arguments (using Python's **kwargs syntax, which expands a dictionary into named arguments) when instantiating upstream tasks.
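We have not reproduced Joe's exact code here, but a rough sketch of the idea could look something like this (the mixin and method names are our own, for illustration):

```python
import luigi

class WorkflowParamsMixin(object):
    '''All parameters used anywhere in the workflow, collected in one mixin.'''
    param_a = luigi.Parameter()
    param_b = luigi.Parameter()

    def workflow_params(self):
        # Return the shared parameters as a dict, ready to be passed
        # as keyword arguments to upstream tasks
        return {'param_a': self.param_a,
                'param_b': self.param_b}

class TaskA(WorkflowParamsMixin, luigi.Task):
    def output(self):
        return luigi.LocalTarget('task_a.{p}.txt'.format(p=self.param_a))

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('a\n')

class TaskB(WorkflowParamsMixin, luigi.Task):
    def requires(self):
        # Forward all the shared workflow parameters in one go
        return TaskA(**self.workflow_params())

    def output(self):
        return luigi.LocalTarget('task_b.{pa}.{pb}.txt'.format(
            pa=self.param_a, pb=self.param_b))

    def run(self):
        with self.input().open() as infile, self.output().open('w') as outfile:
            outfile.write(infile.read() + 'b\n')
```

Note that, with this pattern, TaskA receives param_b even though it never uses it, which leads directly to the first problem below.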
We realized a number of problems with this approach though.
Firstly, sending unnecessary (unused) parameters to a task can make that task look unique, even though another task instance might produce exactly the same file output, since those surplus parameters do not affect the output. Thus, there is a risk of tasks stepping on each other's toes.
To get around that, one would need to set up multiple “sets” of parameters, a new one each time a new parameter is introduced somewhere up the chain of the workflow. As we have realized, this gets unmanageable rather quickly, and slows down the construction of workflows enough to make the approach unusable.
Problem 2: task-specific treatment of multiple outputs from upstream tasks
Another problem we ran into is about routing multiple outputs and inputs between tasks. The problem, put very shortly, is the following:
Luigi's dependency graph, by default, only represents relations between tasks,
while what we actually need is the dependencies between the inputs and outputs of each task.
We need such constructs all the time in our use case. The way we tried to support this in luigi was:
- For multiple outputs, return a dict of named luigi targets from the output() function.
- For multiple inputs, return a dict from the requires() function, with named “inputs” (each of which can be any specific output target of an upstream task).
This approach quickly turned out to become quite messy, though. See for example the schematic example code below (especially the run() method of TaskC, where we need to traverse a two-step nested dict structure, and, worst of all, these are internal details of an upstream task that are now “polluting” the run() method of this task!):
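(The code below is a reconstruction rather than our original example; the task and port names are made up.)

```python
import luigi

class TaskA(luigi.Task):
    def output(self):
        # Multiple named outputs, as a dict of targets
        return {'data': luigi.LocalTarget('task_a.data.txt'),
                'log': luigi.LocalTarget('task_a.log.txt')}

    def run(self):
        with self.output()['data'].open('w') as outfile:
            outfile.write('some data\n')
        with self.output()['log'].open('w') as outfile:
            outfile.write('some log text\n')

class TaskB(luigi.Task):
    def output(self):
        return {'report': luigi.LocalTarget('task_b.report.txt')}

    def run(self):
        with self.output()['report'].open('w') as outfile:
            outfile.write('a report\n')

class TaskC(luigi.Task):
    def requires(self):
        # Multiple named "inputs", as a dict of upstream tasks
        return {'rawdata': TaskA(), 'report': TaskB()}

    def output(self):
        return luigi.LocalTarget('task_c.txt')

    def run(self):
        # Two levels of dict lookups: first the name we gave the input in
        # requires(), then the name of the output *inside the upstream task*.
        # The upstream tasks' internal output names leak into this method.
        with self.input()['rawdata']['data'].open() as raw_file, \
                self.input()['report']['report'].open() as report_file, \
                self.output().open('w') as outfile:
            outfile.write(raw_file.read() + report_file.read())
```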
All in all, this approach turns out to have a number of issues:
- The requires() function of a downstream task has to know specific internals of upstream tasks (the names of the outputs in their output dicts).
- When using the subclassing approach to override dependency graphs, one needs to remember details which might not even be available in the same file (they might live in the original class library whose task classes we are overriding).
- As you can see above, the code is not very readable.
The solution
We had basically come to a halt with our workflow work. We no longer had the slightest idea of how things were working, the strangest bugs were appearing everywhere, and we had no idea how to go about fixing it. We were even ready to drop luigi for good and go with whatever alternative would get the job done, even if that meant makefiles or bash scripts.
Finally we sat down and took one last, very hard look at what we had already learned, what we had learned from other fields such as flow-based programming, and whether these lessons could somehow be combined into something usable that could be implemented in luigi in a reasonable way.
Bingo. We came up with two ideas, one kind of old and one new, that, when used together, very neatly solve basically all the problems we had faced before:
- Use the approach I blogged about before, of constructing the workflow by instantiating objects and injecting each task's upstream tasks via parameters, rather than specifying upstreams in each task's requires() function.
- BUT, don't send the instantiated task objects directly as parameters; instead send a special dict structure, one containing information about which task, and which target of that task, should be connected to this input.
- Implement a special method (stored in a meta class) that is used to retrieve incoming targets; it parses the above-mentioned dict structure and returns the target in question.
- The above-mentioned dependency graph construction can actually be done completely inside the requires() function of a luigi task, which means we can encapsulate the whole dependency graph construction for a workflow inside another luigi task!
Taken together, this means:
- We can do ALL the wiring between inputs and outputs of tasks, completely outside of the luigi tasks themselves.
- We can wire each task's individual output “ports” to other tasks' individual input “ports” (to borrow lingo from flow-based programming).
- We can encapsulate parts of a workflow into a separate luigi component, making the workflow easier to manage on the most coarse-grained level.
So, let's look at a code example of how this looks:
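(What follows is a simplified sketch of the idea rather than our exact implementation; the class and parameter names, such as DependencyMetaTask and in_data, are illustrative.)

```python
import luigi

class DependencyMetaTask(luigi.Task):
    '''Base ("meta") task class with the helpers for resolving the special
    "upstream" dict structure into actual targets.'''

    def get_input(self, input_param_name):
        # The parameter value is expected to be a dict on the form:
        # {'upstream': {'task': <task instance>, 'port': <name of output>}}
        param_value = getattr(self, input_param_name)
        upstream = param_value['upstream']
        outputs = upstream['task'].output()
        if isinstance(outputs, dict):
            return outputs[upstream['port']]
        return outputs

    def requires(self):
        # Every parameter carrying an "upstream" dict implies a dependency,
        # so the dependency graph can be derived from the parameters alone
        upstream_tasks = []
        for param_value in self.param_kwargs.values():
            if isinstance(param_value, dict) and 'upstream' in param_value:
                upstream_tasks.append(param_value['upstream']['task'])
        return upstream_tasks


class TaskA(DependencyMetaTask):
    def output(self):
        return {'data': luigi.LocalTarget('task_a.data.txt')}

    def run(self):
        with self.output()['data'].open('w') as outfile:
            outfile.write('some data\n')


class TaskB(DependencyMetaTask):
    # This parameter receives the special "upstream" dict; inside the task we
    # only need to know its name, nothing about the upstream task's internals
    in_data = luigi.Parameter()

    def output(self):
        return {'result': luigi.LocalTarget('task_b.result.txt')}

    def run(self):
        with self.get_input('in_data').open() as infile, \
                self.output()['result'].open('w') as outfile:
            outfile.write(infile.read() + 'processed\n')


class MyWorkflow(luigi.WrapperTask):
    '''All the wiring between output "ports" and input "ports" happens here,
    inside the requires() method of a separate workflow task.'''

    def requires(self):
        task_a = TaskA()
        task_b = TaskB(in_data={'upstream': {'task': task_a, 'port': 'data'}})
        return task_b
```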
So, what are we doing here?
What happens is this: we pass the special dictionary structure, on the form { 'upstream' : { 'task' : <sometask>, 'port': <nameofport> } }, as a parameter value, and then, inside the task, we call the get_input() function with the name of that parameter. The get_input() function parses the special dict structure and returns the specific target we pointed out (not just the task).
This means that the actual connectivity info between in- and out-targets lives outside of the task classes themselves, and that inside each task we just need to know the name of the input parameter in order to retrieve the target connected to it.
But now the question is: how can we do this even better?