Up the “Data Processing” Ante with Modin Pandas-Dask and Ray 📈

Anchal Jain
SFU Professional Computer Science
10 min read · Feb 4, 2020

The Team: Abhishek Sundar Raman, Amogh Kallihal, Anchal Jain, Gayatri Ganapathy

This blog is written and maintained by students in the Professional Master’s Program in the School of Computing Science at Simon Fraser University as part of their course credit. To learn more about this unique program, please visit here.

As Wikipedia puts it, “panel data” is an econometrics term for data sets that include observations over multiple time periods for the same individuals, and it is where Pandas gets its name. Yes, you guessed it right: we are talking about every data science professional’s essential tool for attacking data. Pandas is an open-source, BSD-licensed Python library that provides data structures for analyzing structured and semi-structured data.

Now you must be wondering: if things are working smoothly, why up the ante? The doubt is valid, so let’s see why. Pandas is very efficient on small datasets, from about 1 MB up to perhaps 1 GB. As the amount of data to process grows, it starts to become a little shaky. As an analogy, a “Mazda” is fine for driving around the city, but if you plan to drive on a racetrack, think of a “Ferrari”. Could you race the “Mazda”? Most certainly, but the “Ferrari” performs better under those conditions.

With that intent in mind, this blog post shows how state-of-the-art data science technologies such as Modin, Ray, and Dask can improve your “data processing” capabilities.

Why Modin?

Now, can you visualize your “Mazda” fitted with nitrous oxide boosters speeding past on the racetrack? Similarly, by changing just one line of code in your script, Pandas, with the help of Modin, can scale from around 1 GB to hundreds of GBs of data using the same number of cores in your machine.

Figure 1. Usage of cores by Pandas and Modin

Modin is an open-source library available on GitHub. It is a distributed, multiprocess DataFrame library that can make use of all the cores in a machine. Let’s say you have two cores in your machine. A Pandas DataFrame operation uses only one core at a time, whereas Modin distributes the work among all the available cores, thus using them efficiently for scheduling and computation.
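To make the idea of spreading one operation over several cores concrete, here is a minimal sketch that uses only the Python standard library. It is a toy illustration of partitioning work across processes, not Modin's actual implementation; the helper names split_into_partitions, partial_sum, and parallel_sum are made up for this example.

```python
import os
from concurrent.futures import ProcessPoolExecutor


def split_into_partitions(values, n_partitions):
    """Split a list into n_partitions contiguous chunks of near-equal size."""
    size, extra = divmod(len(values), n_partitions)
    partitions, start = [], 0
    for i in range(n_partitions):
        # The first `extra` partitions take one additional element.
        end = start + size + (1 if i < extra else 0)
        partitions.append(values[start:end])
        start = end
    return partitions


def partial_sum(chunk):
    """Work performed independently on a single partition."""
    return sum(chunk)


def parallel_sum(values, n_workers=None):
    """Sum `values` by handing one partition to each worker process."""
    n_workers = n_workers or os.cpu_count() or 1
    partitions = split_into_partitions(values, n_workers)
    with ProcessPoolExecutor(max_workers=n_workers) as pool:
        # Each partition is summed in its own process; results are combined here.
        return sum(pool.map(partial_sum, partitions))
```

When called from a script's `if __name__ == "__main__":` block, `parallel_sum(list(range(1_000_000)))` should return the same value as the built-in `sum`, with the partial sums computed in separate processes.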

Figure 2. Modin Architecture

As per the official documentation, “Modin is logically separated into different layers that represent the hierarchy of a typical Database Management System. Abstracting out each component allows us to individually optimize and swap out components without affecting the rest of the system.” This means Modin can integrate with the scheduler and compute engine of your choice without changing other components in the stack.

You don’t have to worry about how many cores or cluster nodes your system has or how to distribute the data. In fact, you can continue using your previous Pandas notebooks while experiencing a considerable speedup from Pandas on Ray/Dask, even on a single machine.

Here is how you can install Modin from PyPI:

pip install modin

Now you must be wondering which single line must be changed to get better performance. It is the import statement:

import modin.pandas as pd

This import tells your Python program to use Modin in place of Pandas, so that computations are distributed across all the cores in your system.

Team of Modin

Modin is like a manager who has two project leads that he works with to complete the projects/tasks at hand. He works with his two efficient teams led by Dask and Ray. Dask and Ray are both high-performance parallel and distributed frameworks that help Modin perform computations faster.

Let’s see how to install them first.

pip install "modin[ray]"   # Install Modin dependencies and Ray
pip install "modin[dask]"  # Install Modin dependencies and Dask
pip install "modin[all]"   # Install both of them

*** Note: At the time of writing, Ray did not support Windows, so on Windows the Dask engine was the practical choice.

Let’s understand how each of these teams works with an example.

Team Dask

Suppose one project requires heavy computation for the analysis of large datasets collected from varied sources, and the deadline is approaching. In this scenario, Dask is chosen. Dask is a perfect team leader. He breaks down large tasks and distributes them efficiently among his hardware team members. This means that Dask can routinely run on thousand-node clusters to process hundreds of terabytes of data efficiently. He ensures that none of his team members feels overwhelmed by the load. Dask avoids excess memory usage by performing computations with a low memory footprint: only the data necessary for a computation is read from disk, and it is cleaned up quickly once the computation is done. Thus, Dask can process a dataset larger than 100 GB even on relatively low-powered machines.
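The low-memory-footprint idea can be sketched with the standard library alone: read a bounded chunk, fold it into a running result, and discard it before reading the next one. This is only an illustration of chunked processing, not how Dask is implemented; the function name chunked_column_mean and the sample column loan_amount are hypothetical.

```python
import csv
import io


def chunked_column_mean(lines, column, chunk_size=1000):
    """Mean of a numeric CSV column, holding at most chunk_size values in memory."""
    reader = csv.DictReader(lines)
    total, count, chunk = 0.0, 0, []
    for row in reader:
        chunk.append(float(row[column]))
        if len(chunk) == chunk_size:
            # Fold the finished chunk into the running totals, then free it.
            total += sum(chunk)
            count += len(chunk)
            chunk.clear()
    # Account for the last, possibly partial, chunk.
    total += sum(chunk)
    count += len(chunk)
    return total / count if count else float("nan")


# Tiny in-memory stand-in for a large CSV file (hypothetical data).
sample = io.StringIO("loan_amount\n100\n200\n300\n400\n")
print(chunked_column_mean(sample, "loan_amount", chunk_size=3))  # 250.0
```

Passing an open file object instead of the in-memory sample would let the same function walk a file that does not fit in memory, since only one chunk of values is held at a time.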

Dask understands that his team members are bound to break down at some point in time and therefore has backup plans to handle such failures. The built-in resilience of the Dask engine to handle failures of the worker nodes gracefully makes it a scalable and elastic library capable of handling terabytes of data for computations.

Dask is also very particular about the security of his data; therefore he works in secure environments by enforcing encryption and authentication using TLS/SSL certificates.

Dask also exposes the low-level APIs of his task scheduler, providing an interface for building and executing advanced, custom computations.

Team Ray

Now suppose there is another project that involves machine learning activities like data processing, model training, hyper-parameter tuning, etc. Here comes our ML and AI expert, Ray.

Image credits: https://media.giphy.com/media/SL0ctZ9qJKJLa/giphy.gif

Ray is a domain expert in Artificial Intelligence and serves as a parallel and distributed execution framework with Python and Java APIs. He efficiently uses the computing environment for AI applications that require distributed training, distributed reinforcement learning, hyper-parameter tuning, and data processing.

Everything Ray’s team needs is stored in a common object store known as Plasma. Even if his team members (nodes) are not all in the same location, he ensures smooth execution of applications through communication and data transfer among them, and through resistance to failure. The stored objects are immutable and accessible across multiple processes and nodes. They are serialized, so they can easily be saved to persistent storage or streamed across a communication link. Overall, Ray is capable of handling the challenges of distributed AI and machine learning.
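The put-and-get pattern behind an immutable, serialized object store can be sketched in a few lines. The class below is a toy, single-process stand-in built on pickle; it is not Plasma's or Ray's API, and the name ToyObjectStore is invented for this example.

```python
import pickle
import uuid


class ToyObjectStore:
    """Single-process toy: objects are stored as serialized, read-only bytes."""

    def __init__(self):
        self._objects = {}

    def put(self, value):
        """Serialize value, store the bytes, and return an object ID."""
        object_id = uuid.uuid4().hex
        self._objects[object_id] = pickle.dumps(value)
        return object_id

    def get(self, object_id):
        """Return a fresh deserialized copy; the stored bytes never change."""
        return pickle.loads(self._objects[object_id])


store = ToyObjectStore()
object_id = store.put({"loan_ids": [1, 2, 3]})

local_copy = store.get(object_id)
local_copy["loan_ids"].append(4)   # mutates only the local copy

print(store.get(object_id))        # the stored object is unchanged
```

Because every reader works on its own deserialized copy, no reader can change what another one sees, which is the property that makes sharing objects across processes and nodes safe.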

Code Time!

Let’s get our hands dirty and see how Modin, with Dask and Ray as the underlying scheduler engines, affects Pandas operations.

All the code snippets shown below were executed on an Apple MacBook Pro with 16 GB of RAM and a 6-core 9th-generation Intel® processor.

For demonstration purposes, we used the kiva_loans.csv file from Kaggle, which is 170 MB in size.

Commonly Used Pandas Operations

read_csv()

Pandas

import pandas as pandas_pd
%time pandas_df = pandas_pd.read_csv("kiva_loans.csv")
....................................................................
CPU times: user 2.91 s, sys: 226 ms, total: 3.14 s
Wall time: 3.09 s

Modin with Dask

import os
os.environ["MODIN_ENGINE"] = "dask"  # Modin will use Dask; set before importing modin.pandas
from dask.distributed import Client
client = Client(memory_limit='8GB')  # start a local Dask cluster with an explicit memory limit
import modin.pandas as dask_pd
%time mdask_df = dask_pd.read_csv("kiva_loans.csv")
....................................................................
CPU times: user 604 ms, sys: 288 ms, total: 892 ms
Wall time: 1.74 s

The MODIN_ENGINE environment variable is set here to instruct Modin to use Dask as its scheduler engine. The client memory limit is explicitly set to 8 GB because we observed that the default could lead to “out of memory” problems in some cases.
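Since the engine is chosen through an environment variable, a small guard can catch typos before Modin is imported. The sketch below is a hypothetical helper (select_modin_engine is not part of Modin) and only accepts the two engines discussed in this post; it follows the same order as the snippet above, setting the variable before importing modin.pandas.

```python
import os

# Only the two engines covered in this post; other values are rejected here.
SUPPORTED_ENGINES = {"ray", "dask"}


def select_modin_engine(name):
    """Validate an engine name and export it as MODIN_ENGINE."""
    engine = name.strip().lower()
    if engine not in SUPPORTED_ENGINES:
        raise ValueError(
            f"unsupported engine {name!r}; expected one of {sorted(SUPPORTED_ENGINES)}"
        )
    os.environ["MODIN_ENGINE"] = engine
    return engine


select_modin_engine("Dask")
print(os.environ["MODIN_ENGINE"])  # dask

# After this point the import would follow, as in the snippet above:
# import modin.pandas as pd
```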

Modin with Ray

import os
os.environ["MODIN_ENGINE"] = "ray"  # Modin will use Ray; set before importing modin.pandas
import ray
ray.init(memory=5242880000)  # memory made available to Ray, in bytes (about 5 GB)
import modin.pandas as ray_pd
%time mray_df = ray_pd.read_csv("kiva_loans.csv")
....................................................................
CPU times: user 762 ms, sys: 473 ms, total: 1.24 s
Wall time: 2.42 s

The MODIN_ENGINE environment variable is set here to instruct Modin to use Ray as its scheduler engine. The memory argument is set to about 5 GB (5242880000 bytes) to tell Ray how much memory is available for scheduling, similar to the way it handles CPU and GPU resources.

*** Note: The above operations were executed on a standalone setup, and these numbers can vary with the hardware resources available to Modin.
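The byte count passed to ray.init is easier to read once converted. The short sketch below only does the unit arithmetic for the value 5242880000 used above; the helper name describe_bytes is made up for this example.

```python
MIB = 1024 ** 2  # bytes in one mebibyte
GIB = 1024 ** 3  # bytes in one gibibyte


def describe_bytes(n_bytes):
    """Render a byte count in MiB, GiB, and decimal GB."""
    return (
        f"{n_bytes} bytes = {n_bytes / MIB:.0f} MiB"
        f" = {n_bytes / GIB:.2f} GiB = {n_bytes / 1e9:.2f} GB"
    )


ray_memory = 5242880000  # the value used in ray.init(memory=...)
print(describe_bytes(ray_memory))
# 5242880000 bytes = 5000 MiB = 4.88 GiB = 5.24 GB
```

So the value is exactly 5000 MiB, which is a little under 5 GiB and a little over 5 decimal GB; "5 GB" is a fair rounding either way.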

append()

%time df1 = pandas_df.append(pandas_df)
%time df2 = mdask_df.append(mdask_df)
%time df3 = mray_df.append(mray_df)
..................................................................
CPU times: user 1.77 s, sys: 3.34 s, total: 5.12 s
Wall time: 5.08 s
CPU times: user 589 ms, sys: 448 ms, total: 1.04 s
Wall time: 1.09 s
CPU times: user 6.66 ms, sys: 15.1 ms, total: 21.7 ms
Wall time: 26.6 ms

*** Note: DataFrame.append() was deprecated in pandas 1.4 and removed in pandas 2.0, so this snippet needs an older pandas version; on newer versions, concat() covers the same case.

concat()

%time df1 = pandas_pd.concat([pandas_df for _ in range(5)])
%time df2 = dask_pd.concat([mdask_df for _ in range(5)])
%time df3 = ray_pd.concat([mray_df for _ in range(5)])
....................................................................
CPU times: user 1.58 s, sys: 269 ms, total: 1.85 s
Wall time: 1.84 s
CPU times: user 229 ms, sys: 162 ms, total: 390 ms
Wall time: 375 ms
CPU times: user 10.1 ms, sys: 24.6 ms, total: 34.7 ms
Wall time: 51.2 ms

In the above code, we concatenated five copies of the data frame and measured the time taken for the operation.

isnull()

%time pandas_pd.isnull(pandas_df["use"])
%time dask_pd.isnull(mdask_df["use"])
%time ray_pd.isnull(mray_df["use"])
....................................................................
CPU times: user 41.5 ms, sys: 75.9 ms, total: 117 ms
Wall time: 115 ms
CPU times: user 8.74 ms, sys: 554 µs, total: 9.3 ms
Wall time: 9.13 ms
CPU times: user 34.3 ms, sys: 5.59 ms, total: 39.9 ms
Wall time: 39 ms

fillna()

%time pandas_df.fillna(value=0)
%time mdask_df.fillna(value=0)
%time mray_df.fillna(value=0)
....................................................................
CPU times: user 924 ms, sys: 328 ms, total: 1.25 s
Wall time: 1.37 s
CPU times: user 2.91 ms, sys: 488 µs, total: 3.39 ms
Wall time: 3.03 ms
CPU times: user 18.1 ms, sys: 4.27 ms, total: 22.4 ms
Wall time: 20.8 ms

Figure 3. Percentage-wise time execution with Pandas, Modin-Dask, Modin-Ray
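To compare the engines at a glance, the wall times reported above can be turned into speedup ratios relative to Pandas. The numbers below are copied from the single-run measurements in this post, converted to seconds; the function name speedups is made up for this sketch, and a single run on one laptop is only a rough indication.

```python
# Wall times in seconds, taken from the %time outputs above.
wall_times = {
    "read_csv": {"pandas": 3.09, "dask": 1.74, "ray": 2.42},
    "append": {"pandas": 5.08, "dask": 1.09, "ray": 0.0266},
    "concat": {"pandas": 1.84, "dask": 0.375, "ray": 0.0512},
    "isnull": {"pandas": 0.115, "dask": 0.00913, "ray": 0.039},
    "fillna": {"pandas": 1.37, "dask": 0.00303, "ray": 0.0208},
}


def speedups(times):
    """Return pandas_time / engine_time for every non-Pandas engine."""
    baseline = times["pandas"]
    return {
        engine: baseline / seconds
        for engine, seconds in times.items()
        if engine != "pandas"
    }


for operation, times in wall_times.items():
    ratios = speedups(times)
    print(
        f"{operation:<9} Modin-Dask {ratios['dask']:7.1f}x"
        f"   Modin-Ray {ratios['ray']:7.1f}x"
    )
```

A ratio above 1 means the Modin run finished faster than the Pandas run for that operation.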

Diagnostics

The first step in making computations run quickly is to understand the costs involved. On a single machine, we have many diagnostic tools, such as the cProfile module, VMProf, or SnakeViz, to understand the costs involved in our code. But in a distributed environment, we have new costs like data transfer, serialization, task-scheduling overhead, and more that we are not accustomed to tracking.

The distributed scheduler has several diagnostic web pages showing dozens of recorded metrics like CPU, memory, network, disk usage, history of previous tasks, allocation of tasks to workers, worker memory pressure, work-stealing, open file handle limits, etc.

Dask Diagnostic

Figure 4. Real-time visualization of tasks on Dask timeline

Fortunately, the Dask schedulers come with diagnostics that help us understand the performance of these computations. If Bokeh is installed, the dashboard starts automatically when the scheduler is created. On a local machine, the distributed scheduler and its diagnostics are started as follows.

from dask.distributed import Client
client = Client() # start distributed scheduler

Many problems can be diagnosed by inspecting the dashboard, which is usually served at http://localhost:8787/status, or on another port if 8787 is already taken by some other process. The address is displayed when using a Jupyter Notebook, or it can be queried with the following command.

client.scheduler_info()['services']

Figure 4 shows an interactive dashboard with live information about task run times, communication between worker nodes, statistical profiling, load balancing, memory use, and much more, helping the user understand the computations on the Dask engine.

Ray Diagnostic

Figure 5. Real-time visualization of tasks on Ray timeline

To visualize the tasks of Ray, dump the timeline as a JSON file by running the following command.

ray.timeline(filename="/data/timeline.json")

Now open chrome://tracing in the Chrome web browser and load the timeline.json file to view the timeline of the Ray tasks, similar to Figure 5 shown above. We can see the tasks performed by Ray on the right side of the web browser. When one or more items are selected on the timeline, detailed information is presented at the bottom, showing the start time, end time, and number of occurrences of the event.
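The same file can also be summarized without a browser. The sketch below assumes the dump is a JSON list of Chrome trace events, each with a name and a dur field (duration in microseconds); the sample events and their names are hypothetical stand-ins for a real timeline.json, and summarize_trace is a helper made up for this example.

```python
import json
from collections import defaultdict


def summarize_trace(events):
    """Count events and add up their durations (microseconds) per event name."""
    summary = defaultdict(lambda: {"count": 0, "total_us": 0})
    for event in events:
        if "dur" not in event:
            continue  # skip events that carry no duration
        entry = summary[event.get("name", "<unnamed>")]
        entry["count"] += 1
        entry["total_us"] += event["dur"]
    return dict(summary)


# Hypothetical sample in Chrome trace event format; a real run would use
# json.load(open("/data/timeline.json")) instead.
sample_json = """
[
  {"name": "task:execute", "ph": "X", "ts": 0,    "dur": 1200, "pid": "node-1", "tid": "worker-1"},
  {"name": "task:execute", "ph": "X", "ts": 1500, "dur": 800,  "pid": "node-1", "tid": "worker-2"},
  {"name": "ray.get",      "ph": "X", "ts": 2400, "dur": 300,  "pid": "node-1", "tid": "driver"}
]
"""

for name, stats in summarize_trace(json.loads(sample_json)).items():
    print(f"{name:<14} count={stats['count']}  total={stats['total_us']} us")
```

This gives a per-task count and total duration, similar to the details shown at the bottom of the tracing view when several items are selected.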

Conclusion

The Modin library is relatively new and still under development. At the time of writing, some of the Pandas APIs are not yet implemented in Modin, so you might observe Modin defaulting to the Pandas implementation for those APIs.

We observed that Pandas operations, when executed using Modin with Dask or Ray as the scheduler engine, ran significantly faster. The tasks completed more quickly because the work was distributed among all the available resources.

The goal of this blog post is to give you a small peek into the world of Modin Pandas along with the evolving technologies of Dask and Ray. We hope it encourages you to explore this rapidly changing ecosystem of data science technologies further.

References

  1. https://rise.cs.berkeley.edu/blog/pandas-on-ray/
  2. https://ray.readthedocs.io/en/latest/
  3. https://docs.dask.org/
  4. https://github.com/modin-project/modin
  5. https://modin.readthedocs.io/en/latest/architecture.html
  6. https://docs.bokeh.org/en/latest/docs/installation.html
  7. https://en.wikipedia.org/wiki/Pandas_(software)
  8. https://www.kdnuggets.com/2019/11/speed-up-pandas-4x.html
