Building a Parallel Task API with Celery

Zymergen Technology Team
Sep 3 · 12 min read

Motivating Example

Celery Setup

Counting Words with Canvas

# Example usage: count word frequencies across a small corpus.
documents = ['hi there you', 'there you are', 'please say hi']
# Expected total counts across all three documents.
expected_result = {'there': 2, 'please': 1, 'say': 1, 'hi': 2, 'are': 1, 'you': 2}
# count_words is the Celery workflow defined later; .get() blocks until
# the asynchronous result is ready.
result = count_words(documents).get()

Serial Task with Canvas

from collections import Counter
from celery import shared_task
@shared_task
def count_words(documents):
    """Count word occurrences across *documents* in a single (serial) task.

    Args:
        documents: list of strings; words are separated by single spaces.

    Returns:
        Counter mapping each word to its total number of occurrences.
    """
    word_counter = Counter()
    for document in documents:
        word_counter.update(document.split(' '))
    # BUG FIX: the original `sum(word_counter, Counter())` iterated the
    # Counter's KEYS (strings), so the first addition was
    # `Counter() + 'hi'` -> TypeError. The accumulated counter is already
    # the desired result.
    return word_counter

Parallel Task with Canvas

from collections import Counter
from celery import shared_task, chain, group
NUMBER_OF_GROUPS = 32
@shared_task
def setup_step(documents):
    """Partition *documents* into NUMBER_OF_GROUPS round-robin slices.

    Returns:
        A list of NUMBER_OF_GROUPS sublists; slice i holds documents
        i, i + NUMBER_OF_GROUPS, i + 2 * NUMBER_OF_GROUPS, ...
    """
    # `range`, not Python 2's `xrange`: the rest of the file already uses
    # `range`, and `xrange` does not exist on Python 3.
    return [documents[i::NUMBER_OF_GROUPS] for i in range(NUMBER_OF_GROUPS)]
@shared_task
def process_step(grouped_documents, group_index):
    """Count words in the document slice selected by *group_index*.

    Args:
        grouped_documents: list of document slices produced by setup_step.
        group_index: which slice this parallel worker should process.

    Returns:
        Counter of word occurrences within the selected slice.
    """
    counts = Counter()
    for doc in grouped_documents[group_index]:
        counts.update(doc.split(' '))
    return counts
@shared_task
def join_step(word_counters):
    """Merge the per-group word counts into a single Counter.

    Args:
        word_counters: per-group results; they arrive as plain dicts after
            the serialization round trip, so each is rebuilt as a Counter
            before summing.

    Returns:
        Counter with the combined counts from every group.
    """
    return sum((Counter(counts) for counts in word_counters), Counter())
# Fork-join workflow: split the corpus, count each slice in parallel,
# then merge the partial counts.
#
# `group_index` is passed by keyword (the correction the article itself
# shows): a Celery chain prepends the previous step's return value as the
# first positional argument, so the keyword keeps the index from
# colliding with `grouped_documents`.
count_words = chain(
    setup_step.signature(),
    group([process_step.signature(group_index=i)
           for i in range(NUMBER_OF_GROUPS)]),
    join_step.signature(),
)

Putting it all together

# FIX: in the original, the import statement and the workflow definition
# were fused onto one line ("from celery import chain, group,count_words
# = chain(...)"), which is a syntax error.
from celery import chain, group

count_words = chain(
    setup_step.signature(),
    group([process_step.signature(group_index=i)
           for i in range(NUMBER_OF_GROUPS)]),
    join_step.signature(),
)

# End-to-end check on a tiny corpus.
documents = ['hi there you', 'there you are', 'please say hi']
result = count_words(documents).get()
expected_result = {'there': 2, 'please': 1, 'say': 1, 'hi': 2, 'are': 1, 'you': 2}
assert result == expected_result

Potential High Memory Usage

The manner in which Celery sends data to a group can have potentially large memory usage. This example of summing a list of numbers illustrates how the entirety of the input data is duplicated for each worker used.

Creating a Simple API

# FIX: the original fused three source lines into one
# ("...import ForkJoinTaskclass WordCounterTask(ForkJoinTask):@classmethod"),
# which is a syntax error; the import, class header, and decorator are
# separated here.
from demoForkJoinTask import ForkJoinTask


class WordCounterTask(ForkJoinTask):
    """Fork-join API task that counts words in a list of documents."""

    @classmethod
    def get_input_schema(cls, *args, **kwargs):
        """Declare the task's validated inputs.

        'documents' (list) is required; 'delimiter' (string) defaults to
        a single space.
        """
        return {'documents': {'type': 'list'},
                'delimiter': {'type': 'string', 'default': ' '}}
@WordCounterTask.setup
def demo_setup(documents):
    """Fan-out step: wrap each document as one unit of work.

    Returns a list of kwargs dicts, one per parallel process call.
    (The original body was flattened to column 0 by extraction, which is
    a syntax error; indentation restored.)
    """
    return [{'document': document} for document in documents]
@WordCounterTask.process
def demo_count(document, delimiter=' '):
    """Process step: count the words of one document, split on *delimiter*.

    (The original body was flattened to column 0 by extraction, which is
    a syntax error; indentation restored.)
    """
    return Counter(document.split(delimiter))
@WordCounterTask.join
def demo_join(word_counters):
    """Join step: merge the per-document counts into one total.

    Returns a dict with a single 'word_count' key holding the combined
    Counter. (The original body was flattened to column 0 by extraction,
    which is a syntax error; indentation restored.)
    """
    # Serialization turns Counters into plain dicts; rebuild before summing.
    word_counters = [Counter(counter) for counter in word_counters]
    total_counts = sum(word_counters, Counter())
    return {'word_count': total_counts}
# With the default delimiter (' ') each document splits into words.
documents = ['hi there you', 'there you are', 'please say hi']
args = {'documents': documents}
result = WordCounterTask.signature(args)().get()
word_count = result['word_count']
expected_result = {'there': 2, 'please': 1, 'say': 1, 'hi': 2, 'are': 1, 'you': 2}
assert word_count == expected_result
# Comma-delimited documents: without overriding the delimiter, each whole
# document is treated as a single "word".
documents = ['hi,there,you', 'there,you,are', 'please,say,hi']
result = WordCounterTask.signature({'documents': documents})().get()
unparsed_count = {'please,say,hi': 1, 'hi,there,you': 1, 'there,you,are': 1}
# Passing delimiter=',' restores per-word counting.
args = {'documents': documents, 'delimiter': ','}
result = WordCounterTask.signature(args)().get()
expected_count = {'there': 2, 'please': 1, 'say': 1, 'hi': 2, 'are': 1, 'you': 2}
# NOTE(review): the article presumably asserted result['word_count'] against
# unparsed_count and expected_count here; those assertions appear to have
# been lost in extraction — confirm against the original post.

Conclusion

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade