Building a Parallel Task API with Celery

An introduction to running parallel tasks with Celery, plus how and why we built an API on top of Celery’s Canvas task primitives

One of the technology goals of Zymergen is to empower biologists to explore genetic edits of microbes in a high-throughput and highly automated manner. The Computational Biology team at Zymergen is responsible for building software to help scientists design and execute these genetic edits. (For a brief overview, see our Zymergen 101 tutorial.)

Our team’s workflows are written in Python and run asynchronously using Celery. To improve the performance of these workflows, we have developed a simple API for writing parallel application code using Celery. In this post, we will first walk through how to build a simple parallel workflow using Canvas, a module of task workflow primitives that ships with Celery. Then we will step through how we abstracted away the specifics of Canvas to build a simplified parallel task API for our use cases.

Motivating Example

The Computational Biology team develops several workflows for designing and executing large sets of genetic edits in Python. These workflows are executed via an internal task-running service, ZWork, built using Celery. One of these workflows, known as Campaign Design, involves reading and writing potentially large microbial genomic data stored in our database and performing sequence alignment between individual microbe genomic sequences and the sequence of a proposed edit. The design of a single genetic edit takes from one to four minutes, and a single Campaign Design includes 500–700 individual genetic edits, leading to potential run times measured in days.

These long run times were a bottleneck for Zymergen scientists. The Computational Biology team first investigated the serial process itself for efficiency gains; a few improvements were identified. However, because each design is fully independent and the goal is to design many changes at one time, we believed that parallelization would be a major performance win. Plus, parallelization could be applied to other workflows too: many other Computational Biology workflows are computationally intensive but for the most part are “embarrassingly parallel.” In fact, most of these workflows can be handled in a simple fork-join pattern, where a set of inputs is distributed to independent workers for computation.
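As a rough illustration of the fork-join pattern, independent of Celery, here is a minimal sketch using a thread pool from the Python standard library. The fork_join helper and its parameters are hypothetical names chosen for this example; they are not part of our workflows.

```python
from concurrent.futures import ThreadPoolExecutor


def fork_join(inputs, work, combine, max_workers=4):
    """Fork: run `work` on each input independently. Join: combine the results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in the same order as the inputs.
        partial_results = list(pool.map(work, inputs))
    return combine(partial_results)


def square(x):
    return x * x


# Each input is squared independently, then the squares are summed.
total = fork_join([1, 2, 3, 4], work=square, combine=sum)
```

The important property is that no call to work depends on any other, so the calls can be handed to whichever worker is free.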

There were two major steps to parallelizing each workflow (referred to in Celery as a “task”). The first was to refactor the task code itself to have clear boundaries around the work to be run in parallel. The second was to develop infrastructure to execute this refactored code. ZWork’s Celery infrastructure was well supported and battle-tested at this point. There was a desire to avoid jumping to a new technology, if possible. However, it was unclear how Celery would handle the load of the parallelized process. Promising results during a Hack Week project suggested that Celery was up for it.

Given that the bulk of the work in moving to parallelized workflows was in refactoring the code, and that there was still uncertainty about how Celery would perform in the long term, we decided to create a simplified API on top of Celery that would let our software engineers focus on the logic of their workflows, and develop parallelized workflows without needing to know about the internals of Celery. This would also allow us to change backends in the future, while keeping the application logic intact. Finally, it would provide an opportunity to add some small flourishes to the central task API, such as standardized input validation.

Skipping ahead to the results, post-parallelization we now see aggregate run times of five to fifteen seconds per design rather than one to four minutes. Leveraging Canvas via our API lets us achieve a greater degree of parallelism and use our existing infrastructure to bring more compute power to the problem.

The following sections walk through our Celery setup and how to build first a serial and then a parallel task using Canvas. This recipe is all that is needed to convert an existing Celery task into a set of Canvas primitives that form a “fork-join” task, distributing computation across multiple Celery workers. After this walkthrough, we point out a memory usage limitation of using Celery with large datasets. If you are familiar with Canvas for parallel tasks, you can skip directly to the API abstraction we designed.

Celery Setup

Much of our technology stack is built upon open source Python bioinformatics and web tools (BioPython, Numpy, Django, Celery, Airflow, and many more). We use Celery to create a flexible task runner (ZWork) for these tasks. In the early days of Zymergen, as a small startup with the need for running a queue of asynchronous tasks, Celery was a natural fit. As the company has grown, we have added other technologies for tackling distributed work (AWS Lambda, AWS Batch, etc.). However, Celery is still a great resource. It works well for our tasks with run times that exceed the time limits of Lambda. Its low startup overhead, compared with Batch, allows our Celery tasks to begin as soon as users submit them. We have a variety of tasks that run against our ZWork cluster.

Celery has shipped with Canvas since version 3.0 for designing complex task workflows. Canvas provides a few primitives (group, chain, partial, etc.) for composing a workflow out of existing Celery tasks. Canvas was well-suited to the type of tasks the Computational Biology team needed to immediately parallelize.

The code examples below assume you have existing Celery workers. For details on getting started with Celery, see: http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html

Counting Words with Canvas

Here is a simplified example of counting words. We want to pass in a list of documents and get back a dictionary mapping each word to its total number of occurrences across all documents. We will assume each document is a list of words separated by a single space.

documents = ['hi there you', 'there you are', 'please say hi']
expected_result = {'there': 2, 'please': 1, 'say': 1, 'hi': 2, 'are': 1, 'you': 2}

The goal is to create a Celery signature that can be called as follows:

result = count_words(documents).get()

Serial Task with Canvas

A standard serial version of this looks like:

from collections import Counter
from celery import shared_task

@shared_task
def count_words(documents):
    # Accumulate word counts across every document in a single Counter.
    word_counter = Counter()
    for document in documents:
        word_counter.update(document.split(' '))
    return word_counter

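A single Python function is created and decorated with the shared_task decorator. This registers the function as a Celery task, from which signatures can be created. Note that calling a plain task directly, as in count_words(documents), runs it in the current process; to send this serial version to a worker and wait for the result, use count_words.delay(documents).get().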

Parallel Task with Canvas

The parallel version of this looks like:

from collections import Counter
from celery import shared_task, chain, group

NUMBER_OF_GROUPS = 32

@shared_task
def setup_step(documents):
    # Stride the documents into NUMBER_OF_GROUPS sublists (some may be empty).
    return [documents[i::NUMBER_OF_GROUPS] for i in range(NUMBER_OF_GROUPS)]

@shared_task
def process_step(grouped_documents, group_index):
    # Count only the slice of documents assigned to this group.
    word_counter = Counter()
    for document in grouped_documents[group_index]:
        word_counter.update(document.split(' '))
    return word_counter

@shared_task
def join_step(word_counters):
    # need to convert dicts back to counters because of the serialization round trip
    word_counters = [Counter(counter) for counter in word_counters]
    return sum(word_counters, Counter())

count_words = chain(
    setup_step.signature(),
    # .s(...) is shorthand for a signature with group_index already bound;
    # the result of setup_step is prepended as the first positional argument.
    group([process_step.s(group_index=i) for i in range(NUMBER_OF_GROUPS)]),
    join_step.signature(),
)

The parallel code has the same fundamental function as the serial code. The additional overhead provides a means to distribute the word counting step to multiple distinct workers, specified with the NUMBER_OF_GROUPS constant. The setup_step prepares the documents for independent counting. The process_step does the counting of a subset of documents, and then the join_step combines the counts. Let’s walk through each of these functions in order.

setup_step

The setup takes a list of inputs and converts it to a nested list of NUMBER_OF_GROUPS lists (possibly including empty lists).
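To make the striding concrete, here is a small sketch of the same slicing expression outside of Celery. The split_into_groups name and the group count of 4 are chosen only for illustration.

```python
NUMBER_OF_GROUPS = 4  # deliberately small for illustration


def split_into_groups(items, number_of_groups=NUMBER_OF_GROUPS):
    # Group i receives items i, i + n, i + 2n, ... where n is the group count.
    return [items[i::number_of_groups] for i in range(number_of_groups)]


groups = split_into_groups(['a', 'b', 'c', 'd', 'e', 'f'])
# groups == [['a', 'e'], ['b', 'f'], ['c'], ['d']]

short = split_into_groups(['a', 'b'])
# short == [['a'], ['b'], [], []]  (fewer inputs than groups leaves empty lists)
```

Striding keeps group sizes within one item of each other, and the result always contains exactly number_of_groups entries, which matches the fixed size of the group declared in the chain.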

The first thing to jump out might be the NUMBER_OF_GROUPS constant. A Celery group must be declared statically at the start of the task — it is not possible to dynamically create the number of entries here. This means you must decide up front how many pieces to break the work into.

We went with a value that was twice the number of dedicated parallel Celery workers. If there are fewer inputs than that value, sending a no-op value (an empty list) to the process step works well: it occupies a worker process, but only briefly. For large tasks this value can lead to uneven completion times and non-optimal usage of the worker pool, but it was a reasonable default for us. As we look towards dynamic scaling of Celery workers, this may change.

process_step

The logic in the process step looks identical to the serial version, except for the addition of an index.

Before looking at the logic of the process_step function, let’s look at the creation of the group signature:

group([process_step.s(group_index=i) for i in range(NUMBER_OF_GROUPS)])

Here .s(group_index=i) is Celery's shorthand for creating a signature with that keyword argument already bound. The group_index we pass into the process_step function is important to ensure that each worker operates on only a portion of the total data. The reasons for this, and a potential negative side effect, are discussed below. The process_step then performs an operation on a slice of the overall data.

join_step

The group primitive collects all of the results from each instance of the process_step being run and combines the results into a single list, which is then passed to the final entry in the chain, the join_step task.

The first line of the join_step is recreating the Counter objects. The Counter object created in the process step is converted to a regular Python dictionary when Celery performs JSON serialization/deserialization as part of the distribution of data to tasks. This is an important reminder that every input and output of your task must be serializable. If you choose to use JSON for this serialization, you need to account for any mutation that may occur from a round trip. Other serialization methods are available, including pickle.
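The round-trip effect can be reproduced with the standard json module alone. The following is a small sketch; it does not go through Celery's own serializer, but it illustrates the same behavior for a Counter.

```python
import json
from collections import Counter

original = Counter('hi there you'.split(' '))

# Simulate what happens to a task result sent through a JSON serializer.
round_tripped = json.loads(json.dumps(original))
is_plain_dict = type(round_tripped) is dict  # the Counter type is lost

# Plain dicts cannot be added to a Counter, so sum(..., Counter()) would fail.
try:
    Counter() + round_tripped
    addition_failed = False
except TypeError:
    addition_failed = True

# Re-wrapping restores Counter behavior, as the join_step does.
restored = Counter(round_tripped)

# Other types mutate too: for example, integer keys come back as strings.
int_keys = json.loads(json.dumps({1: 'a'}))
```

This is why the join_step rebuilds each Counter before summing.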

Putting it all together

The final Celery primitive needed is the chain. This links together the setup_step, the group of process_steps, and the join_step into a single callable object:

from celery import chain, group

count_words = chain(
    setup_step.signature(),
    group([process_step.s(group_index=i) for i in range(NUMBER_OF_GROUPS)]),
    join_step.signature(),
)

Calling this looks like:

documents = ['hi there you', 'there you are', 'please say hi']
result = count_words(documents).get()
expected_result = {'there': 2, 'please': 1, 'say': 1, 'hi': 2, 'are': 1, 'you': 2}
assert result == expected_result
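If no workers are available, the logic of the three steps can still be checked in plain Python. The sketch below re-declares the steps without the shared_task decorator and mimics the chain by hand, pushing each intermediate value through a JSON round trip. The via_json and count_words_locally helpers are hypothetical and exist only for this example; this is an approximation of the data flow, not of Celery's scheduling.

```python
import json
from collections import Counter

NUMBER_OF_GROUPS = 4  # small value for a local check


def setup_step(documents):
    return [documents[i::NUMBER_OF_GROUPS] for i in range(NUMBER_OF_GROUPS)]


def process_step(grouped_documents, group_index):
    word_counter = Counter()
    for document in grouped_documents[group_index]:
        word_counter.update(document.split(' '))
    return word_counter


def join_step(word_counters):
    word_counters = [Counter(counter) for counter in word_counters]
    return sum(word_counters, Counter())


def via_json(value):
    # Mimic the serialization between steps.
    return json.loads(json.dumps(value))


def count_words_locally(documents):
    grouped = via_json(setup_step(documents))
    # Every process_step receives the full nested list plus its own index.
    partials = [via_json(process_step(grouped, i)) for i in range(NUMBER_OF_GROUPS)]
    return via_json(join_step(partials))


local_result = count_words_locally(['hi there you', 'there you are', 'please say hi'])
```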

Potential High Memory Usage

One thing we learned during the development of our first parallel task was that the manner in which Celery sends data to a group can have potentially large memory usage. Each task in a group receives the same input data — the entire list of lists of data — and then works on its particular slice. In order to have a single process step work on a subset of the data, we introduced the index value using the NUMBER_OF_GROUPS constant. We also created a partial function with the group_index value set. This partial function is registered as a Celery signature, which essentially puts it into a job queue awaiting the remainder of its input from the setup_step.

To restate this point, the entirety of the data needing to be processed by the individual workers is duplicated in direct proportion to the chunking of the data.
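A back-of-the-envelope sketch of that duplication follows. It only measures JSON-encoded argument sizes; the actual memory used by the broker also depends on Celery's message envelope and other factors not modeled here. The json_size helper and the synthetic documents are assumptions made for this example.

```python
import json

NUMBER_OF_GROUPS = 32


def setup_step(documents):
    return [documents[i::NUMBER_OF_GROUPS] for i in range(NUMBER_OF_GROUPS)]


def json_size(value):
    """Size in bytes of the JSON encoding of `value`."""
    return len(json.dumps(value).encode('utf-8'))


# Synthetic input: 64 documents of 100 words each.
documents = ['word ' * 100 for _ in range(64)]
grouped = setup_step(documents)

# What the group receives: every one of the tasks gets the full nested list.
duplicated_bytes = NUMBER_OF_GROUPS * json_size(grouped)

# What would be sent if each task received only its own slice.
sliced_bytes = sum(json_size(one_group) for one_group in grouped)

ratio = duplicated_bytes / sliced_bytes
```

For this input the ratio comes out at roughly NUMBER_OF_GROUPS: the queued data grows with the number of chunks, not just with the size of the input.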

The following diagram of a different example task (summing a list of numbers) illustrates how the data replication occurs:

Figure: An example of summing a list of numbers, showing how the entirety of the input data is duplicated for each worker used.

This can be massively inefficient. If you are working with big data, don’t do this! Celery is likely not going to work. If you have reasonably-sized data (or means of making the inputs to the process_step reasonably-sized) that requires long running computation this will still work — and this was exactly our use case. For many of our tasks, the inputs from users are object ids. Our processing step passes those ids to each individual parallel worker, which then fetches the actual objects it needs from the database.
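The id-passing approach can be sketched as follows. FAKE_DATABASE and fetch_documents are hypothetical stand-ins for a real database lookup and are not part of our code; the point is only that small id lists go through the queue while the larger objects are loaded inside the worker.

```python
from collections import Counter

# Hypothetical stand-in for a database table keyed by document id.
FAKE_DATABASE = {
    101: 'hi there you',
    102: 'there you are',
    103: 'please say hi',
}


def fetch_documents(document_ids):
    """Hypothetical lookup; a real task would query the database here."""
    return [FAKE_DATABASE[document_id] for document_id in document_ids]


def process_step(grouped_ids, group_index):
    # Only the id lists are duplicated across tasks; the documents
    # themselves are fetched by the one worker that needs them.
    word_counter = Counter()
    for document in fetch_documents(grouped_ids[group_index]):
        word_counter.update(document.split(' '))
    return word_counter


grouped_ids = [[101, 103], [102]]
first_group_counts = process_step(grouped_ids, 0)
```

The trade-off is extra database reads in each worker in exchange for much smaller messages.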

However, we have one parallel task that breaks apart a large nested data structure into Python dictionaries during the setup step. Additionally, recent changes in how Zymergen scientists use this task have meant that this already large data structure has gotten nearly an order of magnitude larger. During testing of this task with such a large input, RabbitMQ, which we use for queuing, ran out of memory. We are considering other options for this particular task, such as persisting the sub-objects into our database or some other cache.

Creating a Simple API

With the limitations noted, using Celery/Canvas can work quite well for certain tasks. We saw speed improvements of an order of magnitude: from one to four minutes to five to fifteen seconds.

After proving that we could use Canvas to build the types of tasks needed, we wanted to build a simplified API that would allow new task developers to focus on the business logic of their task and not think about the underlying framework. This decoupling of business logic from the specifics of Canvas allows for a potential move away from Celery as a backend to be completed using changes to our API implementation, without any changes to existing tasks.

Our API also hides some of the limitations of Canvas. For example, as someone implementing business logic, you are no longer aware of exactly how inputs are sliced and diced and distributed. You write setup logic that splits the inputs into discrete logical pieces with argument names that map to the inputs of your processing step. The processing step is written to operate on a single piece of data, simplifying the logic. This is both a benefit and a cost. Without knowing that the data returned by the setup step is duplicated in memory several fold, you may not plan accordingly.

In addition to the separation between the task logic and its execution, our API provides two additional features. Both help simplify task inputs:

  • Our task-running service, ZWork, uses JSON as the input to our tasks. JSON schema validation via Cerberus is built into the API.
  • Handling metadata inputs is simplified. Examples of our metadata include parameters for how the data is processed, or a name to give to the final data object created. Rather than passing these values through the business logic of the setup or process steps when they may not be needed, they are partially bound to the methods that will need them.

Here is the same example task of counting words recreated with our API:

from collections import Counter
from demoForkJoinTask import ForkJoinTask

class WordCounterTask(ForkJoinTask):

    @classmethod
    def get_input_schema(cls, *args, **kwargs):
        return {'documents': {'type': 'list'},
                'delimiter': {'type': 'string', 'default': ' '}}

@WordCounterTask.setup
def demo_setup(documents):
    # Each returned dict maps argument names to the inputs of one process call.
    return [{'document': document} for document in documents]

@WordCounterTask.process
def demo_count(document, delimiter=' '):
    word_counter = Counter(document.split(delimiter))
    return word_counter

@WordCounterTask.join
def demo_join(word_counters):
    # Rebuild Counters after the serialization round trip, then merge them.
    word_counters = [Counter(counter) for counter in word_counters]
    total_counts = sum(word_counters, Counter())
    return {'word_count': total_counts}

This can be called as:

documents = ['hi there you', 'there you are', 'please say hi']
args = {'documents': documents}
result = WordCounterTask.signature(args)().get()
word_count = result['word_count']
expected_result = {'there': 2, 'please': 1, 'say': 1, 'hi': 2, 'are': 1, 'you': 2}
assert word_count == expected_result
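To give a feel for how decorator-based registration of this kind can work, here is a minimal sketch of a base class that runs the three steps serially in the current process. It is a hypothetical stand-in written for this post, not our ForkJoinTask implementation: it has no Celery backend, no schema validation, and no metadata handling, and the names SerialForkJoinTask and run are invented for the example.

```python
from collections import Counter


class SerialForkJoinTask:
    """Hypothetical stand-in that runs setup, process, and join in-process."""

    @classmethod
    def setup(cls, func):
        # Store the function on the subclass the decorator was accessed through.
        cls._setup = staticmethod(func)
        return func

    @classmethod
    def process(cls, func):
        cls._process = staticmethod(func)
        return func

    @classmethod
    def join(cls, func):
        cls._join = staticmethod(func)
        return func

    @classmethod
    def run(cls, args):
        pieces = cls._setup(**args)                            # fork
        results = [cls._process(**piece) for piece in pieces]  # independent work
        return cls._join(results)                              # join


class WordCounterTask(SerialForkJoinTask):
    pass


@WordCounterTask.setup
def demo_setup(documents):
    return [{'document': document} for document in documents]


@WordCounterTask.process
def demo_count(document):
    return Counter(document.split(' '))


@WordCounterTask.join
def demo_join(word_counters):
    return {'word_count': sum(word_counters, Counter())}


result = WordCounterTask.run({'documents': ['hi there you', 'there you are', 'please say hi']})
```

Because the task functions only see plain arguments and return values, the same three functions could be executed by a different runner without being rewritten.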

The API allows declaring function arguments for the process and join steps that accept values directly from user input. This requires a bit of behind-the-scenes juggling but simplifies life for an application developer.

As an example, the WordCounterTask allows declaring a document delimiter. So instead of this:

documents = ['hi,there,you', 'there,you,are', 'please,say,hi']
result = WordCounterTask.signature({'documents': documents})().get()
unparsed_count = {'please,say,hi': 1, 'hi,there,you': 1, 'there,you,are': 1}

You can get this:

args = {'documents': documents, 'delimiter': ','}
result = WordCounterTask.signature(args)().get()
expected_count = {'there': 2, 'please': 1, 'say': 1, 'hi': 2, 'are': 1, 'you': 2}
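One way such argument routing could be done is with inspect.signature and functools.partial from the standard library. This is a sketch under that assumption and not necessarily how our API does it; bind_metadata is a hypothetical helper, and the demo functions are simplified versions of the ones above.

```python
import functools
import inspect


def bind_metadata(func, metadata):
    """Pre-bind only the metadata entries that `func` declares as parameters."""
    accepted = inspect.signature(func).parameters
    relevant = {name: value for name, value in metadata.items() if name in accepted}
    return functools.partial(func, **relevant)


def demo_setup(documents):
    return [{'document': document} for document in documents]


def demo_count(document, delimiter=' '):
    return document.split(delimiter)


metadata = {'delimiter': ','}

# demo_setup does not declare `delimiter`, so nothing is bound to it.
bound_setup = bind_metadata(demo_setup, metadata)
# demo_count does declare it, so the user's value overrides the default.
bound_count = bind_metadata(demo_count, metadata)

pieces = bound_setup(documents=['hi,there', 'you'])
words = [bound_count(**piece) for piece in pieces]
```

With this approach the setup step never has to accept or forward parameters it does not use.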

The full code can be found here.

Conclusion

Software engineers at Zymergen build tools to empower scientists to harness biology by means impossible in traditional lab settings. In doing so, we strive to balance meeting the needs of today with the expected needs of the future. Our Celery-based microbe design workflow is a perfect example of this. The serial version of this job was slow, but acceptable for then-current use cases. When our scientists onboarded a microbe with a significantly larger genome, the execution time nearly quadrupled. Given that a complete design involves 500–700 individual genetic edits, waiting days for a job to complete was not good enough.

The decision to use Celery/Canvas as the backend for this task was mostly pragmatic: Celery was the infrastructure already in place. There were other technologies that we could have employed (AWS Batch or Lambda have both been used internally for other projects). However, rather than abandon the existing Celery infrastructure, we developed a parallelized solution directly on top of Celery. And it worked! Running the parallelized task with 32 Celery workers, the compute time for the large microbe was 27 seconds, versus 235 seconds on average. This is roughly a 9x speedup (235 / 27 is about 8.7) using 32 times as many cores. There is significant overhead involved with increased serialization/deserialization and database calls, but overall it’s a big improvement that has been well received by the end users, and throwing cores at the problem is acceptably cheap for problems of this size.
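The arithmetic behind these figures can be worked through in a few lines. Parallel efficiency here is the usual ratio of speedup to worker count; it is a standard way to express the overhead, not a number we measured separately.

```python
serial_seconds = 235    # average time reported for the large microbe
parallel_seconds = 27   # time with the parallelized task
workers = 32

speedup = serial_seconds / parallel_seconds  # about 8.7
efficiency = speedup / workers               # about 0.27

summary = '{:.1f}x speedup, {:.0%} parallel efficiency'.format(speedup, efficiency)
```

An efficiency well below 100% reflects the serialization and database overhead: the work finishes much sooner, but at a higher total compute cost.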

The creation of a simple, non-Celery specific API on top of our existing Celery infrastructure allowed small incremental improvements, such as support for end-to-end testing without invoking Celery, to be added and directly applied to all tasks. Standardizing on our API has also ensured that each task is written in a consistent pattern. The demands of future workloads are likely to be different; upcoming problems may not be manageable by Celery and may require an infrastructure change. The Celery-related memory usage issues mentioned may be that forcing function, or we may run into some other unforeseen issue first. Ideally our API — which we designed to allow us to keep the existing infrastructure in place, while providing a migration path that doesn’t require rewriting user code in the future — will pay dividends.

Matthew Batterton is a Software Engineering Manager in the Computational Biology team at Zymergen.
