Running a DAG of tasks in parallel with Python’s Celery

Pavlo Osadchyi
5 min read · Mar 8, 2018


Writing your own task scheduler

The problem

Let’s consider a use case where, instead of running a linear series of tasks in Celery, you have a Directed Acyclic Graph (DAG) of tasks you want to run. My first attempt at tackling this problem used the primitives Celery already ships with, but I failed to model DAG workflows with chains, groups and chords. I’m not saying it’s impossible, just that I couldn’t. So I decided to come up with my own solution: the problem itself is fun, so why not, and I also couldn’t find anything already built on top of Celery that fit my needs.

First, let’s introduce a couple of abstractions, our domain entities: Workflow and Task. Here is the ER diagram:

[ER diagram: Workflow and Task entities]

There are a few important things on this diagram worth breaking down:

  • Workflow.dag_adjacency_list — this field is an adjacency list that represents the DAG of tasks. Each Task has an assigned id field, and each Task is a vertex in our DAG. The adjacency list consists of task ids (a code sketch of both entities follows right after this list). Here is an example:
Task(id=1)
Task(id=2)
Task(id=3)
...
Task(id=8)

Workflow(
    dag_adjacency_list={
        1: [3],
        2: [4],
        3: [5],
        4: [5],
        5: [6, 7],
        6: [8],
        7: [8]
    }
)

It represents the following DAG (all edges are directed from left to right):

(1) - (3)        (6)
         \      /   \
          (5)        (8)
         /      \   /
(2) - (4)        (7)

Our problem to solve is how to traverse this DAG in parallel in Celery, running each task along the way. Why specifically in parallel? Because traversing independent branches concurrently is faster than visiting one graph vertex at a time! More about this in the next section.

  • Task.sleep_time — in real life you would have some metadata attached to the task describing the job itself (run a shell script, a docker container, whatever), but in our case, for simplicity, each task’s job is just to sleep for some amount of time — that’s the value of this field.

A Workflow is a DAG whose vertices are Tasks.
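To make the entities concrete, here is a minimal, purely illustrative sketch in plain Python. The field names come from the diagram; the types, defaults and the networkx conversion helper are my own assumptions:

from dataclasses import dataclass, field
from typing import Dict, List

import networkx as nx


@dataclass
class Task:
    id: int
    sleep_time: float  # stand-in for real work (shell script, docker container, ...)


@dataclass
class Workflow:
    # Maps each task id to the ids of the tasks that directly depend on it.
    dag_adjacency_list: Dict[int, List[int]] = field(default_factory=dict)

    def to_graph(self) -> nx.DiGraph:
        """Build a networkx DiGraph so we can ask for successors/predecessors."""
        graph = nx.DiGraph()
        for task_id, successors in self.dag_adjacency_list.items():
            graph.add_node(task_id)
            for successor in successors:
                graph.add_edge(task_id, successor)
        return graph


# The example workflow from above:
workflow = Workflow(dag_adjacency_list={
    1: [3], 2: [4], 3: [5], 4: [5], 5: [6, 7], 6: [8], 7: [8],
})
tasks = [Task(id=i, sleep_time=1.0) for i in range(1, 9)]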

Basically, we can model really complex processes using these entities and DAGs in general. Take the graph above and imagine that we are creating a multi-project CI process where the nodes are the steps of the CI pipeline:

  • node 1: run tests for the front-end project
  • node 2: run tests for the back-end project
  • node 3: build the front-end project
  • node 4: build the back-end project
  • node 5: wrap both builds into a docker container
  • node 6: deploy the container to the staging-1 environment
  • node 7: deploy the container to the staging-2 environment
  • node 8: finish

An important feature here is that we can’t deploy the docker container before we are sure that the tests pass and both sub-projects have their respective build bundles assembled. In fact, running this CI pipeline means traversing the DAG and executing each task in it.

Parallel graph traversal

Given the DAG from above again:

Adjacency list:

1: [3]
2: [4]
3: [5]
4: [5]
5: [6, 7]
6: [8]
7: [8]

its parallel walk-through may look like this:

[Animation: two workers traversing the DAG in parallel]

We have two worker threads: the yellow and the blue pin. The DAG has two entry points: 1 and 2, so we can start traversing those two branches simultaneously. Our parallel traversal algorithm has one important property:

Every next node is processed if and only if all other nodes that have a path to that node have already been processed

In our case this property is best illustrated by node 5. The yellow worker discovers that node first, but it waits for the blue worker to catch up and reach node 5 before going any further. When the blue worker finally gets to node 5, the node is processed and the workers start traversing in parallel again. The same happens with the last node, node 8: the yellow worker is faster this time too, and it waits for the blue worker. Once the blue worker is there, node 8 is processed and the whole traversal ends.

One simplification was made in breaking down the parallel graph traversal: I said that the yellow worker waits until the blue worker gets to node 5 too, but in practice it won’t just *sit there and wait* (such a strategy would waste resources); instead it is returned to the pool of workers, ready to pick up a new node.
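To see the algorithm in isolation before bringing Celery in, here is a minimal, hypothetical sketch of the same traversal using plain Python threads and a shared queue (standing in for Celery’s worker pool and scheduling queue; all names here are mine):

import queue
import threading
import time

import networkx as nx


def parallel_traverse(graph: nx.DiGraph, num_workers: int = 2) -> None:
    todo = queue.Queue()           # the scheduling queue workers pull nodes from
    done, claimed = set(), set()   # processed nodes / nodes a worker has taken
    lock = threading.Lock()

    # Entry points: nodes with no incoming edges (1 and 2 in our DAG).
    for node in graph.nodes:
        if graph.in_degree(node) == 0:
            todo.put(node)

    def worker():
        while len(done) < graph.number_of_nodes():
            try:
                node = todo.get(timeout=0.1)
            except queue.Empty:
                continue
            with lock:
                # The main condition: every node with an edge into this one
                # must already be processed. If not, just drop it -- the
                # last-finishing predecessor will enqueue it again.
                if node in claimed or any(p not in done for p in graph.predecessors(node)):
                    continue
                claimed.add(node)
            time.sleep(0.1)        # "process" the node
            print(f"processed node {node}")
            with lock:
                done.add(node)
            for successor in graph.successors(node):
                todo.put(successor)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


parallel_traverse(nx.DiGraph([(1, 3), (2, 4), (3, 5), (4, 5),
                              (5, 6), (5, 7), (6, 8), (7, 8)]))

Note that a worker that finds a node not yet ready simply drops it and moves on; the node is guaranteed to be enqueued again by its last-finishing predecessor, so nobody ever blocks.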

Celery

Celery uses a queue for scheduling its jobs, and there is a pool of workers to process them. Workers sit and wait until something pops up in the queue, then process it.

After a node is processed, the worker:

  1. pushes all of its adjacent nodes onto the queue for further processing, and
  2. returns to the pool

When a worker visits a node, it first checks our main condition:

if all other nodes that have a path to that node have already been processed

If it’s not fulfilled, the worker simply returns to the pool.

Given a more complex graph this time:

Adjacency list:

1: [2, 3]
2: [4]
3: [5]
4: [6]
5: [6]
6: [7, 8, 9]
7: [10]
8: [11]
9: [12]
10: [13]
11: [13]
12: [14]
13: [14]

This animation illustrates its traversal from beginning to end:

[Animation: parallel traversal of the more complex DAG]

Let’s break it down (the step number is the blue number at the bottom center of the gif):

The process continues until node 14 is processed...

Implementation

Celery handles most of the boilerplate for us: we don’t have to care about the “pool of workers”, nor about the queue to schedule work on. Basically, whenever you call a Celery task, Celery places that task onto the queue and a worker from the pool picks it up. So we just recursively call the task to process every next node in our graph.

That recursive task is the core of the implementation. For operations on graphs I used Python’s networkx library (that’s where methods like Graph#successors come from). There are a couple of references to private methods and some setup needed for the code to work, but for brevity, and to keep things clean here, I created a sample project where you can see things in more detail and run a working example.
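The actual code lives in that sample project; as a rough illustration of the idea, here is a self-contained sketch. The task name, broker URL and module-level state are all made up for the example, and the in-memory dict only works with a single worker process (e.g. celery worker --pool=threads); real code would keep this bookkeeping in shared storage such as a database:

import threading
import time

import networkx as nx
from celery import Celery
from celery.result import AsyncResult

# Placeholder broker/backend -- point these at your own Redis or RabbitMQ.
app = Celery("workflows",
             broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/0")

# The example DAG; kept at module level to make the sketch self-contained.
graph = nx.DiGraph([(1, 3), (2, 4), (3, 5), (4, 5),
                    (5, 6), (5, 7), (6, 8), (7, 8)])

celery_task_ids = {}          # node id -> id of the Celery task that claimed it
claim_lock = threading.Lock()


@app.task(bind=True)
def process_node(self, node):
    # Main condition: every predecessor's Celery task must have succeeded.
    for pred in graph.predecessors(node):
        task_id = celery_task_ids.get(pred)
        if task_id is None or not AsyncResult(task_id, app=app).successful():
            # Not ready yet: re-enqueue this node with a small delay and
            # return the worker to the pool instead of blocking. (This also
            # covers the brief window where a predecessor has scheduled us
            # before its own result was stored.)
            process_node.apply_async(args=[node], countdown=1)
            return
    with claim_lock:
        if node in celery_task_ids:
            return            # another branch already processed this node
        celery_task_ids[node] = self.request.id

    time.sleep(1)             # the node's "work" (Task.sleep_time)

    # Recursively schedule every adjacent node.
    for successor in graph.successors(node):
        process_node.delay(successor)


# Kick off the workflow from the DAG's entry points:
# for entry in (1, 2):
#     process_node.delay(entry)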

One more thing worth mentioning: we use Celery’s capability to retrieve the status of a task (the AsyncResult API). Each of our high-level Task entities holds a reference to its Celery task, so checking a Task’s status means checking the underlying Celery task. And again, we check the statuses of the “previous” tasks when we want to make sure the current task (node) is ready to be processed.
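That status check is essentially a one-liner with AsyncResult. A quick illustration (the task id below is made up; app is the same Celery app as in the sketch above):

from celery import Celery
from celery.result import AsyncResult

app = Celery("workflows", backend="redis://localhost:6379/0")  # same app as above

# Given the Celery task id our Task entity stores, ask Celery for its state:
result = AsyncResult("9f3b2c1a-made-up-task-id", app=app)  # illustrative id
print(result.status)        # e.g. PENDING, STARTED, SUCCESS, FAILURE
print(result.successful())  # True only once the task finished successfully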

Conclusion

A graph is a really powerful abstraction. Graphs are well formalized: if your problem can be narrowed down to dealing with graphs, the solution is most likely going to be clean and concise, and every major platform has at least a couple of graph libraries available. Celery, in its turn, provides a solid API for running Python code concurrently and in a distributed fashion. Using these two tools together made our problem of efficiently running complex workflows almost a breeze to implement.
