Be One With the Taskflow

Some time ago we decided to rewrite a large chunk of our provisioning system at work. For the unfamiliar, “provisioning system” is a nebulous term for software that creates and manages resources. In our case, the resources were clusters of virtual servers running Apache Hadoop and other related software. A common theme in provisioning systems is that you have a series of workflows for different processes, like ‘create a cluster’, ‘add a server to a cluster’, ‘delete the cluster’, etc. Each of these is made up of a series of tasks: ‘send an API call to the cloud vendor to create the server’, ‘keep sending API calls to the cloud vendor until the server is ready’, ‘log in to the server and run this script’, etc.

Previously, we were using the Python celery project to manage these tasks, but we were running into limitations with its approach. I should note upfront that I’m a core reviewer on Taskflow now, so I’m likely biased, but I really feel like the overall approach Taskflow takes is superior. It’s also possible celery has matured in the two years since we switched. I’m going to skim the surface a bit here, but hopefully it at least gives you a basic idea of what Taskflow is all about. But first we must start with the history behind the decision to switch.

Celery

With celery, you mark a function as a task by decorating it:

@app.task
def create_server(node):
    # this function will be executed elsewhere, magically
    pass
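
To kick the task off in the background you don’t call the function directly; you go through celery. A minimal sketch:

# .delay() serializes the arguments and publishes a message describing the
# call to the broker (RabbitMQ in our case) instead of running it here.
create_server.delay(node)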

Then a celery worker will consume that message, execute the function, and put the result back on the queue. If you want to execute another task after that one completes, the first task must itself call the next function, which is also decorated as a celery task. This isn’t a bad approach for just firing off background tasks, and if you have a strictly serialized flow of tasks, it works out ok. Our problem was that we wanted some parallelism in parts of the flow, but not in others. Something like this:

for node in nodes:
    create_server(node)
    wait_for_server_to_finish(node)
configure_cluster()

So we could easily fire off all the create_server() calls in parallel, and each of them could call wait_for_server_to_finish() for its own node next, but when we wanted to switch from a bunch of parallel tasks back to a single serialized step for the whole cluster, there was no real way to do that. What we ended up doing was something like:

@app.task
def configure_cluster():
    for node in cluster.nodes:
        if node.is_building:
            return
    # do what we need now because all nodes are finished
I don’t know if you understand race conditions, but this was a huge one. Frequently enough, configure_cluster() would either execute twice or not at all, because multiple wait_for_server_to_finish() calls finished simultaneously. We could have added some sort of locking mechanism to prevent the race condition, but that was going to be a serious pain, because distributed locks are hard (Apache Zookeeper is the only decent option for this that I know of). We also ran into other issues where the connection to RabbitMQ would time out and the celery worker would just die, and we’d never notice that the process had stopped midway unless we watched how long the cluster was taking to build and then tried to diagnose it (which really just meant deleting it and starting over).
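
Just to illustrate the kind of thing we would have had to bolt on: a ZooKeeper-based lock via the kazoo library, with a made-up lock path, so only one wait_for_server_to_finish() gets to make the ‘is everything done?’ decision at a time:

from kazoo.client import KazooClient

zk = KazooClient()  # assumes a local ZooKeeper; real hosts would go here
zk.start()

# One lock per cluster; whoever holds it checks the nodes and decides whether
# the cluster is ready to be configured.
with zk.Lock('/locks/cluster-%s' % cluster.id, 'configure-check'):
    if all(not node.is_building for node in cluster.nodes):
        configure_cluster.delay()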

What we really needed was a system that thought about workflows in terms of flows, not individual tasks that you can sort of chain together. We did some research, and Taskflow looked like the best option available at the time. Since Rackspace is a big proponent of OpenStack (having founded it and being the only large public cloud using it), we figured it would be an easy sell to use the OpenStack-incubated Taskflow project (it has since grown up to be a real boy).

Taskflow

In Taskflow, the basic unit of work is a task class: execute() does the work, and an optional revert() undoes it if something later in the flow fails.

from taskflow import task

class CreateServerTask(task.Task):
    def execute(self, node_id):
        # send the POST to the cloud provider API
        pass

    def revert(self, node_id):
        # send a DELETE to the cloud provider API
        pass

So that flow we wanted to build with celery now becomes:

from taskflow.patterns import linear_flow, unordered_flow

create_servers = unordered_flow.Flow('create-servers')
for node in nodes:
    server_flow = linear_flow.Flow('create-server-%s' % node.id)
    server_flow.add(
        CreateServerTask(inject={'node_id': node.id}),
        WaitForServerTask(inject={'node_id': node.id}),
    )
    create_servers.add(server_flow)

cluster_flow = linear_flow.Flow('create-cluster')
cluster_flow.add(
    create_servers,
    ConfigureClusterTask(),
)

And it does exactly what we wanted. No race conditions. No convoluted “let me check to see if the other tasks finished”. It takes a little more forethought to define your flow up front instead of just “calling functions that happen to execute elsewhere”, but honestly, forcing you to think about it is probably a good thing. Viewed as a graph, it would look something like this:

(Figure: the flow as a graph, with the per-node create/wait subflows fanning out in parallel and rejoining at the configure-cluster task.)

And since flows are composable, you can actually pull that inner subflow for creating a server out into its own flow-building function, so you can reuse that logic in, say, ‘add a node to an existing cluster’.

def add_node_to_cluster(node):
    flow = linear_flow.Flow('add-node-%s' % node.id)
    flow.add(
        CreateServerTask(inject={'node_id': node.id}),
        WaitForServerTask(inject={'node_id': node.id}),
    )
    return flow


def create_cluster(cluster):
    node_flows = [add_node_to_cluster(node)
                  for node in cluster.nodes]
    flow = linear_flow.Flow('create-cluster')
    flow.add(*node_flows)
    flow.add(ConfigureClusterTask())
    return flow
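
To actually execute one of these flows in-process, you hand it to an engine. A minimal sketch, assuming the create_cluster() factory above:

from taskflow import engines

# Builds the flow and runs it with the default in-process engine, blocking
# until every task has finished (or everything has been reverted).
engines.run(create_cluster(cluster))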

Worker-Based Engine
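
Everything above runs the flow inside a single process, but Taskflow can also hand the same flow to a worker-based engine, which ships each task over a message bus to remote workers that actually execute it. A rough sketch of the worker side, with made-up broker URL, exchange, and topic names (and mytasks standing in for wherever your task classes live):

from taskflow.engines.worker_based import worker as worker_engine

# A worker process: connect to the broker, listen on the given topic, and
# execute whichever tasks from the listed modules the engine sends over.
worker = worker_engine.Worker(
    url='amqp://guest:guest@localhost:5672//',
    exchange='provisioning',
    topic='cluster-tasks',
    tasks=['mytasks'],
)
worker.run()

The engine side then loads the flow with the worker-based engine pointed at the same exchange and topics, and the tasks get farmed out to whichever workers are listening.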

Job Board

With a job board, the producer side saves the flow’s details to a persistence backend and then posts a small job that points at them; a conductor process elsewhere claims the job and runs the flow. Posting a job ends up looking roughly like this:

from kazoo.client import KazooClient

from taskflow import engines
from taskflow.jobs import backends as jobboard_backends
from taskflow.persistence import backends as persistence_backends
from taskflow.persistence import models

# job_name, logbook_id, cluster and flow_factory come from the surrounding
# provisioning code; gen_uuid() is just a UUID string generator.
zk_client = KazooClient()  # defaults to a local zookeeper
zk_client.start()

persistence = persistence_backends.fetch({
    "connection": "zookeeper://",
    "path": "/taskflow",
    "check_compatible": False,
}, client=zk_client)
jobboard = jobboard_backends.fetch(
    'my_jobs',
    conf={
        "board": "zookeeper",
        "check_compatible": False,
    },
    client=zk_client,
    persistence=persistence,
    emit_notifications=False,
)

connection = persistence.get_connection()
book = models.LogBook(job_name, logbook_id)
flow_detail = models.FlowDetail(job_name, gen_uuid())
book.add(flow_detail)
connection.save_logbook(book)

store = {"cluster_id": cluster.id}
job_details = {
    "flow_uuid": flow_detail.uuid,
    "store": store,
}

# Record which factory function builds the flow, so whoever claims the job
# can reconstruct it from the keyword arguments in store.
engines.save_factory_details(flow_detail, flow_factory,
                             factory_args=[],
                             factory_kwargs=store,
                             backend=persistence)

jobboard.connect()
job = jobboard.post(job_name, book=book, details=job_details)

So that is a bit of a mouthful seeing it all together like that (we have it broken down into a few functions for dealing with the various components). But once that’s done, Taskflow just takes it from there, and you can be sure that it will run the whole thing and keep a log of what it’s done for later diagnosis. If any task fails, it will work its way back up the graph, calling revert on every task that had executed successfully. Of course, that behavior is actually controllable within your flow with retry tasks.
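
The other half of the job board picture, which I’ll only sketch here, is a conductor process that watches the board, claims posted jobs, and runs their flows (the conductor name and engine choice below are just illustrative):

from taskflow.conductors import backends as conductor_backends

# Claims jobs from the board one at a time and runs each job's flow with an
# in-process engine, saving state to the same persistence backend.
conductor = conductor_backends.fetch('blocking', 'cluster-conductor', jobboard,
                                     persistence=persistence, engine='serial')
conductor.run()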

Retries

Attaching a retry controller to a flow tells the engine what to do when something inside that flow fails, instead of immediately reverting everything. For example, giving each per-node subflow five attempts:

from taskflow import retry

server_flow = linear_flow.Flow('create-server-%s' % node.id,
                               retry=retry.Times(5))
server_flow.add(
    CreateServerTask(inject={'node_id': node.id}),
    WaitForServerTask(inject={'node_id': node.id}),
)

So only if a particular server fails to build five consecutive times will it fail the overall workflow and revert everything.

There is too much, let me sum up
