Be One With the Taskflow

Some time ago we decided to rewrite a large chunk of our provisioning system at work. For the unfamiliar, “provisioning system” is a somewhat nebulous term for software that creates and manages resources. In our case, the resources were clusters of virtual servers running Apache Hadoop and related software. A common theme in provisioning systems is that you have a series of workflows for different processes, like ‘create a cluster’, ‘add a server to a cluster’, ‘delete the cluster’, etc. Each of these is made up of a series of tasks: ‘send an API call to the cloud vendor to create the server’, ‘keep sending API calls to the cloud vendor until the server is ready’, ‘log in to the server and run this script’, etc. Previously, we were using the Python celery project to manage these tasks, but we kept running into limitations with its approach.

I should note up front that I’m now a core reviewer on Taskflow, so I’m likely biased, but I really feel that the overall approach Taskflow takes is superior. It’s also possible celery has matured in the two years since we switched. I’m only going to skim the surface here, but hopefully this gives you a basic idea of what Taskflow is all about. First, though, some history behind the decision to switch.

Celery

The primary purpose of celery is to execute function calls asynchronously. You add a decorator to your function that says “run this function with celery”. Then, when you call it with .delay() (a plain call just runs the function locally), celery doesn’t execute it right away. Instead, it puts a message on a queue (RabbitMQ by default, but you can use any broker supported by kombu; we used RabbitMQ).

@app.task
def create_server(node):
    # this function will be executed elsewhere, magically
    ...

A celery worker then consumes that message, executes the function, and sends the result back to the configured result backend. If you want to execute another task after that task completes, the first task must call the other function, which must also be decorated to run with celery. This isn’t a bad approach for just firing off background tasks, and if you have a strictly serialized flow of tasks, it works out OK. Our problem was that we wanted parallelism in some parts of the flow, but not in others. Something like this:

for node in nodes:
    create_server(node)
    wait_for_server_to_finish(node)
configure_cluster()

So we could easily fire off all the create_server() calls in parallel, and each one could call wait_for_server_to_finish() for its own server next. But when we wanted to switch from a bunch of parallel tasks back to a single serialized step for the whole cluster, there was no real way to do that. What we ended up doing was something like:

@app.task
def configure_cluster():
    for node in cluster.nodes:
        if node.is_building:
            return  # another node is still building; its task will call us later
    # do what we need now because all nodes are finished

I don’t know if you understand race conditions, but this was a huge one. Often enough, configure_cluster() would execute either twice or not at all, because multiple wait_for_server_to_finish() calls finished simultaneously. We could have added some sort of locking mechanism to prevent the race, but that was going to be a serious pain, because distributed locks are hard (Apache Zookeeper is the only decent option for this that I know of). We also ran into other issues. The connection to RabbitMQ would time out, the celery worker would just die, and we’d never notice that the process had stopped midway unless we monitored how long the cluster was taking to build and then tried to diagnose it (which usually just meant deleting the cluster and starting over).
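The race above is the classic fan-in problem: “decrement the number of unfinished nodes” and “check whether I’m the last one” have to happen as a single atomic step, so that exactly one finisher triggers the cluster-wide step. Here is a minimal in-process sketch using only the standard library. FanInLatch is a hypothetical name and is not part of celery or Taskflow. Across separate celery workers, that atomic step would need a distributed lock or counter, which is exactly the pain described above.

```python
import threading


class FanInLatch:
    """Hypothetical helper: call on_complete exactly once, after `count` arrivals.

    In-process only; separate celery workers would need a distributed
    equivalent (e.g. a lock or counter held in Zookeeper).
    """

    def __init__(self, count, on_complete):
        self._remaining = count
        self._on_complete = on_complete
        self._lock = threading.Lock()

    def arrive(self):
        # Decrement and test under one lock, so two simultaneous finishers can
        # never both see "zero left" (runs twice) or both miss it (never runs).
        with self._lock:
            self._remaining -= 1
            is_last = self._remaining == 0
        if is_last:
            self._on_complete()


calls = []
latch = FanInLatch(count=20, on_complete=lambda: calls.append("configure_cluster"))

# Each thread plays the role of one wait_for_server_to_finish() completing.
threads = [threading.Thread(target=latch.arrive) for _ in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(calls)  # ['configure_cluster']
```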

What we really needed was a system that thought about workflows in terms of flows, not individual tasks that you can sort of chain together. We did some research, and Taskflow looked like the best option available at the time. Since Rackspace is a big proponent of OpenStack (having co-founded it and being the only large public cloud running it), we figured it would be an easy sell to use the OpenStack-incubated Taskflow project (it has since grown up to be a real boy).

Taskflow

In Taskflow, you define a series of tasks and combine them into a flow, which is really just a graph of tasks that can be traversed. Flows can in turn be composed into a bigger flow, so you can end up with a flow made of many subflows, each made of many tasks. Each flow runs its children either in series (linear_flow) or in parallel (unordered_flow), and the subflows inside it don’t have to follow the same pattern. The graph can fan out into parallel branches and fan back in as it goes. How it works under the hood is the subject for a much longer talk or whitepaper. The general idea is that you define your flow using the built-in classes and functions, and Taskflow compiles that down to a directed acyclic graph of tasks. Each task is an object that inherits from a base Task class. It defines an ‘execute’ method and, optionally, a ‘revert’ method, which is called if the task itself or any subsequent task fails.

from taskflow import task

class CreateServerTask(task.Task):
    def execute(self, node_id):
        # send the POST to the cloud provider API
        ...
    def revert(self, node_id, **kwargs):
        # send a DELETE; kwargs also carries 'result' and 'flow_failures'
        ...
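To make the execute/revert contract concrete, here is a toy, stdlib-only model of running a linear flow. Tasks run in order. If one raises, every task that was attempted, including the one that failed, is reverted newest-first, and the failure still propagates. The Task base class and run_linear() are hypothetical stand-ins, not Taskflow’s API. The real engine also persists state, supports parallelism, and passes extra keyword arguments to revert.

```python
class Task:
    """Hypothetical stand-in for a Taskflow-style task (not the real base class)."""

    def execute(self, **kwargs):
        raise NotImplementedError

    def revert(self, **kwargs):
        pass  # reverting is optional


def run_linear(tasks, store):
    """Run tasks in order; on failure, revert attempted tasks newest-first."""
    attempted = []
    try:
        for t in tasks:
            attempted.append(t)  # recorded before executing, so the failing task is reverted too
            t.execute(**store)
    except Exception:
        for t in reversed(attempted):
            t.revert(**store)
        raise  # the flow as a whole still fails


log = []


class CreateServer(Task):
    def execute(self, node_id):
        log.append(f"POST server {node_id}")

    def revert(self, node_id):
        log.append(f"DELETE server {node_id}")


class WaitForServer(Task):
    def execute(self, node_id):
        log.append(f"poll server {node_id}")
        raise RuntimeError("server went into an error state")

    def revert(self, node_id):
        log.append(f"stop polling server {node_id}")


try:
    run_linear([CreateServer(), WaitForServer()], {"node_id": "node-1"})
except RuntimeError as exc:
    print("flow failed:", exc)

print(log)
```

The revert order mirrors how an engine walks back up the graph. The last thing that happened is undone first, so the half-built server is deleted only after polling has stopped.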

So the flow we wanted to build with celery now becomes:

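from taskflow.patterns import linear_flow, unordered_flow

# Task names must be unique within a flow, hence the per-node names
flow = linear_flow.Flow('create-cluster').add(
    unordered_flow.Flow('create-servers').add(*[
        linear_flow.Flow('node-%s' % node.id).add(
            CreateServerTask(name='create-server-%s' % node.id,
                             inject={'node_id': node.id}),
            WaitForServerTask(name='wait-for-server-%s' % node.id,
                              inject={'node_id': node.id}),
        ) for node in nodes
    ]),
    ConfigureClusterTask(),
)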

And it does exactly what we wanted. No race conditions. No convoluted “let me check whether the other tasks finished” logic. You have to put a little more forethought into defining your flow than into just “calling functions that happen to execute elsewhere”, but honestly, being forced to think about it is probably a good thing. Viewed as a graph, it would look something like this:

[Figure: the flow above drawn as a graph. Each node’s CreateServerTask → WaitForServerTask chain runs in parallel, and all chains join at ConfigureClusterTask.]
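To see what “compiles down to a directed acyclic graph” buys you, the same cluster flow can be written as an explicit dependency graph and walked with the standard library’s graphlib.TopologicalSorter (Python 3.9+). This only illustrates the DAG idea, with made-up node ids. It is not how Taskflow’s compiler or engine is implemented.

```python
from graphlib import TopologicalSorter

nodes = ["n1", "n2", "n3"]  # hypothetical node ids

# Map each task to the set of tasks it depends on.
graph = {}
for n in nodes:
    graph[f"create-server-{n}"] = set()
    graph[f"wait-for-server-{n}"] = {f"create-server-{n}"}
graph["configure-cluster"] = {f"wait-for-server-{n}" for n in nodes}

sorter = TopologicalSorter(graph)
sorter.prepare()  # raises CycleError if the graph is not acyclic
batches = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # everything here may run in parallel
    batches.append(ready)
    sorter.done(*ready)

for batch in batches:
    print(batch)
```

Processing in lockstep batches is a simplification. A real engine can start wait-for-server-n1 as soon as create-server-n1 finishes, without waiting for the other creates. The key property still holds, though: configure-cluster becomes ready exactly once, and only after every wait has finished.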

And since flows are composable, you can move that inner per-node subflow out into its own function and reuse that logic in, say, ‘add a node to an existing cluster’.

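from taskflow.patterns import linear_flow, unordered_flow


def add_node_to_cluster(node):
    # Reusable subflow: create one server, then wait for it to be ready
    return linear_flow.Flow('node-%s' % node.id).add(
        CreateServerTask(name='create-server-%s' % node.id,
                         inject={'node_id': node.id}),
        WaitForServerTask(name='wait-for-server-%s' % node.id,
                          inject={'node_id': node.id}),
    )


def create_cluster(cluster):
    # Build all nodes in parallel, then configure the cluster once
    node_flows = [add_node_to_cluster(node) for node in cluster.nodes]
    return linear_flow.Flow('create-cluster').add(
        unordered_flow.Flow('create-nodes').add(*node_flows),
        ConfigureClusterTask(),
    )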

Worker-Based Engine

There are a number of ways to execute your flows in Taskflow. You can just create what’s called an ‘engine’ in memory and tell it to run your flow. You can have that engine run the tasks in a process pool, in a thread pool, or just in the main process (no parallelism). There’s also the concept of the ‘worker-based’ engine. It runs a set of worker processes (or a thread pool, or multiple thread pools), and each worker advertises “I am a worker that can run tasks A, B, and C”. This lets you divide your tasks up if, say, a specific machine needs to run certain tasks because it’s the only one with access to a system those tasks depend on. We don’t use that feature, but I can see its usefulness. Then you run a separate conductor process that compiles the flow into the DAG, walks through it, and submits a message to a queue for each task for the workers to consume. Like celery, it uses kombu for this, which means RabbitMQ by default, though other backends are supported. We still use RabbitMQ, but are considering moving to Zookeeper for reasons that will make sense soon.
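Here is a rough, in-memory model of the capability advertising described above. Each worker advertises the task names it can run, and each task is routed round-robin among the workers that advertised it. The Dispatcher class and the worker names are hypothetical. In Taskflow’s worker-based engine, these advertisements and task requests travel as kombu messages rather than method calls.

```python
import itertools
from collections import defaultdict


class Dispatcher:
    """Hypothetical router: send each task only to workers that advertised it."""

    def __init__(self):
        self._capabilities = {}  # worker name -> set of task names it can run
        self._counters = defaultdict(itertools.count)  # per-task round-robin position

    def advertise(self, worker, task_names):
        # "I am a worker that can run tasks A, B, and C"
        self._capabilities[worker] = set(task_names)

    def route(self, task_name):
        capable = sorted(w for w, names in self._capabilities.items() if task_name in names)
        if not capable:
            raise LookupError(f"no worker advertises {task_name!r}")
        return capable[next(self._counters[task_name]) % len(capable)]


dispatcher = Dispatcher()
dispatcher.advertise("worker-1", ["CreateServerTask", "WaitForServerTask"])
dispatcher.advertise("worker-2", ["CreateServerTask", "WaitForServerTask"])
# Only this machine can reach the system ConfigureClusterTask depends on.
dispatcher.advertise("private-net-worker", ["ConfigureClusterTask"])

print([dispatcher.route("CreateServerTask") for _ in range(3)])  # ['worker-1', 'worker-2', 'worker-1']
print(dispatcher.route("ConfigureClusterTask"))  # private-net-worker
```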

Job Board

To get flows into the conductor, you use what Taskflow calls the ‘Job Board’. This is a construct where you post some metadata about the flow, its parameters, etc. into a data store. Zookeeper is the default choice here, and the best one, because of how ephemeral nodes work: if the conductor crashes or loses its connection to Zookeeper at any point, another conductor process can pick up the job and keep going. The conductor can also use watchers to be notified immediately when new jobs are posted. So as far as our application is concerned, we create a bunch of tasks, combine them into a flow, then post a job to the job board with information about the flow factory (a function that returns a flow definition; all Python code). Getting the job onto the job board is a little more involved than it should be at this point, but it’s easy to abstract into a function call. You have to create a FlowDetail model, which tracks the flow’s state as it executes, and put it in a LogBook. We currently use Zookeeper for model persistence as well, but I’d recommend against that, because Zookeeper makes a terrible long-term storage option. The sqlalchemy backend pointed at a Postgres database would be a better choice. Here’s what it looks like with Zookeeper for both:

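import uuid

from kazoo.client import KazooClient
from taskflow import engines
from taskflow.jobs import backends as jobboard_backends
from taskflow.persistence import backends as persistence_backends
from taskflow.persistence import models

# job_name, logbook_id, cluster and flow_factory come from the surrounding
# application; flow_factory must be an importable module-level function.

# One shared Zookeeper client for both persistence and the job board
zk_client = KazooClient(hosts="127.0.0.1:2181")  # adjust to your ensemble
zk_client.start()

persistence = persistence_backends.fetch({
    "connection": "zookeeper://",
    "path": "/taskflow",
    "check_compatible": False,
}, client=zk_client)
jobboard = jobboard_backends.fetch(
    'my_jobs',
    conf={
        "board": "zookeeper",
        "check_compatible": False,
    },
    client=zk_client,
    persistence=persistence,
    emit_notifications=False,
)

# The LogBook/FlowDetail models track the flow's state as it executes
connection = persistence.get_connection()
book = models.LogBook(job_name, logbook_id)
flow_detail = models.FlowDetail(job_name, str(uuid.uuid4()))
book.add(flow_detail)
connection.save_logbook(book)

# Initial values the conductor injects into the flow's storage
store = {"cluster_id": cluster.id}
job_details = {
    "flow_uuid": flow_detail.uuid,
    "store": store,
}

# Record which factory function (and arguments) rebuilds the flow
engines.save_factory_details(flow_detail, flow_factory,
                             factory_args=[],
                             factory_kwargs=store,
                             backend=persistence)

jobboard.connect()
job = jobboard.post(job_name, book=book, details=job_details)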

That’s a bit of a mouthful when you see it all together like that (we have it broken down into a few functions for dealing with the various components). But once that’s done, Taskflow takes it from there, and you can be sure that it will run the whole thing and keep a log of what it’s done for later diagnosis. If any task fails, the engine works its way back up the graph, calling revert on the failed task and on every task that had already executed. Of course, that behavior is controllable within your flow with retry controllers.
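The job board’s failover behavior can be modeled in a few lines. ToyJobBoard below is a hypothetical in-memory stand-in, not Taskflow’s JobBoard API. Claims are tied to a conductor’s session the way ephemeral znodes are tied to a Zookeeper session. When a conductor’s session expires, its claims vanish, and another conductor can claim the job and carry on.

```python
class ToyJobBoard:
    """Hypothetical in-memory model of a Zookeeper-backed job board.

    A claim belongs to a conductor's session, like an ephemeral znode:
    when the session expires, the claim disappears with it.
    """

    def __init__(self):
        self.jobs = {}      # job name -> details (flow uuid, store, ...)
        self._claims = {}   # job name -> session that currently owns it
        self._sessions = set()

    def connect(self, session):
        self._sessions.add(session)

    def expire(self, session):
        # Losing the session drops every claim it held, so those jobs become claimable.
        self._sessions.discard(session)
        self._claims = {job: s for job, s in self._claims.items() if s != session}

    def post(self, name, details):
        self.jobs[name] = details

    def claim(self, name, session):
        if session not in self._sessions:
            raise RuntimeError(f"{session} is not connected")
        if name not in self.jobs or name in self._claims:
            return False  # unknown job, or another conductor owns it
        self._claims[name] = session
        return True

    def consume(self, name, session):
        # Only the current owner may mark the job as finished.
        if self._claims.get(name) != session:
            raise RuntimeError(f"{session} does not own {name}")
        del self._claims[name]
        del self.jobs[name]


board = ToyJobBoard()
board.connect("conductor-a")
board.connect("conductor-b")
board.post("create-cluster-42", {"store": {"cluster_id": 42}})

print(board.claim("create-cluster-42", "conductor-a"))  # True
print(board.claim("create-cluster-42", "conductor-b"))  # False: already claimed
board.expire("conductor-a")                             # conductor-a crashes
print(board.claim("create-cluster-42", "conductor-b"))  # True: b takes over
board.consume("create-cluster-42", "conductor-b")
print(board.jobs)  # {}
```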

Retries

Taskflow has the concept of a retry controller. It’s a special kind of atom that you can attach to any flow or subflow to control what happens when a task in it fails. It’s stateful, so you can do things like ‘retry this flow X times’ or ‘retry this flow until this timeout expires’. The former ships with Taskflow (retry.Times). We wrote the latter ourselves, and I keep promising to submit it upstream but haven’t yet (it needs some cleanup first to make it more generic). So now you can do fun stuff like:

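from taskflow import retry
from taskflow.patterns import linear_flow

linear_flow.Flow('node-%s' % node.id, retry=retry.Times(5)).add(
    CreateServerTask(inject={'node_id': node.id}),
    WaitForServerTask(inject={'node_id': node.id}),
)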

So only if a particular server fails to build five consecutive times will it fail the overall workflow and revert everything.
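Here is a stdlib-only sketch of what a ‘retry this subflow N times’ controller does, assuming that each failed attempt is reverted before the next one starts. retry_times() and the flaky server build below are hypothetical illustrations. Taskflow’s retry.Times is attached declaratively to a flow, as in the example above, rather than called like a function.

```python
def retry_times(attempts, run_subflow, revert_subflow):
    """Hypothetical sketch: run a subflow up to `attempts` times in total."""
    for attempt in range(1, attempts + 1):
        try:
            return run_subflow()
        except Exception:
            # Undo this attempt's side effects before retrying or giving up.
            revert_subflow()
            if attempt == attempts:
                raise  # out of attempts: the failure propagates to the parent flow


log = []
statuses = iter(["ERROR", "ERROR", "ACTIVE"])  # the build fails twice, then succeeds


def build_server():
    status = next(statuses)
    log.append(f"build -> {status}")
    if status != "ACTIVE":
        raise RuntimeError("server build failed")
    return status


def delete_server():
    log.append("delete half-built server")


print(retry_times(5, build_server, delete_server))  # ACTIVE
print(log)
```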

There is too much, let me sum up

There are a lot more goodies in this bag that I haven’t delved into, but I don’t want to get too long-winded. I might do some follow-up deep dives into specific areas, but this was our basic process for converting our code from celery to Taskflow. The initial conversion had some bumps as we got more familiar with Taskflow, as all projects of this sort do, and we submitted a decent number of patches for obscure bugs we ran into. Once those were hammered out, we’ve been running smoothly. Our flows no longer disappear into the ether and leave a cluster stuck in a half-finished state. We get notified if any flow fails entirely, so we can proactively look into it for our customers. It’s much easier to add new flows that reuse existing tasks, flows automatically resume after outages, and we can rerun a failed flow after manual cleanup. We also have a flow history showing which tasks executed and what they returned. Overall, it’s been a fantastic improvement.