Bursting Data Science Workloads to GPUs on Google Cloud Platform with Dask Cloud Provider

Jacob Tomlinson
RAPIDS AI
Jan 15, 2021

With the recent release of Dask Cloud Provider, we’ve added virtual machine support for a number of cloud platforms. In this blog post, we will discuss scaling from RAPIDS on a local machine to a multi-node, multi-GPU cluster on Google Cloud with Dask Cloud Provider. We’ll use the same Docker image to run RAPIDS on our local machine and when we burst to Google Cloud Platform (GCP) for additional GPU access.

Consider the scenario where we are working on some data stored in Google Cloud Storage (GCS) and we want to analyze it using RAPIDS.

Let’s start our work on a laptop with an NVIDIA GPU. It runs Linux and has the NVIDIA drivers and Docker installed, so we can get started with RAPIDS quickly using the Docker image.

$ docker run --gpus all --rm -it -p 8888:8888 -p 8787:8787 -p 8786:8786 rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8

Once we have our container running we will need two additional packages: dask-cloudprovider for creating instances on GCP and gcsfs for reading from Google Cloud Storage. In the running Docker container, execute the following:

$ pip install dask-cloudprovider[gcs] gcsfs

An Example Problem

For this blog post, we will work with the NYC Taxi data, specifically, the mirror maintained on GCS by Anaconda.

Here we have ~30 GB of CSV data stored in GCS. We could download this data locally, but instead we are going to use Dask and RAPIDS cuDF to read it directly and lazily in partitions. Lazy loading here means that we construct a local representation of the data by reading only the metadata; the data itself is then read just in time, when we perform operations on it.

To do this we must construct a Dask cluster and connect a client to it. Because we are using RAPIDS we need to construct a LocalCUDACluster from the dask_cuda package.

>>> from dask.distributed import Client
>>> from dask_cuda import LocalCUDACluster
>>> cluster = LocalCUDACluster()
>>> client = Client(cluster)
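
Before loading any data, an optional check is to grab the address of the Dask dashboard via the standard client.dashboard_link attribute; this is the dashboard we published on port 8787 in the docker run command above, and it lets us watch the GPU workers as they process tasks.

>>> client.dashboard_link  # typically http://127.0.0.1:8787/status for a local cluster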

Now that we have a cluster we can use dask_cudf to construct our dataframe.

>>> import dask_cudf
>>> df = dask_cudf.read_csv("gcs://anaconda-public-data/nyc-taxi/csv/2014/yellow_*.csv")

This will take a few seconds to read in all the metadata from GCS. Then we can have a look at some data.

>>> df.head()

Here we can see the first five rows of the dataframe, but we’ve only read one partition from GCS in order to do so. By default dask_cudf reads the data in 256 MB partitions, but this is configurable (see the sketch below). We can check how many partitions our data has been split into.

>>> df.npartitions
110
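
If we want larger or smaller partitions we can ask for them at read time. Here is a minimal sketch; note that the partition-size keyword depends on your dask_cudf version (chunksize in older releases, blocksize in newer ones), so check the signature of dask_cudf.read_csv in your environment.

>>> # Hypothetical re-read with larger partitions; the keyword name varies by version
>>> df = dask_cudf.read_csv(
...     "gcs://anaconda-public-data/nyc-taxi/csv/2014/yellow_*.csv",
...     chunksize="512 MiB",
... )
>>> df.npartitions  # roughly half as many partitions as before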

Next, we should clean our data a little before working with it; data is often messy, and this dataset is no exception. The column names contain extra whitespace, and some of the longitude/latitude values are missing, so let’s tidy that up and drop the rows with missing values.

>>> df.columns = [s.strip() for s in list(df.columns)]
>>> df = df.dropna(subset=["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"])

As a small example, let’s use RAPIDS cuSpatial to calculate the direct distance between the pickup and dropoff locations using the haversine_distance function. To do this we will write a function that calculates the distance for a single partition, which we can then map over all of our partitions.

>>> import cudf
>>> from cuspatial import haversine_distance
>>> def apply_haversine(partition):
...     # Calculate the haversine distance between pickup and dropoff for each row
...     return cudf.Series(haversine_distance(
...         partition.pickup_longitude,
...         partition.pickup_latitude,
...         partition.dropoff_longitude,
...         partition.dropoff_latitude,
...     ))

Now we can map this function over our partitions and calculate the mean distance of all the NYC taxi trips in 2014.

>>> df.map_partitions(apply_haversine).mean().compute()
34.30891087351521

Success! We have read our data directly from GCS over the internet and used our local GPU to crunch some numbers. However, this operation is limited by two main factors: our internet connection and our GPU.

  1. Downloading and storing the data locally doesn’t scale, so to avoid that we have streamed the data from GCS into memory. However, this approach is limited by your internet connection.
  2. We are also using a low-end mobile GPU. This is an excellent way to explore and experiment with RAPIDS on your own hardware, but when we want to do some more complex calculations we will need something faster.

But what if we could use top-of-the-line GPUs located within Google Cloud instead of our local one? This would also move our computation closer to the data, so we wouldn’t need to worry about our local bandwidth.

Running on GCP

Dask Cloud Provider is a package that allows you to construct Dask clusters on various cloud resources. For this example, we are going to use the GCPCluster class which will construct a Dask cluster running on Google Cloud VM Instances.


The VM instances we are going to use have NVIDIA A100 GPUs, which are 20x more performant than the previous generation, have 40 GB of GPU memory each, and are connected via NVLink with 600 GB/s of bandwidth.

>>> from dask_cloudprovider.gcp import GCPCluster

Configure Authentication and Project

In order for GCPCluster to interact with GCP, we need to configure our authentication and project. To authenticate we can use the gcloud CLI tool.

$ gcloud auth login

We also need to configure our project in our Dask settings. This can be placed in our Dask config at ~/.config/dask/cloudprovider.yaml

cloudprovider:
  gcp:
    projectid: "my-project-id"

Or as an environment variable

$ export DASK_CLOUDPROVIDER__GCP__PROJECTID="my-project-id"

Or directly in our code

>>> import dask.config
>>> dask.config.set({"cloudprovider.gcp.projectid": "my-project-id"})
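
Whichever of these we use, a quick sanity check is to read the value back with dask.config.get (a standard Dask call that resolves dotted keys):

>>> import dask.config
>>> dask.config.get("cloudprovider.gcp.projectid")
'my-project-id'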

Now that we have that set up we can create a cluster, and specifically we need to create one that is configured for RAPIDS.

cluster = GCPCluster(
    zone="us-central1-a",
    machine_type="a2-highgpu-2g",
    n_workers=1,
    docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8",
    worker_class="dask_cuda.CUDAWorker",
    env_vars={"EXTRA_PIP_PACKAGES": "gcsfs"},
)
client = Client(cluster)

Here we specify the following options:

  • The zone is the GCP zone we want our instances to launch in.
  • The machine type is the type of VM we want, here we have chosen an a2 instance that has two NVIDIA A100 GPUs.
  • We start with one worker but can scale this up later if we wish.
  • We choose the same Docker image as we are running locally. This is important to ensure the two Python environments are the same.
  • We specify that we want to use the dask_cuda.CUDAWorker class for our Dask workers, as that will correctly configure our remote GPUs (this was handled locally by LocalCUDACluster).
  • We set the EXTRA_PIP_PACKAGES environment variable to ensure all the workers have gcsfs installed.

Running this will launch our Dask cluster on GCP in around 15 minutes.
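
While we wait, and once the cluster is up, the usual Dask cluster API still applies. As a short optional sketch, we can open the dashboard (now served from the scheduler VM) and scale out to more worker VMs if one is not enough:

>>> client.dashboard_link  # Dask dashboard served from the scheduler VM on GCP
>>> cluster.scale(2)       # launch a second a2-highgpu-2g worker VM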

Once our cluster is up and running we can perform the same analysis work we did before.

>>> df = dask_cudf.read_csv("gcs://anaconda-public-data/nyc-taxi/csv/2014/yellow_*.csv")
>>> df.columns = [s.strip() for s in list(df.columns)]
>>> df = df.dropna(subset=["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"])
>>> df.map_partitions(apply_haversine).mean().compute()

This time all of the work happens remotely in GCP, which means our data will be read faster and the computation will run on top-of-the-line A100 GPUs.
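
If we want to convince ourselves that the work really is happening remotely, we can ask the scheduler about its workers; a minimal sketch using client.scheduler_info(), where with dask_cuda.CUDAWorker we expect one Dask worker per remote GPU (two on an a2-highgpu-2g instance):

>>> # Each entry is a CUDA worker running on the GCP VM, one per GPU
>>> len(client.scheduler_info()["workers"])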

Lastly, we need to remember to shut our cluster down.

client.close()
cluster.close()

This is great! We demonstrated how to scale our workload with more cloud resources, but there are additional features that make this experience even better: we can use context managers for automatic cleanup and Packer for faster launching.

Context Managers

It is easy to forget to close out your cluster, and the last thing you want to do is leave some high-performance infrastructure running unnecessarily on your account. To solve this, all of the Dask cluster managers can also be used as context managers.

with GCPCluster(
    zone="us-central1-a",
    machine_type="a2-highgpu-2g",
    n_workers=1,
    docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8",
    worker_class="dask_cuda.CUDAWorker",
    env_vars={"EXTRA_PIP_PACKAGES": "gcsfs"},
) as cluster:
    with Client(cluster) as client:
        df = dask_cudf.read_csv("gcs://anaconda-public-data/nyc-taxi/csv/2014/yellow_*.csv")
        df.columns = [s.strip() for s in list(df.columns)]
        df = df.dropna(subset=["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"])
        df.map_partitions(apply_haversine).mean().compute()

By using our cluster manager in a with statement, the cluster will be created and destroyed automatically around this piece of work.

Packer

Waiting 15 minutes to start our cluster at the beginning of our workload may be an acceptable trade-off. There are network performance benefits to being inside GCP, and having access to scalable GPUs means your workloads will run much faster than they would locally.

However, we can improve on this.

When GCPCluster starts our instances it has to prepare the VM, install drivers and software, and pull down and decompress a copy of the RAPIDS Docker image. But what if our nodes were ready to go immediately?

Using a tool called Packer we can create our own custom source image that already contains all of our dependencies. So let’s install Packer.

$ wget https://releases.hashicorp.com/packer/1.6.5/packer_1.6.5_linux_amd64.zip
$ unzip packer_1.6.5_linux_amd64.zip

Next, we need to create a Packer config file. Packer creates a VM, runs setup scripts, and then creates a new image from that VM. So first we need to extract the provisioning script that GCPCluster uses to create our machines. We can do this with the get_cloud_init classmethod, passing in the same arguments we would use to create a cluster.

cloud_init = GCPCluster.get_cloud_init(
    zone="us-central1-a",
    machine_type="a2-highgpu-2g",
    n_workers=1,
    docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8",
    worker_class="dask_cuda.CUDAWorker",
    env_vars={"EXTRA_PIP_PACKAGES": "gcsfs"},
)

Next, we need to construct our Packer config. Most of the options here will be familiar as we’ve used them already in Dask Cloud Provider.

packer_config = {
    "builders": [
        {
            "type": "googlecompute",
            "project_id": "<PROJECT ID>",
            "source_image": "ubuntu-minimal-1804-bionic-v20201014",
            "ssh_username": "packer",
            "zone": "us-central1-a",
            "on_host_maintenance": "TERMINATE",
            "disk_size": "50",
            "machine_type": "a2-highgpu-2g",
            "metadata": {"user-data": cloud_init},
        }
    ],
    "provisioners": [
        {
            "type": "shell",
            "inline": [
                "echo 'Waiting for cloud-init'; while [ ! -f /var/lib/cloud/instance/boot-finished ]; do sleep 1; done; echo 'Done'"
            ],
        }
    ],
}

Now that we have created our config we should write it out to a JSON file to use with Packer.

import json

with open("packer.json", "w") as fh:
    fh.write(json.dumps(packer_config))

We can then run the packer build.

$ packer build packer.json
googlecompute: output will be in this color.
==> googlecompute: Checking image does not exist...
==> googlecompute: Creating temporary rsa SSH key for instance...
==> googlecompute: Using image: ubuntu-minimal-1804-bionic-v20201014
==> googlecompute: Creating instance...
googlecompute: Loading zone: us-central1-a
googlecompute: Loading machine type: a2-highgpu-2g
googlecompute: Requesting instance creation...
googlecompute: Waiting for creation operation to complete...
googlecompute: Instance has been created!
==> googlecompute: Waiting for the instance to become running...
googlecompute: IP: 34.72.119.86
==> googlecompute: Using ssh communicator to connect: 34.72.119.86
==> googlecompute: Waiting for SSH to become available...
==> googlecompute: Connected to SSH!
==> googlecompute: Provisioning with shell script: /tmp/packer-shell700735704
googlecompute: Waiting for cloud-init
googlecompute: Done
==> googlecompute: Deleting instance...
googlecompute: Instance has been deleted!
==> googlecompute: Creating image...
==> googlecompute: Deleting disk...
googlecompute: Disk has been deleted!
Build 'googlecompute' finished after 9 minutes 53 seconds.
==> Wait completed after 9 minutes 53 seconds
==> Builds finished. The artifacts of successful builds are:
--> googlecompute: A disk image was created: packer-1605545458

Here we can see at the bottom of our output that we have a new image called packer-1605545458. So next time we launch a GCPCluster we can pass that as the source_image option and disable the bootstrapping that Dask Cloud Provider does because we have already baked that in.

cluster = GCPCluster(
    source_image="packer-1605545458",
    zone="us-central1-a",
    machine_type="a2-highgpu-2g",
    n_workers=1,
    docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8",
    worker_class="dask_cuda.CUDAWorker",
    env_vars={"EXTRA_PIP_PACKAGES": "gcsfs"},
    bootstrap=False,
)
client = Client(cluster)

Now when we start our cluster it only takes around 5 minutes instead of 15.

Next Steps

Being able to move your workload to the cloud by changing the Dask cluster manager you are using is extremely powerful. It allows you to explore and experiment with cheap resources and then easily scale up when you are ready.

To make this experience even more pleasant there are more things we can do.

Currently, you have to learn a little about Packer and create your own Packer build config. All of the information needed is already contained within GCPCluster, so things could be expanded to generate this config for you, or even to run Packer on your behalf under the hood.

We also end up waiting twice when we launch a cluster: first we start a Dask scheduler and wait for it to become available, then we start the workers and wait for them to connect. We could cut that wait time roughly in half by colocating a worker with the scheduler, or by starting the workers at the same time as the scheduler and having them retry the connection until the scheduler comes up.

Imagine being able to get access to untapped amounts of GPU resources in around two minutes!
