How to efficiently parallelize Dask Dataframe computation on a Single Machine

Yash Sanghvi
Published in Analytics Vidhya
8 min read, Aug 23, 2020

Threads, Processes, Cluster — What to choose when

In the previous story, we took a deep dive into dask dataframes. We saw that they are lazy and perform computations only when forced to, using the .compute() method. Otherwise, they just generate the task graph of the computation to be performed. However, does .compute() guarantee that the task is parallelized in the most efficient way? Not always. In this post, we will look at the different schedulers that can be employed to parallelize code execution and determine which scheduler to use when.

Table of Contents:

  1. Machine Specifications
  2. Types of Schedulers
    - Processes
    - Threads
    - Dask Distributed (Single Machine)
  3. The Spoilsport — Python’s Global Interpreter Lock
  4. Example — Delays and Computations
  5. Visualizing the difference between threads and processes using Task Manager
  6. The Bottom Line
  7. References

Machine Specifications:

The machine specifications are as follows:

  • RAM: 12 GB
  • Number of Cores: 4
  • Processor: Intel Core i7-8565U
  • Processor speed: 1.8 GHz (up to 4.6 GHz)

Each core supports two hardware threads, so up to 8 threads can run at the same time.

Types of Schedulers:

The official dask documentation is an excellent resource for this section. Specifically for a single machine, you have the following options available:

Processes:

Processes are independent programs, each of which can in turn run its own operations on multiple threads. They are more resource-hungry than threads, and creating and terminating them takes longer. They do not share memory with one another, so any data they exchange has to be copied between them. As per the dask documentation, when parallelizing tasks using processes,

Every task and all of its dependencies are shipped to a local process, executed, and then their result is shipped back to the main process.

Evidently, the processes have higher task overheads than threads. One important thing to note is that a process is isolated from other processes. It has its own memory space and the OS makes sure that it remains isolated.

Threads:

Threads are what allow a process to execute its tasks in parallel. They are essentially segments of a process. They are less resource-hungry and can be created and terminated quickly. Threads of the same process share data, memory, and resources. They have very little task overhead (~50 µs per task). This means that if a task takes x seconds to execute, running it on a thread will take roughly x seconds + 50 µs.
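To make the "threads share memory" point concrete, here is a minimal sketch using only Python's standard library (not dask). The helper name record_length and the sample words are made up for illustration. Every thread writes into the same dictionary, which is something separate processes could not do without copying data between them.

```python
import threading

results = {}              # one dict, visible to every thread in this process
lock = threading.Lock()   # guards concurrent writes to the shared dict

def record_length(text):
    """Store len(text) in the shared dict (hypothetical helper)."""
    with lock:
        results[text] = len(text)

words = ["dask", "thread", "process"]
threads = [threading.Thread(target=record_length, args=(w,)) for w in words]
for t in threads:
    t.start()
for t in threads:
    t.join()              # wait until every thread has written its entry

print(results)
```

All three entries end up in the single results dictionary, even though each was written by a different thread.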

Dask Distributed (Single Machine):

You can consider this as a clone of dask distributed running on a cluster of machines. It gives you more flexibility: you can set the number of workers (processes), define the number of threads per process, and so on. It also provides a diagnostic dashboard to view the progress of the computation, and an asynchronous API, Futures. This is helpful because you can then perform the computation in the background in a non-blocking mode. You can submit a computation to the cluster and it will return a Future. The cluster will then perform the computation in the background and you can carry on with your other computations. Whenever required, you can retrieve the result of the cluster computation using the .result() method on the Future. If the result is ready, it is returned immediately; otherwise, the call waits until it is. To know more about how to create a cluster on your local machine, you can go through this short tutorial.
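The Futures workflow described above can be sketched roughly as follows. This assumes the dask.distributed package is installed. The function slow_square and the helper run_in_background are hypothetical names, and the cluster settings (one worker, two threads, no dashboard) are chosen only to keep the sketch light, not as recommended values.

```python
import time

from dask.distributed import Client

def slow_square(x):
    """Stand-in for an expensive computation (hypothetical helper)."""
    time.sleep(0.1)
    return x * x

def run_in_background(value):
    # processes=False keeps the worker inside this process as threads, which
    # makes the sketch cheap to start; dashboard_address=None turns the
    # diagnostic dashboard off.
    client = Client(processes=False, n_workers=1, threads_per_worker=2,
                    dashboard_address=None)
    try:
        future = client.submit(slow_square, value)  # returns a Future right away
        other_work = sum(range(1000))               # carry on with other work
        return future.result(), other_work          # waits until the result is ready
    finally:
        client.close()

if __name__ == "__main__":
    print(run_in_background(7))
```

For a real workload you would typically keep the dashboard enabled and let the client run its workers as separate processes.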

So while dask distributed does provide some useful features, at the core of the computations lie processes and threads. Thus, how fast your computation will run will depend on how it is organized into processes and threads. So let’s focus on these two.

Reading the above descriptions may have convinced you that threads are superior to processes. They have lower overheads, consume fewer resources, and can even share data. Then why are we even considering processes? Because of the limitations imposed by Python’s Global Interpreter Lock (GIL).

The Spoilsport — Python’s Global Interpreter Lock:

In layman’s terms, Python’s GIL ensures that only one thread of a process executes Python instructions at any given time. Suppose a process has 5 threads, each of which wants to, say, find the lengths of different strings. At any given time, only one thread will be able to perform that operation while the others wait for their turn. Essentially, that kills parallelism using threads if you are performing a pure-Python task. If you want to learn more about the GIL, especially why it exists in the first place, you can refer to the several results that show up with a quick Google search. A detailed study of Python’s GIL is beyond the scope of this post.
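A rough way to see the GIL at work, using only the standard library: run the same pure-Python loop four times serially and then on four threads, and compare the timings. The function names and loop size below are arbitrary choices for illustration, and the exact timings will vary by machine, so none are quoted here.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def count_down(n):
    """Pure-Python loop: the running thread holds the GIL while it executes."""
    while n > 0:
        n -= 1
    return n

def timed(fn):
    """Return (result, elapsed seconds) for a zero-argument callable."""
    start = time.perf_counter()
    out = fn()
    return out, time.perf_counter() - start

N, JOBS = 500_000, 4

# Four runs, one after another.
serial, t_serial = timed(lambda: [count_down(N) for _ in range(JOBS)])

# The same four runs, spread over four threads.
with ThreadPoolExecutor(max_workers=JOBS) as pool:
    threaded, t_threaded = timed(lambda: list(pool.map(count_down, [N] * JOBS)))

print(f"serial:   {t_serial:.2f} s")
print(f"threaded: {t_threaded:.2f} s")
```

On a standard (GIL-enabled) CPython build, the threaded run is typically no faster than the serial one, because the threads take turns rather than running side by side.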

Dask’s documentation states that we should use threads to parallelize operations only when our tasks are dominated by non-Python code. However, if you just call .compute() on a dask dataframe, it will by default use threads to parallelize the execution. To use processes, you need to specify the scheduler as an argument, like .compute(scheduler='processes').

Let us look at an example to understand the difference between threads and processes in more detail.

Example — Delays and Computations:

We will use a short version of the dataframe used in the previous post. We have taken the first 50,000 rows of that dataframe and stored them in a new CSV called ‘lat_lon_short.csv’. Let us load that CSV into a dask dataframe, set the index, and partition it.

import dask.dataframe as dd

dfdask = dd.read_csv('data/lat_lon_short.csv')
dfdask = dfdask.set_index('serial_no', sorted=True)
dfdask = dfdask.repartition(npartitions=8)

Now we will perform two rather weird operations on this dataframe. Weird because you won’t generally perform such operations on a real dataframe. However, these operations help illustrate the point.

Operation 1:

dfdask['test1'] = dfdask.apply(lambda row: str(row.latitude) + '-' + str(test_fun()), axis=1, meta=('test1', 'object'))

Here we are applying a lambda function on the dataframe, which converts the latitude into a string and appends the output of the test_fun function to that string, and stores the result in a new column. The test_fun is defined as follows:

import time

def test_fun():
    time.sleep(0.01)  # 10 ms delay
    return 5

So the test_fun returns a value of 5 after a delay of 10 ms.

Operation 2:

dfdask['test2'] = dfdask.apply(lambda row: len(row.test1), axis=1, meta=('test2', 'int64'))

This operation just outputs the length of the string created in the previous operation, and stores it in a new column.

Now let us look at the results:

Results:

[Table: time taken (in s) for .compute() when only operation 1 is performed]

[Table: time taken (in s) for .compute() when both operations are performed]

Inferences:

There is a lot to unpack here. Let’s go over the inferences one by one.

  • The time with processes was lower when performing both operations, but it was higher when performing just operation 1.

This observation is the key to understanding the difference between processes and threads. A delay introduced by time.sleep() does not keep the interpreter busy: the sleeping thread releases the GIL and stays idle for the duration of the delay, so other threads can execute in that period. Operation 1 is dominated by this delay, so with operation 1 alone, threads can run in parallel, just like processes. The only differentiating factor is the overhead, which is higher for processes than for threads. Hence, with only operation 1, threads are faster than processes.

Operation 2, however, consists purely of Python instructions. Python’s GIL therefore ensures that only one thread executes them at a time, so threads effectively perform operation 2 serially. Processes, on the other hand, are independent, each with its own interpreter and its own GIL, so they perform operation 2 in parallel and finish it much faster. When both operations are performed, the time saved by processes during operation 2 more than compensates for their extra overhead, and processes end up taking less time than threads.

  • The time, as expected, did not change on increasing the number of partitions beyond 8.

As explained in the Machine Specifications section, the machine has 4 cores with two hardware threads each, and therefore a maximum of 8 threads/processes can run in parallel.

  • The time difference between threads and processes is nearly constant (3–4 seconds) when only operation 1 is performed.

Once again, since the only difference between threads and processes when performing operation 1 comes from overheads, it is independent of the number of partitions. When there is 1 partition, only one process is created. When there are 8 partitions, 8 processes are created, but because all 8 run in parallel, the overhead is equal to that of 1 partition. When there are more than 8 partitions, still only 8 processes are created because of hardware limitations, and each process handles multiple partitions. That’s what the time difference suggests.
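The first inference above, that sleeping threads do not block one another, can be checked with the standard library alone. This is only a sketch: the 0.2 s delay and the four jobs are arbitrary values, and nap and elapsed are hypothetical helper names.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def nap(seconds):
    """time.sleep releases the GIL, so other threads can run in the meantime."""
    time.sleep(seconds)
    return seconds

def elapsed(fn):
    """Return the wall-clock seconds taken by a zero-argument callable."""
    start = time.perf_counter()
    fn()
    return time.perf_counter() - start

DELAY, JOBS = 0.2, 4

# Four sleeps back to back: the delays add up.
t_serial = elapsed(lambda: [nap(DELAY) for _ in range(JOBS)])

# Four sleeps on four threads: the delays overlap.
with ThreadPoolExecutor(max_workers=JOBS) as pool:
    t_threaded = elapsed(lambda: list(pool.map(nap, [DELAY] * JOBS)))

print(f"serial:   {t_serial:.2f} s")
print(f"threaded: {t_threaded:.2f} s")
```

The threaded run should take roughly the time of a single delay rather than four, which is why threads keep up with processes when the work is mostly waiting.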

Visualizing the difference between threads and processes using Task Manager

If you are on a Windows machine, the Task Manager also illustrates the difference between threads and processes.

When you are running threads, the task manager will look something like this:

[Figure: Task Manager when running threads]

When running processes, the task manager will look something like this:

[Figure: Task Manager when running processes]

As you can see, only a single process exists when running threads, whereas 8 processes are created when running processes.

The Bottom Line:

The above example should have made the difference between threads and processes clear. Whenever you are performing pure-Python computations, it is best to use processes. If your computation is dominated by non-Python instructions, you can use threads for lower overheads. In particular, if your computation is dominated by numeric code using NumPy or Pandas, which release the GIL for much of their work, you may be better off using threads. For more on this, see “Numpy ‘releases’ GIL. What does that mean?”

You can use the dask.distributed client for sophisticated features like a monitoring dashboard and asynchronous futures. But I’ve found that to be slightly slower than processes when running python instructions.

References:

  1. Scheduling: https://docs.dask.org/en/latest/scheduling.html
  2. Dask.distributed on a single machine: https://docs.dask.org/en/latest/setup/single-distributed.html
