Using Django with Dask for Task Processing

Matt Nicolls
Moonvision
May 16, 2019

Full Source Code: https://github.com/MoonVision/django-dask-demo

Overview

Web servers need asynchronous task execution for longer-running computations. A common solution is a task queue such as Celery. As storing large amounts of data has become easier in recent years, data scientists need more and more computing power to process it, and Dask was introduced to help with this. From the docs, “Dask is a flexible library for parallel computing in Python.” Let’s look at how we can use Dask behind our Django REST Framework web server to process asynchronous tasks.

Dask Setup

Dask can run on a single machine or on a distributed system with thousands of cores. In a cluster, a scheduler coordinates the worker nodes that execute your tasks. This setup lets you supply a preload script that is run before the worker starts, and we will use that script to set up the Django environment on our workers. At the time of writing, you install Dask with the “dask” pip package and its distributed components with the “distributed” pip package. See the official docs for current information.
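To make the preload step concrete, here is a minimal sketch of what such a script could contain. It is not the demo repository's actual file. The settings path "demo.settings" is a hypothetical placeholder, and the sketch assumes Dask's preload hook named dask_setup, which is called with the worker when it starts.

```python
# daskworkerinit.py: a sketch of a worker preload script
import os

# Assumption: hypothetical settings path; use the project's real settings module.
SETTINGS_MODULE = "demo.settings"


def dask_setup(worker):
    """Preload hook that Dask calls once when the worker starts."""
    # The settings module must be known before Django is initialised.
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", SETTINGS_MODULE)

    import django  # imported lazily, after the environment variable is set

    # Loads settings and populates the app registry so tasks can use the ORM.
    django.setup()
```

With something like this in place, any task that runs on the worker can import models and query the database just as code inside the web server would.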

Setting Up the Cluster and Connecting to the Scheduler

With Dask installed, we now have the dask-scheduler and dask-worker commands. To start up our cluster, we first open a terminal and start a scheduler with dask-scheduler, and then start a worker in another terminal with:

dask-worker 127.0.0.1:8786 --preload daskworkerinit.py

We run the worker command from the /demo/ folder of the repository so that the relative path to the preload script, daskworkerinit.py, resolves. 127.0.0.1:8786 is the address of our local scheduler.

To connect to the scheduler from our Django app, we use a class called DaskManager (/demo/daskmanager/daskmanager.py) that keeps all of our connection information in one place. The class uses the singleton design pattern so that we have just one connection per server. It also has a callback function that is called when a task finishes. Our client needs to be kept alive until the task finishes for us to get this callback.
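The singleton-plus-callback idea can be sketched as follows. This is an illustration under stated assumptions, not the repository's DaskManager: a standard-library ThreadPoolExecutor stands in for the Dask client so the example runs without a cluster, and the names submit, _task_done, and results are hypothetical. Dask futures offer a similar add_done_callback method, so the shape should carry over.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, Optional, Tuple


class DaskManager:
    """Process-wide singleton that owns a single client connection."""

    _instance: Optional["DaskManager"] = None
    _lock = threading.Lock()

    def __new__(cls) -> "DaskManager":
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                # Assumption: stand-in for a Dask client connected to the
                # scheduler at 127.0.0.1:8786.
                instance._client = ThreadPoolExecutor(max_workers=2)
                instance.results: Dict[str, Tuple[str, Any]] = {}
                cls._instance = instance
        return cls._instance

    def submit(self, name: str, func: Callable[..., Any], *args: Any) -> Future:
        """Submit a task and register the completion callback."""
        future = self._client.submit(func, *args)
        future.add_done_callback(lambda done: self._task_done(name, done))
        return future

    def _task_done(self, name: str, future: Future) -> None:
        """Runs when the task finishes; records its outcome."""
        error = future.exception()
        if error is not None:
            self.results[name] = ("error", repr(error))
        else:
            self.results[name] = ("finished", future.result())
```

Because the callback fires only while the client that created the future is alive, keeping a single long-lived instance per server process is what makes the callback dependable.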

Tracking Tasks and Getting the Result

Before we create and submit our first Dask task, we need a way to see the result after it finishes. We use the DaskTask model (/demo/daskmanager/models.py) to store and track every task we create and submit to our cluster. We also set up a view and serializer for this model so that we can view the results through our API. The DaskManager creates the DaskTask object after submitting to the cluster. Dask returns a future with a unique key that we can use to look up the status later. For viewing tasks in real time, Dask also provides a Bokeh-based web view into the scheduler, which runs on port 8787 by default.
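The tracking idea can be illustrated without Django. The sketch below uses an in-memory store in place of the DaskTask model. The names TaskRecord and TaskStore and their fields are assumptions made for illustration, and the real model lives in the repository.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional


@dataclass
class TaskRecord:
    """Hypothetical stand-in for one row of the DaskTask model."""

    task_key: str  # the unique key of the future returned by Dask
    status: str = "pending"
    result: Optional[Any] = None


class TaskStore:
    """In-memory stand-in for the database table of tracked tasks."""

    def __init__(self) -> None:
        self._records: Dict[str, TaskRecord] = {}

    def create(self, task_key: str) -> TaskRecord:
        """Record a task right after it has been submitted."""
        record = TaskRecord(task_key=task_key)
        self._records[task_key] = record
        return record

    def finish(self, task_key: str, result: Any) -> None:
        """Store the result once the completion callback fires."""
        record = self._records[task_key]
        record.status = "finished"
        record.result = result

    def get(self, task_key: str) -> Dict[str, Any]:
        """Return a serializable view, similar to what an API could expose."""
        return asdict(self._records[task_key])
```

Keying the records on the future's key means the API only needs that one string to find a task again later.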

Submitting a Task to the Scheduler

Now that we can see the results of our tasks, let’s look at a small demo task to run on the cluster. The numbersexample app will be our demo. It has one model called Number, which just stores an integer value. In the view, we have one list route where we build our Dask graph and submit it to the cluster. Our sample graph gets all Numbers from the database, squares them, and then takes the sum of the squares.

First, create some Number objects in the Django shell:

from numbersexample.models import Number
[Number.objects.create(value=n) for n in range(500)]

Then we start a local server with python manage.py runserver and visit the endpoint http://localhost:8000/numbers/square_sum/. Our response looks something like:

{
  "url": "http://localhost:8000/tasks/sum-12f79623-0dd4-412f-b6de-f7d3a2f426ab/",
  "task_key": "sum-12f79623-0dd4-412f-b6de-f7d3a2f426ab",
  "status": "pending"
}

Unfortunately, Dask always seems to report pending even while tasks are running, but we now have a URL for the task we created and can check its status by visiting it. The task should already be finished, since it isn’t something that takes long, and the response will look like:

{
  "url": "http://localhost:8000/tasks/sum-12f79623-0dd4-412f-b6de-f7d3a2f426ab/",
  "task_key": "sum-12f79623-0dd4-412f-b6de-f7d3a2f426ab",
  "status": "finished",
  "result": 41541750
}

We can see the result of our task has been populated and saved.
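As a sanity check, the value can be reproduced without a cluster. The sketch below computes the sum of squares for the values 0 to 499 directly and with the closed form n(n+1)(2n+1)/6. The function names are illustrative and are not part of the demo app.

```python
def square_sum(values):
    """Square every value and add the squares up, as the demo graph does."""
    return sum(value * value for value in values)


def square_sum_closed_form(count):
    """Sum of squares of 0..count-1 via n(n+1)(2n+1)/6 with n = count - 1."""
    n = count - 1
    return n * (n + 1) * (2 * n + 1) // 6


# The shell snippet created Number objects with values 0 through 499.
direct = square_sum(range(500))
closed = square_sum_closed_form(500)
print(direct, closed)  # both print 41541750
```

Both approaches agree with the result field in the response above.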

Real World Graph

At Moonvision, we do image processing. Let’s look at how we make sprites on the platform.

Our graph for building this image looks like:

cropped_images = []
for entity_id in entity_ids:
    image_result = delayed(crop_image_to_entity, pure=True)(entity_id)
    cropped_images.append(image_result)
sprite_image = delayed(images_to_sprite)(cropped_images)
saved_image = delayed(cv2_to_image_bytes)(sprite_image)
return saved_image

This real-world example follows the same layout as the sample one: we first do a computation on each item in a list and then join the results. Here, our input is a list of IDs from our database and the output is a base64 byte string. Dask makes this joining and waiting on results easy.

Final Thoughts and Improvements

As mentioned, tasks always have the status pending until they finish. It would be nice if Dask returned a proper status and more detailed information. Also, the client connection needs to be held open while a task is running. Dask does have a fire-and-forget function for submitting to the cluster, but then we have no way to save the result. Dask also provides a way to write scheduler plugins that have access to tasks as they finish, but the scheduler is a bottleneck of our distributed system, so running save tasks there would not be advisable. In the future, we would like to forward results to an AMQP server, such as RabbitMQ, to save results and task updates. We could use these updates to save the correct status of each task and to have more information about how far along it is.

Overall, Dask allows us to write modular computer vision nodes that can be composed into processing graphs. We haven’t encountered any major roadblocks in this approach and plan on continuing it in the near future. We take advantage of some of the more advanced features of Dask in our cluster such as:

  • Using resources on our tasks to have GPU workers in our cluster and dedicated real time workers for
  • Replicating models into distributed memory for faster real time processing of tasks that require model loading
  • Spawning tasks within tasks

Check out https://app.moonvision.io to see it in action!
