Bursting Data Science Workloads to GPUs on Google Cloud Platform with Dask Cloud Provider
With the recent release of Dask Cloud Provider, we’ve added virtual machine support for a number of cloud platforms. In this blog post, we will discuss scaling from RAPIDS on a local machine to a multi-node, multi-GPU cluster on Google Cloud with Dask Cloud Provider. We’ll use the same Docker image to run RAPIDS on our local machine and on Google Cloud Platform (GCP) when we burst there for additional GPU access.
Consider the scenario where we are working on some data stored in Google Cloud Storage (GCS) and we want to analyze it using RAPIDS.
Let’s start our work on a laptop with an NVIDIA GPU. It runs Linux and has the NVIDIA drivers and Docker installed, so we can get started with RAPIDS quickly using the Docker image.
$ docker run --gpus all --rm -it -p 8888:8888 -p 8787:8787 -p 8786:8786 rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8
Once we have our container running we will need additional packages for creating instances on GCP (dask-cloudprovider) and reading from Google Cloud Storage (gcsfs). In the running Docker container, execute the following:
$ pip install "dask-cloudprovider[gcp]" gcsfs
An Example Problem
For this blog post, we will work with the NYC Taxi data, specifically, the mirror maintained on GCS by Anaconda.
Here we have ~30GB of CSV data stored in GCS. We could download this data locally, but instead we are going to use Dask and RAPIDS cuDF to read it directly and lazily in partitions. Lazy loading here means that we construct a local representation of the data by reading only the metadata; the data itself is read just in time when we perform operations on it.
To do this we must construct a Dask cluster and connect a client to it. Because we are using RAPIDS we need to construct a LocalCUDACluster from the dask_cuda package.
>>> from dask.distributed import Client
>>> from dask_cuda import LocalCUDACluster
>>> cluster = LocalCUDACluster()
>>> client = Client(cluster)
Now that we have a cluster we can use dask_cudf to construct our dataframe.
>>> import dask_cudf
>>> df = dask_cudf.read_csv("gcs://anaconda-public-data/nyc-taxi/csv/2014/yellow_*.csv")
This will take a few seconds to read in all the metadata from GCS. Then we can have a look at some data.
>>> df.head()
Here we can see the first five rows of the dataframe, but we’ve only read one partition from GCS in order to do so. By default dask_cudf reads in 256MB partitions, but this is configurable. We can see how many partitions our data has been split into.
>>> df.npartitions
110
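To get a feel for where a number like this comes from, here is a rough sketch of the arithmetic. It assumes every file is split on its own into chunks of at most 256MB. The file sizes are made-up placeholders rather than the real sizes in the bucket, and estimate_partitions is a hypothetical helper, not a Dask function.

```python
import math

PARTITION_BYTES = 256 * 1024**2  # 256 MiB, the default partition size mentioned above


def estimate_partitions(file_sizes, partition_bytes=PARTITION_BYTES):
    """Estimate the partition count when every file is split independently."""
    # A partition never spans two files, so we round up once per file.
    return sum(math.ceil(size / partition_bytes) for size in file_sizes)


# Placeholder sizes for illustration only: twelve monthly files of 2.4 GiB each.
example_sizes = [int(2.4 * 1024**3)] * 12
print(estimate_partitions(example_sizes))  # 120
```

The real files vary in size, so the true count will differ from this toy figure.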
Next, we should clean our data a little. Data is often messy, and this dataset is no exception. The column names contain extra whitespace and some of the longitude and latitude values are missing, so let’s tidy up the names and drop the rows with missing values.
>>> df.columns = [s.strip() for s in list(df.columns)]
>>> df = df.dropna(subset=["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"])
As a small example, let’s use RAPIDS cuSpatial to calculate the direct distance between the pickup and dropoff locations using the haversine_distance function. We will write a function that calculates the distance and then map it over our partitions.
>>> import cudf
>>> from cuspatial import haversine_distance
>>> def apply_haversine(partition):
...     # haversine_distance takes longitude/latitude pairs in degrees
...     return cudf.Series(haversine_distance(
...         partition.pickup_longitude,
...         partition.pickup_latitude,
...         partition.dropoff_longitude,
...         partition.dropoff_latitude,
...     ))
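For intuition, here is a plain-Python sketch of the haversine formula for a single pair of points. It is not cuSpatial’s implementation, which runs on the GPU over whole columns; the Earth radius constant and the haversine_km name are assumptions of this sketch.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius; an assumption of this sketch


def haversine_km(lon1, lat1, lon2, lat2):
    """Great-circle distance in kilometres between two lon/lat points in degrees."""
    lon1, lat1, lon2, lat2 = map(math.radians, (lon1, lat1, lon2, lat2))
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = (
        math.sin(dlat / 2) ** 2
        + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    )
    # Clamp to 1.0 to guard against floating point rounding before asin.
    return 2 * EARTH_RADIUS_KM * math.asin(min(1.0, math.sqrt(a)))


# One degree of latitude is roughly 111 km.
print(round(haversine_km(0.0, 0.0, 0.0, 1.0), 1))
```

The argument order (longitude first, then latitude) mirrors the call in apply_haversine above.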
Now we can map this function over our partitions and calculate the mean distance of all the NYC taxi trips in 2014.
>>> df.map_partitions(apply_haversine).mean().compute()
34.30891087351521
Success! We have successfully read our data directly from GCS over the internet and used our local GPU to crunch some numbers. However, this operation is limited by two main factors: slow internet connection and GPU performance.
- Downloading and storing the data locally doesn’t scale, so to avoid that we have streamed the data from GCS into memory. However, this approach is limited by your internet connection.
- We are also using a low-end mobile GPU. This is an excellent way to explore and experiment with RAPIDS on your own hardware, but when we want to do some more complex calculations we will need something faster.
But what if we could use top-of-the-line GPUs located within Google Cloud instead of our local one? This would also move our computation closer to the data, so we would not need to worry about our local bandwidth.
Running on GCP
Dask Cloud Provider is a package that allows you to construct Dask clusters on various cloud resources. For this example, we are going to use the GCPCluster class which will construct a Dask cluster running on Google Cloud VM Instances.
Construct a Dask Cluster running on GCP VM Instances
The VM instances we are going to use have NVIDIA A100 GPUs, which NVIDIA quotes as up to 20x more performant than the previous generation. Each GPU has 40GB of memory, and they are connected via NVLink with 600GB/s of bandwidth.
>>> from dask_cloudprovider.gcp import GCPCluster
Configure Authentication and Project
In order for GCPCluster to interact with GCP, we need to configure our authentication and project. To authenticate we can use the gcloud CLI tool.
$ gcloud auth login
We also need to configure our project in our Dask settings. This can be placed in our Dask config at ~/.config/dask/cloudprovider.yaml:
cloudprovider:
  gcp:
    projectid: "my-project-id"
Or set as an environment variable:
$ export DASK_CLOUDPROVIDER__GCP__PROJECTID="my-project-id"
Or directly in our code
>>> import dask.config
>>> dask.config.set(**{"cloudprovider.gcp.projectid": "my-project-id"})
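The three forms above all address the same setting. As a minimal sketch of how the environment variable name lines up with the dotted config key, assuming Dask’s convention of a DASK_ prefix with double underscores marking nesting, consider the following. The env_to_config_key helper is hypothetical and written here only for illustration.

```python
def env_to_config_key(name, prefix="DASK_"):
    """Translate an environment variable name into a dotted Dask config key."""
    if not name.startswith(prefix):
        raise ValueError(f"{name!r} does not start with {prefix!r}")
    # Drop the prefix, lowercase the rest, and treat "__" as a nesting separator.
    return name[len(prefix):].lower().replace("__", ".")


print(env_to_config_key("DASK_CLOUDPROVIDER__GCP__PROJECTID"))
# cloudprovider.gcp.projectid
```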
Now that we have that set up we can create a cluster, but we specifically need to create a RAPIDS cluster.
cluster = GCPCluster(
    zone="us-central1-a",
    machine_type="a2-highgpu-2g",
    n_workers=1,
    docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8",
    worker_class="dask_cuda.CUDAWorker",
    env_vars={"EXTRA_PIP_PACKAGES": "gcsfs"},
)
client = Client(cluster)
Here we specify the following options:
- The zone is the GCP zone we want our instances to launch in.
- The machine type is the type of VM we want; here we have chosen an a2 instance that has two NVIDIA A100 GPUs.
- We start with one worker but can scale this up later if we wish.
- We choose the same Docker image as we are running locally. This is important to ensure the two Python environments are the same.
- We specify that we want to use the dask_cuda.CUDAWorker class for our Dask workers as that will correctly configure our remote GPUs (this was handled locally by LocalCUDACluster).
- We set the EXTRA_PIP_PACKAGES environment variable to ensure all the workers have gcsfs installed.
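Because the same set of options is reused several times in this post, it can be convenient to keep them in one place. The sketch below is one way to do that; rapids_cluster_kwargs and RAPIDS_IMAGE are hypothetical names introduced here, and the values are simply the ones used above.

```python
RAPIDS_IMAGE = "rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8"


def rapids_cluster_kwargs(n_workers=1, **overrides):
    """Return the keyword arguments used in this post for a RAPIDS GCPCluster."""
    kwargs = {
        "zone": "us-central1-a",
        "machine_type": "a2-highgpu-2g",
        "n_workers": n_workers,
        "docker_image": RAPIDS_IMAGE,
        "worker_class": "dask_cuda.CUDAWorker",
        "env_vars": {"EXTRA_PIP_PACKAGES": "gcsfs"},
    }
    # Let callers tweak individual options without repeating the rest.
    kwargs.update(overrides)
    return kwargs


# Usage (needs dask-cloudprovider and GCP credentials, so left commented out):
# cluster = GCPCluster(**rapids_cluster_kwargs())
print(sorted(rapids_cluster_kwargs()))
```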
Running this will launch our Dask cluster on GCP in around 15 minutes.
Once our cluster is up and running we can perform the same analysis work we did before.
>>> df = dask_cudf.read_csv("gcs://anaconda-public-data/nyc-taxi/csv/2014/yellow_*.csv")
>>> df.columns = [s.strip() for s in list(df.columns)]
>>> df = df.dropna(subset=["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"])
>>> df.map_partitions(apply_haversine).mean().compute()
This time all of the work will happen remotely in GCP, which means our data will be read faster and the computation will happen on top-of-the-line A100 GPUs.
Lastly, we need to remember to shut our cluster down.
client.close()
cluster.close()
This is great! We demonstrated how to scale our workload with more cloud resources but there are additional features to make this experience even better. We can use Context Managers for cleanup and Packer for faster launching.
Context Managers
It is easy to forget to close your cluster, and the last thing you want to do is leave some high-performance infrastructure running unnecessarily on your account. To solve this, all of the Dask cluster managers can also be used as context managers.
with GCPCluster(
    zone="us-central1-a",
    machine_type="a2-highgpu-2g",
    n_workers=1,
    docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8",
    worker_class="dask_cuda.CUDAWorker",
    env_vars={"EXTRA_PIP_PACKAGES": "gcsfs"},
) as cluster:
    with Client(cluster) as client:
        df = dask_cudf.read_csv("gcs://anaconda-public-data/nyc-taxi/csv/2014/yellow_*.csv")
        df.columns = [s.strip() for s in list(df.columns)]
        df = df.dropna(subset=['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude'])
        print(df.map_partitions(apply_haversine).mean().compute())
By using our cluster manager in a with statement, the cluster will be created and destroyed automatically around this piece of work.
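The key property is that cleanup runs even when the work inside the block fails. The following sketch shows this with a stand-in class, so it runs without any cloud resources. FakeCluster is hypothetical and only tracks whether close() was called; it is not how GCPCluster is implemented.

```python
class FakeCluster:
    """Stand-in for a cluster manager; it only tracks whether close() ran."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # do not swallow the exception


def run_failing_job():
    """Run a job that raises inside the with block and return the cluster."""
    cluster = FakeCluster()
    try:
        with cluster:
            raise RuntimeError("job failed part-way through")
    except RuntimeError:
        pass
    return cluster


print(run_failing_job().closed)  # True
```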
Packer
Waiting 15 minutes to start our cluster at the beginning of our workload may be an acceptable trade-off. There are network performance benefits to being inside GCP, and having access to scalable GPUs means your workload will run much faster than it would locally.
However, we can improve on this.
When GCPCluster starts our instances it has to prepare the VM, install drivers and software and pull down a copy of the RAPIDS docker image and decompress it. But what if our nodes were ready to go immediately?
Using a tool called Packer we can create our own custom source image which already contains all of our dependencies. So let’s install Packer.
$ wget https://releases.hashicorp.com/packer/1.6.5/packer_1.6.5_linux_amd64.zip
$ unzip packer_1.6.5_linux_amd64.zip
Next, we need to create a Packer config file. Packer creates a VM, runs setup scripts, and then creates a new image from that VM. So we need to first extract the provisioning script that GCPCluster uses to create our machines. We can do this with the get_cloud_init classmethod, passing in the same arguments we would use to create a cluster.
cloud_init = GCPCluster.get_cloud_init(
    zone="us-central1-a",
    machine_type="a2-highgpu-2g",
    n_workers=1,
    docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8",
    worker_class="dask_cuda.CUDAWorker",
    env_vars={"EXTRA_PIP_PACKAGES": "gcsfs"},
)
Next, we need to construct our Packer config. Most of the options here will be familiar as we’ve used them already in Dask Cloud Provider.
packer_config = {
    "builders": [
        {
            "type": "googlecompute",
            "project_id": "<PROJECT ID>",
            "source_image": "ubuntu-minimal-1804-bionic-v20201014",
            "ssh_username": "packer",
            "zone": "us-central1-a",
            "on_host_maintenance": "TERMINATE",
            "disk_size": "50",
            "machine_type": "a2-highgpu-2g",
            "metadata": {"user-data": cloud_init},
        }
    ],
    "provisioners": [
        {
            "type": "shell",
            "inline": [
                "echo 'Waiting for cloud-init'; while [ ! -f /var/lib/cloud/instance/boot-finished ]; do sleep 1; done; echo 'Done'"
            ],
        }
    ],
}
Now that we have created our config we should write it out to a JSON file to use with Packer.
import json
with open("packer.json", "w") as fh:
    fh.write(json.dumps(packer_config))
We can then run the packer build.
$ packer build packer.json
googlecompute: output will be in this color.
==> googlecompute: Checking image does not exist...
==> googlecompute: Creating temporary rsa SSH key for instance...
==> googlecompute: Using image: ubuntu-minimal-1804-bionic-v20201014
==> googlecompute: Creating instance...
googlecompute: Loading zone: us-central1-a
googlecompute: Loading machine type: a2-highgpu-2g
googlecompute: Requesting instance creation...
googlecompute: Waiting for creation operation to complete...
googlecompute: Instance has been created!
==> googlecompute: Waiting for the instance to become running...
googlecompute: IP: 34.72.119.86
==> googlecompute: Using ssh communicator to connect: 34.72.119.86
==> googlecompute: Waiting for SSH to become available...
==> googlecompute: Connected to SSH!
==> googlecompute: Provisioning with shell script: /tmp/packer-shell700735704
googlecompute: Waiting for cloud-init
googlecompute: Done
==> googlecompute: Deleting instance...
googlecompute: Instance has been deleted!
==> googlecompute: Creating image...
==> googlecompute: Deleting disk...
googlecompute: Disk has been deleted!
Build 'googlecompute' finished after 9 minutes 53 seconds.
==> Wait completed after 9 minutes 53 seconds
==> Builds finished. The artifacts of successful builds are:
--> googlecompute: A disk image was created: packer-1605545458
Here we can see at the bottom of our output that we have a new image called packer-1605545458. So next time we launch a GCPCluster we can pass that as the source_image option and disable the bootstrapping that Dask Cloud Provider does because we have already baked that in.
cluster = GCPCluster(
    source_image="packer-1605545458",
    zone="us-central1-a",
    machine_type="a2-highgpu-2g",
    n_workers=1,
    docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8",
    worker_class="dask_cuda.CUDAWorker",
    env_vars={"EXTRA_PIP_PACKAGES": "gcsfs"},
    bootstrap=False,
)
client = Client(cluster)
Now when we start our cluster it only takes around 5 minutes instead of 15.
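If the build is scripted, the image name can be pulled out of Packer’s output rather than copied by hand. The sketch below does this with a regular expression. The pattern is based only on the output shown above and may need adjusting for other Packer versions, and image_name_from_output is a hypothetical helper.

```python
import re

# Matches the final artifact line from the build output shown earlier.
IMAGE_LINE = re.compile(r"A disk image was created: (\S+)")


def image_name_from_output(output):
    """Return the image name from Packer's build output, or None if absent."""
    match = IMAGE_LINE.search(output)
    return match.group(1) if match else None


sample = "--> googlecompute: A disk image was created: packer-1605545458"
print(image_name_from_output(sample))  # packer-1605545458
```

The returned name can then be passed as the source_image option when creating the cluster.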
Next Steps
Being able to move your workload to the cloud by changing the Dask cluster manager you are using is extremely powerful. It allows you to explore and experiment with cheap resources and then easily scale up when you are ready.
To make this experience even more pleasant there are more things we can do.
Currently, you have to learn a little about Packer and create your own Packer build config. All of the information needed is already contained within GCPCluster, so things could be expanded to generate this config for you, or even to run Packer on your behalf under the hood.
We also end up waiting twice when we launch a cluster. First, we start a Dask scheduler and wait for it to become available, then we start the workers and they connect to the cluster. We could cut the wait time roughly in half if we colocated a worker with the scheduler, or if the worker could be started at the same time as the scheduler and retry its connection when the scheduler comes up more slowly.
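One way the retry idea could look is sketched below. This is not how the Dask worker is implemented. connect_with_retry and its parameters are hypothetical, and the connect callable stands in for whatever opens the connection to the scheduler.

```python
import time


def connect_with_retry(connect, attempts=5, delay=1.0, sleep=time.sleep):
    """Call connect() until it succeeds or the attempts run out."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(attempts):
        try:
            return connect()
        except ConnectionError as error:
            last_error = error
            if attempt < attempts - 1:
                # Back off exponentially so a slow scheduler has time to start.
                sleep(delay * 2 ** attempt)
    raise last_error
```

Passing sleep as a parameter keeps the sketch easy to exercise without real waiting.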
Imagine being able to get access to untapped amounts of GPU resources in around two minutes!