A Deep Dive into Dask Dataframes

Yash Sanghvi
Analytics Vidhya
Published in
9 min readAug 23, 2020

Pandas, but for big data

Computing the mean of a dask dataframe’s columns

Ever dealt with dataframes several GBs in size, perhaps exceeding the capacity of the RAM in your local machine? Pandas can’t handle dataframes larger than your RAM. Trust me, I’ve tried it. I clearly remember the aftermath: the background song playing in YouTube interrupted intermittently and then stopped altogether, my machine hung up, and finally, my screen turned black, all in quick succession. I also remember panicking at the thought of having inflicted some permanent damage on my laptop. Much to my relief, a power cycle brought my laptop back to life.

But then, how was I supposed to process this large dataframe? Usher in dask!

In this post, we will, as the title suggests, dive deep into dask dataframes. It should be evident by now that dask performs some magic that pandas can’t. We will essentially look at that magic, the ‘how’ behind the ‘wow’. The table of contents, listed below, will give you a fair idea of what to expect from this post.

  1. What is dask?
  2. Addressing the RAM limitations
  3. Anatomy of the Dask Dataframe
    - Dask’s Task Graph
    - Dask’s Laziness
  4. Partitions and Indexing
  5. Next Steps
  6. References and Useful Resources

What is dask?

In layperson terms, dask is one of the popular gateways to parallel computing in python. So if your machine has 4 cores, it can utilize all 4 of them simultaneously for computation. There are dask equivalents for many popular python libraries like numpy, pandas, scikit-learn, etc. In this post, we are interested in the pandas equivalent: dask dataframes.

Addressing the RAM limitations

Parallel computing is all good. But how does dask address the storage problem? Well, it breaks down the huge dataframe into several smaller pandas dataframes. But the cumulative size of the smaller dataframes is still larger than the RAM. So how does breaking down into smaller dataframes solve the problem? Well, dask stores each of these smaller pandas dataframes on your local storage (HDD/ SSD), and brings in data from the individual dataframes into the RAM as and when required. A typical PC has a local storage capacity of several 100 GBs while the RAM capacity is restricted to just a couple of GBs. Thus, dask allows you to process data much larger than your RAM capacity.

To give an example, say your dataframe contains a billion rows. Now if you want to add two columns to create a third column, pandas would first load that entire dataframe into the RAM and then try to perform the computation. Sadly, depending on the size of your RAM, it may overwhelm your system in the data loading stage itself. Dask will break down the dataframe into, say 100 chunks. It will then bring in 1 chunk into the RAM, perform the computation, and send it back to the disk. It will repeat this with the other 99 chunks. If you have 4 cores in your machine, and your RAM can handle data equal to the size of 4 chunks, all of them will work in parallel and the operation will be completed in 1/4th of the time. The best part: you need not worry about the number of cores involved or the capacity of your RAM. Dask will figure out everything in the background and not give you any burden.

Sounds good? So let’s begin our dive.

Anatomy of the Dask Dataframe

We will be using a CSV file containing more than 74 million location coordinates. The file measures about 2.5 GB in size. Depending on the capacity of the usable RAM in your machine, this may or may not overwhelm your system using pandas. I’ll be using a quad-core machine having 12 GB RAM. So pandas could have comfortably handled this dataframe on my machine. This will be useful when we benchmark pandas and dask in a later post and help us understand when to use dask and when to use pandas.

Let’s read the CSV:

import dask.dataframe as dd
df_dd = dd.read_csv('data/lat_lon.csv')

If you try to visualize the dask dataframe, you will get something like this:

As you can see, unlike pandas, here we just get the structure of the dataframe and not the actual dataframe. If you think about it, it makes sense. Dask has loaded the dataframe in several chunks, which are present on the disk, and not in the RAM. If it has to output the dataframe, it will first need to bring all of them into the RAM, stitch them together and then showcase the final dataframe to you. It won’t do that until you force it to do so using .compute() . More on that later.

The structure itself conveys a lot of information. For instance, it shows that the dataframe has been split into 41 chunks. It shows the columns in the dataframe and their data types. So far, so good. But hang on, what is the meaning of the last line, containing dask name and the number of tasks? What are these tasks? What is the significance of the name?

Dask’s Task Graph

We can get a lot of these answers by invoking the .visualize() method. Let’s try it out. Please note that you will need the Graphviz library for this method to work.

df_dd.visualize()

Not quite legible, right? Let’s reduce the number of partitions and try. To reduce the number of partitions, we will read the CSV again, this time specifying a block size of 600MB.

df_dd = dd.read_csv('data/lat_lon.csv',blocksize='600MB')

Now, we can see that the dataframe has been broken down into 5 chunks.

The df_dd.visualize() command’s output will now be more legible.

The above image represents a task graph in dask. Task graphs are dask’s way of representing parallel computations. The circles represent the tasks or functions and the squares represent the outputs/ results. As you can see, the process of creating the dask dataframe requires a total of 15 tasks, 3 tasks per partition. In general, the number of dask tasks will be a multiple of the number of partitions, unless we perform an aggregate computation, like max(). In the first step, it will read a block of 600 MB (it can be ≤ 600 MB for the last partition). It will then convert this block of bytes to a pandas dataframe in the next step. In the final step, it makes this dataframe ready for parallel computation using from_delayed.

Dask’s Laziness

The delayed function lies at the core of parallelization in dask. As the name suggests, it delays the output of a computation, and instead, returns a lazy dask graph. You will see the word lazy repeat again and again in the official dask documentation. Why lazy? As per the official dask documentation, the dask graphs are lazy because:

Rather than compute their results immediately, they record what we want to compute as a task into a graph that we’ll run later on parallel hardware.

Once dask has the entire task graph in front of it, it is much efficient to parallelize the computation.

Dask’s laziness will become more clear with the following example. Let us visualize the output of calling .max() on the dask dataframe.

df_dd.max().visualize()

As you can see, instead of computing the max value of each column, calling .max() just appended tasks to the original task graph. It added two steps. One is the computation of the max for each partition, and the other is the computation of the aggregate max.

The lazy dask graph just mimics a lazy engineer. When given a problem, a competent but lazy engineer will just create the solution graph in his/her mind and let it stay there. Only when forced to provide the results (say by a deadline) will the engineer perform the computations. To get the results from the dask graph, we have to force it to perform the computations. We do that by calling the .compute() method.

df_dd.max().compute()
>> serial_no 7.470911e+07
latitude 1.657000e+02
longitude 1.631770e+02
dtype: float64

The CSV used has dummy data. So don’t worry about latitude being greater than 90 degrees.

Before we go ahead, I’d just like to show you what would have happened had you chosen to invoke the repartition command instead of reading the CSV again with a specified block size.

If we go back to the old dask dataframe with 41 partitions, and apply the repartition command and then run .visualize(), the output will be something like this:

df_dd = dd.read_csv('data/lat_lon.csv')
df_dd = df_dd.repartition(npartitions=5)
df_dd.visualize()

We have essentially increased the number of tasks. Repartitioning should be used only when you have reduced the original dataframe to a fraction of its size and no longer need a lot of partitions. To reduce the number of partitions right at the beginning, it is best to mention the blocksize while reading data.

Partitions and Indexing

Let’s now look at the partitions of the dask dataframe. We can access the individual partitions with the .get_partition() method. Let’s call the first partition.

df_0 = df_dd.get_partition(0)

Let us look at the index range of this partition

df_0.index.min().compute()
>> 0
df_0.index.max().compute()
>> 17707567

Now, the rather surprising thing is that you will get approximately the same answer if you run these computations on the next partition.

df_1 = df_dd.get_partition(1)
df_1.index.min().compute()
>> 0
df_1.index.max().compute()
>> 17405554

What this means is that there is no common index spanning across the partitions. Each partition has its separate index. This can also be confirmed by accessing the dataframe divisions. The divisions contain the min value of each partition’s index and the max value of the last partition’s index.

df_dd.divisions
(None, None, None, None, None, None)

Here, since the partitions are independent and each has its separate index, dask doesn’t find a common index according to which the data is divided.

For a dataframe with no well-defined divisions, an operation like df_dd.loc[5:500] will make no sense. Each partition has indices from 5 to 500. If we have no column in the dataframe which can be used as an index, we simply can’t use index-based .loc.

Luckily, we have a serial number column which we can use as the index. We will have to inform the dask dataframe to use it as the index.

df_dd = df_dd.set_index('serial_no')

Please note that this is a costly operation, and shouldn’t be done again and again. It involves a lot of communication, shuffling, and exchange of data between the partitions. However, a one-time effort is worth it, if you need to perform frequent index-based slicing. After setting the index, if we check the divisions again, we will get the following output:

df_dd.divisions
>> (0, 17707568, 35113123, 52512497, 69929691, 74709113)

Note that serial number will no longer be a separate column.

df_dd.columns
>> Index(['latitude', 'longitude'], dtype='object')

Now, thanks to the indexing, each partition will have non-overlapping min and max indices.

df_1 = df_dd.get_partition(1)
df_1.index.min().compute()
>> 17707568
df_1.index.max().compute()
>> 35113122

Setting the index has some significant advantages if you need to slice by index. On the original dataframe, df_dd.loc[5:500].compute() takes about 42 seconds, and of course, the result is far from accurate, and the length of the result is n_partitions times the expected length. On the indexed dataframe, the same command takes just 4 seconds and returns the correct result.

Next Steps:

In this post, a pretty detailed examination of the dask dataframe was done. Now that you know how dask is different from pandas, you can move on to the next post, where we look at the parallelization of tasks using dask. While dask takes care of most of the parallelization itself, there are a few things you should know to fully take advantage of dask’s parallelization. See you in the next post (How to efficiently parallelize dask dataframe computations on a single machine), where we will discuss why calling .compute() may not be the most efficient way to calculate results.

References and Useful Resources:

Dask is very well documented. You can have a look at the following resources to dive deeper into dask:

  1. Dask Documentation: https://docs.dask.org/en/latest/
  2. Dask Tutorials: https://tutorial.dask.org/
  3. Dask Tutorial Notebooks: https://github.com/dask/dask-tutorial
  4. Dask Examples: https://examples.dask.org/

If you’re looking for a quick way to run Dask for free on GPU clusters, you can try Saturn Cloud, a free data science and machine learning platform for data scientists.

--

--