Quick and Dirty DataFrame Processing With CPU & GPU Clusters In The Cloud

Gus Cavanaugh
15 min read · Oct 29, 2021


This post walks through a quick example of reading and processing a DataFrame, scaling from a single machine up to clusters, on both CPUs and GPUs. We’ll scale up only when it makes sense, and we won’t break the bank.

Introduction

By now, everyone has heard how GPUs can make ML stuff like training and inferencing crazy fast. And thanks to packages like PyTorch, you can toggle between the CPU and GPU with minimal code changes. But maybe you’re like me and you don’t spend much (or any) time with sexy ML tasks like training and scoring. Instead, you work on mundane tasks like tabular data in DataFrames. Can GPUs help us? And is the juice worth the squeeze?

More specifically, I’d like to know:

  • Can GPUs make my DataFrame operations faster without having to learn a new API?
  • Can I set this up without having to be a Linux/CLI ninja?
  • Can I do this in the Cloud for a buck or two so that I can show my boss a working example without going broke in the process?

Thanks to Rapids and the Cloud, the answer to all three is yes.

  • Rapids lets Python users who are comfortable with packages like pandas write nearly identical code that executes on the GPU. Fast, without rewriting.
  • The cloud providers rent us machines with both GPUs and the pesky CUDA driver installed so we don’t have to bother. They can be expensive if we leave them running (so we will be clever about shutting them off)

Bottom line: If you can click and use conda, this post will show you how to create individual and clusters of GPU machines to process your DataFrames. I’ll toss in some comparisons to CPU machines and clusters so you can see the performance boost.

Specifically, we cover:

  1. How to create a development environment with a GPU (on the cloud)
  2. How to install Rapids in that environment
  3. How to compare GPU to CPU processing for a given DataFrame operation
  4. How to scale our analysis to both CPU and GPU clusters

Two important notes:

  1. If you know Pandas, you can use cuDF to process your DataFrames on a GPU. This can be much faster. You should do this if you care about speed.
  2. You will run into memory bottlenecks with GPUs on individual machines. We’ll use Dask to scale both CPU (pandas) and GPU (rapids cuDF), but Dask is especially useful for our GPU clusters so that we can get around those memory bottlenecks and let our GPUs really sing.

Prerequisites

  1. Cloud Account
  • Rapids provides quick-start documentation with a variety of options for AWS, Azure, and GCP.
  • I recommend a GPU client using the Cloud managed notebooks services. This means AWS Sagemaker, GCP Vertex AI, or Azure Machine Learning. I’ve found these to be the fastest and most cost effective approaches.
  • While GPUs can be expensive, we’ll run this example for $1.50 or so by having a modest client machine (GCP Vertex AI instance shown is $0.38/hr) and then launching our GPU clusters from that machine. This way we only ever have multiple GPU machines running when we are running our actual code (and not when we are installing dependencies, sipping coffee, or scrolling Twitter).
  • I show setup examples for just AWS Sagemaker and GCP Vertex AI. I’m still working with Azure support to get quota approval for a modest sized T4 or similar instance. I’ll update this post when I have the requisite quota in Azure ML.

2. Coiled Account (Optional but HIGHLY recommended)

  • Coiled provides on-demand Dask clusters with both CPU and GPU machines. As a Coiled sales guy, I am heavily biased. But you can create a free account and see for yourself.
  • Sign up for free with Github, Google, or your email address
  • Message me (gus@coiled.io) to get access to GPUs and additional free credits for reading this post. Free!
  • Or: follow the Rapids documentation for AWS, Azure, or GCP on deploying Dask clusters. You can also try other Dask commercial vendors like Saturn Cloud

3. Budget

  • AWS, GCP, Azure, and Coiled all offer free tiers so there should be no out of pocket cost to you
  • The examples below accrue ≤ $1.50 total (split between the free tiers of your Cloud provider and Coiled). Again, if you’re new to AWS, Azure, or GCP, they all have generous free tiers. And as I’m a Coiled sales guy, I can guarantee you free credits on Coiled.

GPU Setup In AWS

Within AWS, I recommend AWS Sagemaker to create our client machine. This gives us a local GPU we can use to test out Rapids. We’ll scale out only if it makes sense.

  • Create an AWS Sagemaker Notebook
  • Under Notebook instance type, select ml.g4dn.xlarge. This is a modest GPU machine that will keep your costs down (~$0.75 USD per hour)
  • Under Notebook instance name, choose/write a name for your instance
  • You’re all set — click Create notebook instance

GPU Setup In GCP

Within GCP, I recommend Vertex AI to create our local GPU client. I can’t keep up with GCP name changes, but at $0.38/hour for our T4 (roughly half the cost of the similar Sagemaker instance) I won’t complain too loudly.

  • Create a Vertex AI Notebook
  • Select Notebook Type Rapids 0.18 [EXPERIMENTAL] (note that we will install a more recent version of Rapids in the next section — this just ensures that CUDA is correctly configured)
  • This instance type (4 vCPUs, 15 GB RAM) costs $0.38 USD/hour
  • Name your notebook instance
  • At the bottom, check Install NVIDIA GPU driver automatically for me
  • Click CREATE

GPU Setup In Azure

// Please, Azure! Give me a T4 in Azure Machine Learning. I will pay you! Or even just sufficient quota to launch a NCasT4v3 VM. I’ll install everything else. Please!

(I’ll update this section when my Azure quotas are adjusted)

Setup Local Dependencies

No matter which cloud we select, we have a Jupyter environment with a GPU and our CUDA driver installed. The Rapids quick-start docs will take us home.

Within your JupyterLab environment, open a terminal session. I recommend using the Rapids quick start to get all of our dependencies installed. See screenshot below

  1. At the time of this writing, we’ll run
conda create -n rapids-21.10 -c rapidsai -c nvidia -c conda-forge rapids=21.10 python=3.8 cudatoolkit=11.2 -y

This will take ~5–10 minutes to install. Once completed, activate the environment.

conda activate rapids-21.10

Sagemaker users should use source instead of conda due to the default version of conda that ships with Sagemaker. This may change in the future.

source activate rapids-21.10
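
Before moving on, it’s worth a quick sanity check that cuDF imports and can actually reach the GPU. A minimal sketch (any small cuDF operation that touches the GPU will do):

# Run inside the rapids-21.10 environment
import cudf

print(cudf.__version__)
print(cudf.Series([1, 2, 3]).sum())  # allocates on the GPU; fails fast if CUDA is misconfigured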

2. Optional but recommended: Install the local Coiled client. Coiled will provide Dask clusters from Jupyter for our later examples.

Use pip to install coiled:

pip install coiled --upgrade

Login to Coiled using the CLI (email me: gus@coiled.io for free GPU credits)

coiled login --token <your-token-from-coiled-web-ui> --account <your-account-name>

DataFrame On The GPU

We’ll run our first example on the Jupyter client (no external Dask cluster yet). Open a notebook and get to work!

We’ll use cudf. Below we read in the December 2019 NYC Taxi dataset.

import cudf
taxi = cudf.read_csv(
    "https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-12.csv",
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
)

Let’s run a groupby on the dataset:

%%time
taxi.groupby("passenger_count").tip_amount.mean()

Using %%time, we can see that it runs pretty fast:

CPU times: user 24 ms, sys: 8.06 ms, total: 32.1 ms
Wall time: 30.7 ms
passenger_count
2.0 2.317078
0.0 2.147058
1.0 2.258916
4.0 2.066968
9.0 8.946250
7.0 5.934889
8.0 12.180000
5.0 2.282449
3.0 2.215962
6.0 2.277444
Name: tip_amount, dtype: float64

Time: ~30 ms

Hold on!

This dataset has ~7M rows. 7M is more than we’d probably work with in Excel, but it is not a particularly large dataset. Pandas should handle this just fine — do we even need a GPU for this dataset? Let’s find out.

DataFrame On The CPU

(Open a new notebook or just create a new cell in your existing notebook)

We’ll use standard pandas syntax to read in the same CSV file:

import pandas as pd

pd_df = pd.read_csv(
    "https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-12.csv",
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
)

Now let’s run the groupby and time the result

%%time 
pd_df.groupby("passenger_count").tip_amount.mean()

Result:

CPU times: user 121 ms, sys: 0 ns, total: 121 ms
Wall time: 120 ms
passenger_count
0.0 2.147058
1.0 2.258916
2.0 2.317078
3.0 2.215962
4.0 2.066968
5.0 2.282449
6.0 2.277444
7.0 5.934889
8.0 12.180000
9.0 8.946250
Name: tip_amount, dtype: float64

CPU Time: ~120 ms vs GPU Time: ~30 ms

So the GPU is ~4X faster but it’s not noticeable because both are really fast. So who cares? Given this data size and computation, it doesn’t make sense to spend the money on the GPU for this simple groupby.

We need bigger data to make the GPU difference meaningful

DataFrame On The GPU — More Data

Instead of just December, let’s read in the entire year of data. This is ~84M rows.

This code is ugly but it should get the job done. Below I loop through each CSV file in S3 and concatenate them into one big dataframe

lst_dfs = []
file_path = "https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-"
for i in range(1, 13):
    if i < 10:
        new_file_path = file_path + str(0) + str(i) + ".csv"
    else:
        new_file_path = file_path + str(i) + ".csv"
    df = cudf.read_csv(
        new_file_path,
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    )
    lst_dfs.append(df)
new_df = cudf.concat(lst_dfs)
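
(Aside: the same loop can be written a bit more compactly with a zero-padded f-string. This is purely cosmetic, and both versions do the same reads and the same concat.)

# Equivalent, slightly tidier version of the loop above
lst_dfs = [
    cudf.read_csv(
        f"https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-{month:02d}.csv",
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    )
    for month in range(1, 13)
]
new_df = cudf.concat(lst_dfs)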

Uh-oh! When I run this I get a Memory Error. This is not uncommon. GPU memory tends to lag behind CPU memory. And in our case, this is especially problematic as we are using a modest client instance to keep our costs down.

MemoryError: std::bad_alloc: CUDA error at: /home/ec2-user/anaconda3/envs/rapids-21.10/include/rmm/mr/device/cuda_memory_resource.hpp:70: cudaErrorMemoryAllocation out of memory

Let’s re-run that same code with pandas instead of cudf. I see a memory error with pandas as well, so in this case the dataset overwhelms both GPU and CPU memory.

MemoryError: Unable to allocate 684. MiB for an array with shape (13, 6896317) and data type float64

This is frustrating. With small data that fits in memory, the GPU isn’t appreciably faster than pandas. With larger data, we hit a memory bottleneck that prevents us from taking advantage of the faster GPU cores. What do we do?

Enter Dask. Dask is an open source Python package that makes parallelizing Python easy. It does this by scaling Python across multiple cores on an individual machine or across multiple machines in a cluster. By using Dask, we can get around the memory bottleneck on our local client. Even better, Dask has a DataFrame API that works with both CPUs and GPUs, so we don’t have to learn a new API.

Use Dask Locally (No GPU)

Dask works great on both individual machines and on clusters. Let’s start with our local instance to see if Dask helps.

Create a local Dask cluster following the quick start. This is a local cluster, meaning Dask is just running on our client machine (our Jupyter instance). In later examples we’ll create a distributed cluster using lots of machines.

from dask.distributed import Client
client = Client()

By default, our local Dask cluster only uses our CPU cores (not GPUs).
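
If you want to watch the work happen, the client exposes a link to the Dask dashboard (the same dashboard shown in the cluster screenshots later on):

# Open this URL in a browser tab to watch tasks execute
print(client.dashboard_link)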

Now we run the groupby using very similar code with Dask DataFrame.

%%time
import dask.dataframe as dd

df = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
    dtype={
        "payment_type": "UInt8",
        "VendorID": "UInt8",
        "passenger_count": "UInt8",
        "RatecodeID": "UInt8",
    },
    storage_options={"anon": True},
    blocksize="16 MiB",
).persist()
df.groupby("passenger_count").tip_amount.mean().compute()

Get comfortable: this may take a few minutes (~11 min). But hey! No memory bottleneck. Below is my wall time:

Wall time: 11min 22s

Time: ~ 11 Minutes. Pretty slow, but at least it finished. Let’s see if using Dask with our GPU speeds things up (hint: it will!)

Use Dask Locally (GPU)

Thanks to the good people at Rapids, we can create a Dask cluster compatible with CUDA with very similar code. Here is the quick-start documentation for reference.

Create a local Dask cluster that uses the GPU. (I recommend creating a new notebook to make referencing these examples easier)

from dask_cuda import LocalCUDACluster
from dask.distributed import Client
cluster_local = LocalCUDACluster()
client = Client(cluster_local)

Use dask-cudf to run our cudf code with Dask across our local machine. This is very similar to Dask DataFrame

import dask_cudf
taxi_dask = dask_cudf.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    storage_options={"anon": True},
    assume_missing=True,
)
taxi_dask.groupby("passenger_count").tip_amount.mean().compute()

If you’d like to view some performance metrics on your GPU, you can use the nvidia-smi utility

Screenshot: running watch nvidia-smi

Result:

CPU times: user 2.95 s, sys: 928 ms, total: 3.88 s
Wall time: 2min 16s
passenger_count
0.0 2.122789
1.0 2.206790
9.0 7.377822
4.0 2.023804
6.0 2.221105
3.0 2.137775
8.0 7.111625
7.0 6.675962
5.0 2.235441
<NA> 0.064550
2.0 2.214306
Name: tip_amount, dtype: float64

CPU Time: ~11 Min vs GPU Time: ~ 2 Min. Wow! That’s a significant improvement. Two minutes is much better than eleven minutes in my book. And we did it using a local Dask GPU cluster. This let us use our local GPU without the memory bottleneck.

But if you have a goldfish level attention span like I do, it would be great if this could go faster. Even with a two minute wait time, I’ll switch tabs and fall off into the internet abyss.

Let’s see how much faster we can make this run if we use multiple CPU and GPU machines. Ideally, I’d like this to run in < 30 seconds.

Use Dask On A Cluster (CPU)

I will create my Dask CPU and GPU clusters with Coiled. Again, I work for Coiled. Feel free to choose alternative approaches as detailed in the Rapids documentation. There are additional commercial Dask options like Saturn Cloud as well.

From our same Jupyter notebook, we can create a Dask cluster with Coiled. We interact with Coiled with just Python. As you can see, it’s pretty minimal code to get a cluster.

Create Coiled Cluster

import coiled
cluster = coiled.Cluster(
    scheduler_cpu=2,
    scheduler_memory="4 GiB",
    worker_cpu=4,
    worker_memory="16 GiB",
    scheduler_options={"idle_timeout": "1 hour"},
    name="gus-cpu-example",
)

This command creates 4 Dask workers, each with 4 vCPUs and 16 GB RAM. We could make these instances much larger but for this example I keep the same size as our local (Jupyter) client.
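
If you do want more horsepower later, Coiled clusters follow the standard Dask cluster interface, so resizing should look roughly like this (a sketch; check the Coiled docs for your version):

# Resize the running cluster (you can also pass n_workers=... when creating it)
cluster.scale(8)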

Pass the Coiled cluster object to Dask.

from dask.distributed import Client
client = Client(cluster)

That’s it for Coiled — now we can run Dask code from here on out.

Run Groupby — this is the same Dask code as before. As we’ve passed our Coiled cluster object to Dask, we don’t do anything Coiled specific. We just run Dask code.

%%time
import dask.dataframe as dd
df = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
    dtype={
        "payment_type": "UInt8",
        "VendorID": "UInt8",
        "passenger_count": "UInt8",
        "RatecodeID": "UInt8",
    },
    storage_options={"anon": True},
    blocksize="16 MiB",
).persist()
df.groupby("passenger_count").tip_amount.mean().compute()

Result using our 4 node Dask cluster:

Wall time: 1min 17s

CPU Cluster Time: ~80 Secs. Now we’re getting somewhere! In this case, using 4 CPU nodes we see better performance than on our local GPU cluster. This is getting closer to my goal of < 30 seconds, but we’re not there yet

Screenshot: Dask Dashboard for profiling this Dask code

Let’s see how much faster we can go with a 4 node GPU cluster.

Use Dask On A Cluster (GPU)

We’ll now ask Coiled for a separate GPU cluster. To ensure the same dependencies are installed on our cluster nodes, we can create a Coiled Software Environment. In the example below I simply specify an existing Rapids docker image (which I found in the Rapids quick-start) and Coiled takes it from there.

If you use something other than Coiled to create your Dask cluster, I’d advise you pay special attention to installing the Rapids dependencies on that cluster. This can be quite tricky and it is one area in particular where I think Coiled can really make your life easier (sales pitch over).

Create software environment. (Note: as Coiled users, we only need to create software environments once — environments are designed to be reused across many ephemeral clusters)

coiled.create_software_environment(
    name="gus-gpu-afar-061-v2",
    container="rapidsai/rapidsai-nightly:21.10-cuda11.2-runtime-ubuntu20.04-py3.8",
    conda_env_name="rapids",
    conda={
        "channels": ["conda-forge"],
        "dependencies": ["afar=0.6.1"],
    },
)

(Also note: feel free to change the name of your Coiled software environment: “gus-gpu-afar-061-v2” is my arbitrary and ugly naming)

Now, create our GPU cluster. If you changed the software environment name (above), be sure to use that same name in the software argument below.

import coiled

cluster = coiled.Cluster(
    scheduler_cpu=2,
    scheduler_memory="4 GiB",
    worker_cpu=4,
    worker_memory="16 GiB",
    worker_gpu=1,
    worker_class="dask_cuda.CUDAWorker",
    software="sales/gus-gpu-afar-061-v2",
    scheduler_options={"idle_timeout": "1 hour"},
    name="gus-gpu-afar-h",
)

Once the cluster is created, we pass our Coiled cluster object to Dask just as before. Now we can run Dask code and Coiled does not get in the way.

from dask.distributed import Client
client = Client(cluster)

Run groupby with dask_cudf (just as we did before with dask_cudf and our local GPU cluster)

import dask_cudf
taxi_dask = dask_cudf.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    storage_options={"anon": True},
    assume_missing=True,
)
taxi_dask.groupby("passenger_count").tip_amount.mean().compute()

Screenshot: Dask Dashboard for GPU code

Result:

Wall time: 19.3 s

CPU Cluster Time: ~80 Secs vs GPU Cluster Time: ~20 Secs. Excellent! With our 4 node GPU cluster we see a similar 4x speedup. More importantly, with ~20 seconds between each run I’m far less tempted to alt-tab to something else and get distracted!

So for this basic DataFrame operation:

  • Local cuDF or Pandas — stuck with memory error
  • Local Dask + CPU ran in 11 minutes. Yawn — too slow.
  • Local Dask + GPU ran in 2 minutes. Getting there!
  • External 4 Node CPU Dask Cluster ran in 80 seconds. Closer!
  • External 4 Node GPU Dask Cluster ran in 20 seconds. Hooray!

Costs

Our GCP and AWS notebook instance costs are below

GCP — $0.38/hour

AWS — $0.75/hour

My Coiled clusters cost (CPU — $0.55) and (GPU — $0.81) to run through these examples. So at most you’d incur $1.50 of cost. And you’ll be well within the Coiled free tier and the AWS/GCP/Azure free tiers as well.

Bonus: Run On A Dask GPU Cluster From A Non-GPU Client (like your laptop or workstation)

Everything we’ve done so far is predicated upon having a local client with a GPU and CUDA installed. Thanks to afar, a new package from Erik Welch and James Bourbeau, you can submit Rapids code to remote Dask clusters (like our GPU cluster) without either.

Install afar with pip

pip install afar

If you’ve followed my example so far, your Coiled GPU cluster has afar installed on each cluster node. You can reuse that existing cluster from your laptop or workstation.

We just wrap our existing code with a with statement
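
Here is a sketch of what that looks like, based on afar 0.6: run and remotely come from afar, the body of the with block executes on the cluster’s workers, and I’m assuming the Coiled GPU cluster from the previous section is still running so we can reattach to it by name.

from afar import run, remotely
from dask.distributed import Client
import coiled

# Assumption: reattach to the existing Coiled GPU cluster by name
cluster = coiled.Cluster(name="gus-gpu-afar-h")
client = Client(cluster)

with run, remotely:
    # This block runs on the Dask workers, so dask_cudf and CUDA only need
    # to exist on the cluster, not on the laptop submitting the code
    import dask_cudf

    taxi_dask = dask_cudf.read_csv(
        "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
        storage_options={"anon": True},
        assume_missing=True,
    )
    res = taxi_dask.groupby("passenger_count").tip_amount.mean().compute()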

In order to retrieve the result, I do need to specify res.result() outside of the with block. But when I do, I see the same expected result with similar speed

passenger_count
0.0 2.122789
1.0 2.206790
9.0 7.377822
4.0 2.023804
6.0 2.221105
3.0 2.137775
8.0 7.111625
7.0 6.675962
5.0 2.235441
NaN 0.064550
2.0 2.214306
Name: tip_amount, dtype: float64

Now we can get the same results running from our local machine without having to have a GPU present (or CUDA correctly configured). Afar is quite new, so I still find using a hosted GPU client (Sagemaker/Vertex AI/Azure ML) beneficial, but it’s clearly very promising.

Shut Everything Down

Your Coiled cluster will automatically shut down after sitting idle for a period of time (the default is 15 minutes). We can also close it directly by running cluster.close().
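
If you’d rather not wait for the idle timeout, a couple of lines in the notebook do it explicitly:

# Explicitly tear down the Coiled cluster and the client connection
client.close()
cluster.close()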

Also, we can stop our AWS or GCP notebooks via their respective UIs.

Conclusion

Let’s review our initial questions:

  • Can GPUs make my DataFrame operations faster?

Yes! We saw 4–5x speedups using GPU(s) vs CPU(s) both locally and on clusters. The GPU cluster let us complete our computation in ~20 seconds, which makes interactive work much easier for people with short attention spans (like me)

  • Can I go faster without having to change my code?

Yes! With Rapids and Dask, we needed to make only slight modifications to our existing pandas groupby to run the same computation on a single GPU machine, a cluster of CPU machines, and a cluster of GPU machines

  • Can I set this up without having to be a Linux/CLI ninja?

Yes! Rapids provided us with a conda environment we used locally and a Docker image that we used for our cluster. With Coiled, we created our GPU cluster using the Rapids Docker image from our Jupyter notebook with just a few lines of Python: no shell, Docker, or Kubernetes required. We clicked and used Python.

  • Will this cost me a ton of money?

No! We’ll incur < $1.50 running this example. We’ll use the Cloud and Coiled’s respective free tiers so we don’t have any out of pocket cost. We can use GCP instead of AWS to save on our local GPU client, or use afar to avoid any local GPU client cost at all!

Next Steps

I hope this was a fun and interesting example. Of course, good practice dictates that one not optimize code without first analyzing it. In a future post we’ll review the tools available for profiling our GPU code to see if and how we can improve it. We’ll also cover different Dask cluster deployment options (not just Coiled)


Gus Cavanaugh

I write about using Python for data analysis in Enterprise settings when IT challenges get in the way https://www.linkedin.com/in/gustafrcavanaugh/