Dask: Parallelize Everything

Speed up your big data pipeline in 10 minutes

Haoran Chen
SFU Professional Computer Science
12 min read · Feb 4, 2020


By Nan Xu, Weijia Li, Haoran Chen

This blog is written and maintained by students in the Professional Master’s Program in the School of Computing Science at Simon Fraser University as part of their course credit. To learn more about this unique program, please visit sfu.ca/computing/pmp.

Sledding in parallel

Dask is a one-stop shop for general big data processing. Whether you are a Python developer looking to speed up an existing codebase, a data scientist aiming to extract insight from complex data, or a microbiologist who wants to analyze terabytes of images, Dask has you covered.

Built from the ground up in Python, Dask is truly one of a kind. Since it is co-developed with the Pandas, scikit-learn, and Jupyter teams, it offers many things its competitor PySpark does not. With Dask, Python developers no longer need to read complicated Java error messages, constantly switch between different syntaxes, or rewrite an entire codebase to benefit from distributed computing.

Dask also simplifies the big data workflow. Its excellent single-machine performance speeds up the prototyping stage and leads to faster model deployment. For anyone with experience in Pandas, NumPy, or SciPy, parallelizing an existing workflow with Dask is painless and requires only small changes. Dask provides the easiest way to deploy not only statistical data analysis but also machine learning and image processing pipelines on clusters.

Dask: Parallelized Python Programming

What is Dask?

A flexible library for parallel computing in Python.

Dask is a Python library that leverages task scheduling for computational problems. It provides the most widely used data structures inherited from Pandas and NumPy, as well as basic parallel computing interfaces built on its own task scheduling system, to make large-scale data computation possible.

Why choose Dask?

Code in Python, compute in parallel.

As a data scientist or machine learning engineer, you might face several challenges during your project:

  1. The dataset is extremely large, causing your computer to run out of memory.
  2. You expect to switch between a cluster and your home workstation.
  3. You have always dreamed of multi-process or multi-threaded computation.
  4. Other frameworks are available, but their APIs differ from what you regularly use.
  5. You need to understand the order in which tasks are computed.
  6. You want Python for everything in your project.

Dask solves ALL OF THESE for you!

When using Dask, you can keep all your familiar Python libraries and toolkits and run the computation in parallel. Dask customizes some of your most-used data structures to fit the requirements of large datasets, and computes and processes everything in a scheduled order. Whether you are using a laptop or a cluster of 1,000 CPUs, Dask works with the exact same strategy, offering high accuracy and robustness.

Lazy evaluation, a strategy that delays each computation until its result is actually needed, is used to schedule and optimize all the tasks in the program before the final result is produced.

The entire task graph can be visualized with one line of code, which helps you figure out where to optimize your process from start to finish.

How to use Dask?

Do whatever you do with Python.

Dask can be installed with either conda (conda install dask) or pip (pip install "dask[complete]").

  • The two most important functions in Dask

To understand and run Dask code, the first two functions you need to know are .visualize() and .compute().

.visualize() renders the task graph: a graph of Python functions and the relationships between them. Based on these dependencies, the task scheduler in Dask determines how to run functions in parallel. The parameter rankdir="LR" is helpful if you want to view the graph from left to right.

Dask uses a lazy evaluation strategy, so the program only computes results after .compute() is called. To avoid calling .compute() once per collection, the top-level dask.compute() function takes multiple collections and returns multiple results at once.
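Here is a minimal sketch of both functions in action. The inc() and add() functions are toys written for this illustration, not part of the Dask API:

    import dask

    @dask.delayed
    def inc(x):
        return x + 1

    @dask.delayed
    def add(x, y):
        return x + y

    a = inc(1)
    b = inc(2)
    c = add(a, b)

    c.visualize(rankdir="LR")  # draw the task graph from left to right
    print(c.compute())         # 5

    # dask.compute() evaluates several collections in one pass
    print(dask.compute(a, b))  # (2, 3)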

  • High-Level Collections
  1. Array

dask.array splits a large array into small blocks, each of which is a NumPy ndarray.

As a quick example, let’s create a 2D array of 100,000 × 100,000 numbers, split into 10,000 chunks of size 1,000 × 1,000.
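A sketch of that example, using da.ones as a stand-in for real data:

    import dask.array as da

    # 100,000 x 100,000 array of ones, split into 1,000 x 1,000 chunks
    x = da.ones((100000, 100000), chunks=(1000, 1000))
    print(x)  # 10,000 chunks in total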

The huge array is split into small chunks to minimize RAM usage

The original array is about 80 GB! It could be extremely difficult for a plain NumPy ndarray to handle this on most personal computers. But as we can see, each chunk is only 8 MB, which is much smaller and easier to process.

Most functions you would call on a NumPy array are supported here, for example:
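A small sketch, reusing the array x created above:

    # Familiar NumPy-style operations, evaluated lazily
    y = (x + x.T).mean(axis=0)
    print(y[:10].compute())  # compute only the first ten values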

2. Dataframe

dask.dataframe is implemented on top of pandas.DataFrame: it combines a number of Pandas dataframes, partitioned along the index, into one huge dataframe.

A Dask Dataframe containing several Pandas Dataframes ordered by date

The functions of dask.dataframe are a subset of those in Pandas:
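A short sketch, assuming hypothetical CSV files named data-*.csv that contain name and amount columns:

    import dask.dataframe as dd

    # Glob patterns load many files into one dataframe
    df = dd.read_csv("data-*.csv")

    # The same syntax you would use in Pandas
    print(df.groupby("name").amount.mean().compute())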

3. Bag

dask.bag is a parallel version of the plain Python list, designed for simple parallel computing on unstructured or semi-structured data, such as text files and JSON objects.

You can chain function calls just as you would with pyspark.RDD or pytoolz:
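A minimal sketch of this functional style:

    import dask.bag as db

    b = db.from_sequence(range(10))

    # Keep the even numbers, square them, and add them up
    print(b.filter(lambda x: x % 2 == 0)
           .map(lambda x: x ** 2)
           .sum()
           .compute())  # 120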

  • Low-Level Interface

Sometimes you may want to parallelize your own algorithm over many small tasks, but Array, DataFrame, and Bag are not a good fit, or you want to construct some of the functions yourself. That is the time to use dask.delayed(). Parallel programming with it is as simple as calling dask.delayed(func)(parameters).

dask.delayed() works pretty well with loops, for example:
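Here is a sketch of such a loop. Passing pure=True is an explicit choice in this sketch: it tells Dask the functions are deterministic, so identical calls hash to the same task:

    import dask

    def inc(x):
        return x + 1

    def add(x, y):
        return x + y

    results = []
    for i in range(3):
        a = dask.delayed(inc, pure=True)(1)  # identical in every iteration
        b = dask.delayed(inc, pure=True)(i)
        results.append(dask.delayed(add, pure=True)(a, b))

    total = dask.delayed(sum)(results)
    total.visualize()       # inc(1) shows up as a single task
    print(total.compute())  # 12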

Task Graph of a scheduled loop

From the task graph above, you may notice that inc(1) appears only once. Because a = inc(1) is identical in every iteration, the task scheduler merges the repeated call into a single task whose result is shared across iterations.

Dask-ML: Scalable Machine Learning

What is Dask-ML?

A fantastic library for parallel and distributed machine learning in Python.

Dask-ML aims to enable machine learning on larger datasets and larger problems — in other words, scalable machine learning. It is built on top of Dask and can be thought of as a parallel implementation of scikit-learn.

Why choose Dask-ML?

It provides scalable machine learning.

You may wonder why we need Dask-ML when scikit-learn is already available. Well, that’s a good question!

Any machine learning project is likely to suffer from either of the following problems:

  1. Large Datasets: data is much larger than your small computer can handle.
  2. Large Models: data fits well, but training models and doing hyperparameter tuning takes several days, longer than you can wait.

Then, how do you deal with such scaling problems? Well, it depends on which problem you’re facing.

Scaling Pains

a. For in-memory problems: just use scikit-learn or any other machine learning library you like.

Note: scikit-learn is a great library for doing in-memory machine learning on models that are not too big and data that fits in RAM. But it does not scale well.

b. For large models: use distributed scikit-learn, a way of taking your favourite scikit-learn model and training it on a cluster instead of a single machine. Check dask_ml.joblib (a sketch follows the note below).

Note: scikit-learn already enables parallel computing on a single machine with Joblib. Dask scales this parallelism out to a cluster of machines.
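Here is a sketch of this approach using the Dask backend for Joblib; the synthetic dataset and choice of model are just for illustration:

    import joblib
    from dask.distributed import Client
    from sklearn.datasets import make_classification
    from sklearn.ensemble import RandomForestClassifier

    client = Client()  # start or connect to a Dask cluster

    X, y = make_classification(n_samples=1000, random_state=0)
    clf = RandomForestClassifier(n_estimators=200, n_jobs=-1)

    # Joblib tasks (batches of trees) are shipped to Dask workers
    with joblib.parallel_backend("dask"):
        clf.fit(X, y)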

c. For large datasets: use Dask-ML built-in estimators.

Note: Dask-ML reimplements machine learning algorithms that can be written as NumPy algorithms, replacing NumPy arrays with Dask arrays to make them scalable. These handle larger-than-memory datasets well. Take a look at linear models and clustering.

Oh, you may wonder how Dask-ML deals with XGBoost and TensorFlow. Well, Dask-ML does not reimplement these, as they already have their own distributed solutions that work well. If you are interested in the details, check Dask-ML + XGBoost and Dask-ML + TensorFlow.

How to use Dask-ML?

Similar to the way of using scikit-learn.

Dask-ML can be installed either with conda (conda install -c conda-forge dask-ml) or with pip (pip install dask-ml).

Dask-ML provides a wide range of functionality for solving problems where scikit-learn would normally be used, covering everything from data preprocessing and cross-validation to hyperparameter search and models/estimators.

If you are familiar with scikit-learn, you will feel at home with Dask-ML! Hence, we are not going to guide you step by step here.

The example below sketches how to use Dask-ML’s built-in logistic regression estimator to make predictions. Try it with your own dataset and have fun!
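A sketch using a synthetic dataset from dask_ml.datasets; swap in your own Dask arrays:

    from dask_ml.datasets import make_classification
    from dask_ml.linear_model import LogisticRegression

    # X and y are Dask arrays split into 1,000-row chunks
    X, y = make_classification(n_samples=10000, chunks=1000)

    lr = LogisticRegression()
    lr.fit(X, y)

    predictions = lr.predict(X)   # a lazy Dask array
    print(predictions.compute())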

A shortlist of common APIs, and much more, can be found in the Dask-ML documentation [3].

Dask-Image: Distributed Image Processing

What is Dask-Image?

The only Python-native library for distributed image processing.

Dask-Image is developed with one specific goal in mind: simplifying the workflow for processing large-scale image data. Built on top of Dask, Dask-Image integrates SciPy’s image processing routines with Dask’s scalable parallel computing capability, creating an easy-to-use distributed image processing library for everyone. It is truly the first of its kind.

Why choose Dask-Image?

Images have grown bigger, so use your cluster.

For many scientific domains, such as geospatial imaging, astronomy, and neuroscience, image sizes are already in the terabyte range. This introduces many new challenges to the traditional image processing workflow:

  1. Does not fit in memory: we need to write additional code to process the image chunk by chunk.
  2. Not parallel: not everyone knows how to write low-level code to parallelize the tasks.
  3. Not inspectable: since chunks are processed in sequence, we cannot inspect intermediate results for the whole image.
  4. Computation overhead: functions like the Gaussian filter require knowledge of neighbouring pixels, so chunks need to overlap with each other.

If you want to maintain an existing workflow without losing any performance, Dask-Image can help. Thanks to Dask’s distributed computing capability, researchers can now get their results faster without worrying about writing kernel code. Dask-Image provides a familiar, SciPy-like API that lets you run your image processing pipeline on a cluster. It is also worth noting that Dask-Image can help with trivial parallel problems, such as batch processing of smaller images.

How to use Dask-Image?

Almost the same as SciPy.

Dask-Image can be installed with either conda (conda install -c conda-forge dask-image) or pip (pip install dask-image).

Suppose we want to analyze a large image with Dask-Image. As you would imagine, the first step is loading the image, but can you guess the name of the function for loading images in Dask-Image?

It is .imread(), the same name used in many other Python libraries.

You get the feeling that Dask really wants to simplify things, right?

Here we assume the image is split into chunks, a common practice for storing images that cannot fit in memory. The chunks are stored in a folder called “example-tiles” and share a common prefix of “image.”
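A sketch of the loading step (the exact tile filenames are an assumption):

    import dask_image.imread

    # One chunk per file matched by the glob pattern
    tiled = dask_image.imread.imread("example-tiles/image-*.png")
    print(tiled)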

Image chunks loaded into Dask-Image

Dask-Image works as expected. Now let’s get down to business.

Most image processing pipelines involve converting the colour space from RGB to grayscale. We will do the same here by applying a custom function to these chunks.
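A sketch of such a conversion, where grayscale() is our own custom function using the standard luminance weights:

    def grayscale(rgb):
        # Weighted sum of the R, G, and B channels; runs lazily,
        # chunk by chunk, on the Dask array
        return ((rgb[..., 0] * 0.2125) +
                (rgb[..., 1] * 0.7154) +
                (rgb[..., 2] * 0.0721))

    grayscaled = grayscale(tiled)
    grayscaled.visualize()  # nine independent groups, one per chunk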

Task graph of a trivial parallel problem

Look at the task graph above. We have nine perfectly separated groups, which means colour space conversion is just a trivial parallel problem.

Use the following code to visualize the resulting images.
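A sketch, assuming the nine converted tiles are stacked along the array’s first axis:

    import matplotlib.pyplot as plt

    fig, axes = plt.subplots(nrows=3, ncols=3)
    for i, ax in enumerate(axes.ravel()):
        ax.imshow(grayscaled[i], cmap="gray")  # triggers computation
        ax.axis("off")
    plt.show()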

Image chunks individually converted to grayscale

The results above look perfect.

Let’s try something different: we are going to show you how to merge the chunks back into one image. Since image chunks are just arrays, we can use one of the many ways Dask offers. This time we use the .block() function.

We arrange these image chunks according to their spatial location before joining them, because the functions we want to use next must know how to treat the edges of these chunks.
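A sketch, where the 3 × 3 layout reflects our assumption about where each tile belongs:

    import dask.array as da

    combined = da.block([
        [grayscaled[0], grayscaled[1], grayscaled[2]],
        [grayscaled[3], grayscaled[4], grayscaled[5]],
        [grayscaled[6], grayscaled[7], grayscaled[8]],
    ])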

Now we can try to use the built-in Gaussian filter to smooth the image.
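A sketch (the sigma value is an arbitrary choice):

    import dask_image.ndfilters

    smoothed = dask_image.ndfilters.gaussian_filter(combined, sigma=1)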

Resulting Image after applying Gaussian filter

Here comes the interesting part. Although we have already combined these chunks into one image, .gaussian_filter() still knows that it came from multiple chunks and distributes the computation tasks to workers in a reasonable way. In fact, this is exactly what Dask-Image is designed to do. In situations like this, Dask-Image will do its best to parallelize the computation to improve performance.

Let’s verify our theory by looking at its task graph again.

Task graph of a non-trivial parallel problem

You might notice nine groups of vertically arranged nodes in the diagram above. We have nine chunks. What a coincidence! Each group represents the processing that one image chunk goes through.

Remember the 2D list we passed to .block()? If you look closely enough, you can find a number of lines connecting the vertical groups together. This means .gaussian_filter() needs information from neighbouring pixels to perform its calculation.

Glad we told Dask-Image how to arrange our chunks, right?

Let’s get the image ready for quantitative analysis by segmenting it with an ad-hoc threshold function. We can then give each segment a unique label with the .label() function.
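A sketch of both steps; the 0.75 factor in the threshold is an arbitrary choice:

    import dask_image.ndmeasure

    # Ad-hoc threshold: keep pixels brighter than 75% of the mean
    thresholded = smoothed > 0.75 * smoothed.mean()

    # Give every connected region its own integer label
    labeled, num_labels = dask_image.ndmeasure.label(thresholded)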

Resulting images after threshold and label function

Dask-Image offers many functions in the ndmeasure package that are useful for quantitative analysis.

Let’s try to get the mean and standard deviation for each labelled area using built-in functions.
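A sketch using the built-in mean and standard deviation functions:

    # Label 0 is the background, so measure labels 1..num_labels;
    # int(num_labels) computes the scalar
    index = list(range(1, int(num_labels) + 1))

    mean_val = dask_image.ndmeasure.mean(smoothed, labeled, index)
    std_val = dask_image.ndmeasure.standard_deviation(smoothed, labeled, index)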

We have reached the end of our Dask-Image walkthrough.

The final results of our analysis can be stored in a pandas DataFrame.
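A sketch collecting the measurements computed above:

    import pandas as pd

    stats = pd.DataFrame({
        "label": index,
        "mean": mean_val.compute(),
        "standard_deviation": std_val.compute(),
    })
    print(stats)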

Final analysis results stored in Pandas DataFrame

Next steps

First released in 2015, Dask has grown into one of the best parallel computing frameworks for Python. Looking ahead, we expect to see even more Python libraries and toolkits supporting Dask.

References

[1] Dask Documentation, https://docs.dask.org/en/latest/index.html

[2] Dask Examples, https://examples.dask.org

[3] Dask-ML, https://ml.dask.org

[4] Dask-Image, http://image.dask.org/en/latest/
