Setting up a Dask Cluster with Google Kubernetes Engine (GKE)
One of the greatest challenges in modern data science is managing the resources needed to process our datasets. Workstations are getting more powerful, but datasets grow by the second, and the algorithms we use to compute our fancy results take time. Instead of running on one machine locally, however, the data can be processed in parallel in the cloud. This saves time and greatly helps when dealing with larger-than-memory datasets. This is where Dask comes in.
What is Dask?
Per the creators:
Dask is a flexible library for parallel computing in Python.
The theory is simple: Dask provides a nearly drop-in replacement for your standard Pandas/NumPy/Python data structures that allows most operations to be parallelized across a number of threads or machines. This has a few important implications:
- The data is split up into partitions, so each worker only processes a chunk of the data. If your dataset is larger than your system’s memory, this still allows you to process it without running out of memory.
- The operations are parallelized. For example, say you wanted the sum of every element in an array. Rather than summing the elements linearly, which takes O(n) time, you can parallelize the operation, leading to a theoretical O(n/k) time, where k is your number of workers (see the sketch after this list).
- It should work with your existing code. Integrating Dask into your workflow should require minimal changes to your own work. Even most scikit-learn models run just fine with a Dask DataFrame.
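To make this concrete, here is a minimal local sketch of both ideas; the array size, chunk size, file pattern, and column name are placeholders, not anything from a real dataset:
import dask.array as da
import dask.dataframe as dd
# A large array split into chunks: each chunk can be summed by a
# different worker, and the partial sums are then combined.
arr = da.random.random(100_000_000, chunks=1_000_000)
total = arr.sum().compute()
# The same idea for tabular data: a lazy, partitioned dataframe with a
# Pandas-like API, reading many CSV files as one logical dataframe.
df = dd.read_csv("data-*.csv")
means = df.groupby("some_column").mean().compute()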
You can run Dask with just a few (or even one) threads on your local machine, but you are still limited by the resources and power of a single machine. The true benefit comes when those workers run on separate machines, not competing with each other for resources.
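For reference, running Dask locally is only a few lines with the distributed scheduler; this is just a sketch, and the worker and thread counts below are arbitrary examples:
from dask.distributed import Client, LocalCluster
# Start a scheduler and workers on this machine only.
cluster = LocalCluster(n_workers=2, threads_per_worker=2)
client = Client(cluster)
print(client.dashboard_link)  # local dashboard for these workers
client.close()
cluster.close()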
There are a few ways to set this up, but using Kubernetes (K8s) is the most flexible and configurable option.
What is Kubernetes?
Kubernetes, also known as K8s, is an open-source system for automating deployment, scaling, and management of containerized applications. — Kubernetes Home Page
For our needs, we can replace “containerized applications” with “Dask workers” and the definition fits our use case. There is no need to manually configure or start each worker: just launch the cluster at the start of your script and close it at the end.
There are a few different ways to run K8s applications. Each of the major cloud providers offers its own hosted option, and minikube is available for running on a single local machine.
For this article we will be using Google Kubernetes Engine (GKE). You can sign up for Google Cloud and get a free $300 credit to follow along here: https://cloud.google.com/free
Dask + K8s = Dask-Kubernetes
Luckily, the Dask developers have already done the majority of the work of letting us set up Dask on K8s, thanks to a package known as dask-kubernetes. They do have a very simple setup available using Helm, but it lacks customizability and makes issues much harder to fix. We will be using their native setup with the Kubernetes Operator.
Goals of this setup
The aim of this setup is to provide a reliable way to run Dask clusters without extensive user intervention. This includes creating an autoscaling cluster, setting up Docker images for both the workers and the client, and tying them together.
Initial Setup
The first step in getting everything working is to set up your workstation and GKE.
This guide assumes you already have an Anaconda or similar setup on a Unix-based system. If you still need to set one up, I recommend getting started with Mamba rather than Anaconda, due to the increased package manager speed: https://mamba.readthedocs.io/en/latest/. If you are starting fresh, start with the Mambaforge installer.
If you are on Windows, you can easily set a Unix system up using WSL2: https://learn.microsoft.com/en-us/windows/wsl/install. From there you can go through the other setup steps.
Additionally, you will want to have Docker installed on your local machine.
Once you have all that done and your Google Cloud account set up, you are ready to begin.
GKE Setup
The first thing we want to do is connect our development environment to the Google Cloud environment. Begin by following Google’s instructions to set up the Google Cloud SDK.
Once that is done, we want to create our GKE cluster:
- From the Cloud console, navigate to the Kubernetes Engine Clusters Page.
- We then create our cluster by clicking the “Create” button.
- If prompted, select “Standard Mode.” Autopilot will not work for our workload AFAIK.
- Select the following options in the screens that follow, then click Create:
Cluster Info Screen
— Cluster name: anything you like
— Location Type: Zonal (Pricing and availability vary by region. If you have issues provisioning, try recreating with a different region)
Node Pools: Keep only the default pool, with the following options
— Name: Again, something memorable
— Size: 1
— Cluster Autoscaler: Enable, set sensible min/max
— Node Machine Configuration: E2 Small with a 20–30 GB boot disk (the default pool will only host a small management instance that requires few resources)
— Node Pool Security: set access scopes to manual configuration and set the storage access to Read Write. This is important.
— Other options should be fine with defaults.
Automation: This is where the magic happens and what makes GKE so cool
— Enable both Vertical Pod autoscaling and node auto provisioning
— For node auto-provisioning, leave the minimum vCPU and RAM at 1, but increase the maximums to meet your workload (I have vCPU at 16 and RAM at 64)
— If you need a GPU, add that resource type and set the appropriate values
— Again, set access for each API and allow storage read and write
— I have the profile set to Optimize Utilization, but Balanced should also work.
Networking:
— I recommend enabling Dataplane V2 as it will be the standard in the near future
Other pages and settings: the defaults should suffice, but I recommend reading and understanding every option fully for your use case.
- Click Create and wait for the cluster to be fully created and provisioned. This can take up to 20 minutes, depending on data center usage.
Now that it is up and running, we want to set up the kubectl CLI on our local machine, following these instructions: https://cloud.google.com/kubernetes-engine/docs/how-to/cluster-access-for-kubectl
Verify kubectl is working correctly by running: kubectl get namespaces
Setting up the Dask Operator
Now that you have kubectl set up, you will want to install Helm, which lets us use pre-defined Helm charts rather than manually creating every piece: https://helm.sh/docs/intro/install/
Once Helm is ready, install the Dask operator with the following command:
helm install --repo https://helm.dask.org --create-namespace -n dask-operator --generate-name dask-kubernetes-operator
It should return an output such as:
NAME: dask-kubernetes-operator-1666875935
NAMESPACE: dask-operator
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Operator has been installed successfully.
You should be able to log in to the Google Cloud Console and see the operator under the “Workloads” tab.
Required Workaround
That would ideally work just fine, but during my setup I encountered a permissions issue that I needed to resolve. Judging from the GitHub issues and logs, this should be fixed soon, but YMMV.
Anyways, the steps are as follows:
Create a file rbac.yml with the following contents:
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: dask-cluster-role
rules:
  # Application: watching & handling for the custom resource we declare.
  - apiGroups: [kubernetes.dask.org]
    resources: [daskclusters, daskworkergroups, daskworkergroups/scale, daskjobs, daskautoscalers]
    verbs: [get, list, watch, patch, create, delete]
  # Application: other resources it needs to watch and get information from.
  - apiGroups:
      - "" # indicates the core API group
    resources: [pods, pods/status]
    verbs:
      - "get"
      - "list"
      - "watch"
  - apiGroups:
      - "" # indicates the core API group
    resources: [services]
    verbs:
      - "get"
      - "list"
      - "watch"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: dask-cluster-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: dask-cluster-role
subjects:
  - kind: ServiceAccount
    name: dask-sa # adjust name based on the service account you created
Update the name on the last line to the name of the operator returned by the operator install (e.g., dask-kubernetes-operator-1666875935 from the output above).
Run the following command:
kubectl apply -f rbac.yml
This creates a Cluster Role Binding for our cluster that lets the Operator create pods on the same cluster.
Testing the Initial Setup
From your local machine, you will want to run Jupyter: jupyter lab
Create a new file and insert the following code:
from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(name='cluster1')
#cluster = KubeCluster.from_name(name="foo")
#cluster.adapt(minimum=1, maximum=10)
cluster.scale(2)
# Scale up: connect to your own cluster with more resources
# see http://dask.pydata.org/en/latest/setup.html
client = Client(cluster)
client
This will create a new Dask cluster with two workers. After running, it should display a dashboard address for the scheduler. You can click it and view the resources available to your two workers (likely very little, due to the small default node).
If you check the google cloud console, you should see three new workloads:
- 2 workers
- 1 scheduler
You should then run
cluster.close()
client.close()
This closes the cluster and client, deleting those workloads and freeing up their resources.
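If you would rather not rely on remembering those two calls, the same cleanup can be written with context managers; this is just a sketch using the same cluster name as above:
from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster
# Both objects are closed automatically when the blocks exit, which
# deletes the scheduler and worker workloads in GKE.
with KubeCluster(name='cluster1') as cluster:
    cluster.scale(2)
    with Client(cluster) as client:
        print(client.dashboard_link)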
Testing the autoscaling
Now for the coolest part, in my opinion. By default, the cluster will not request any additional resources, and you will be stuck on the single small pod. But say you have a large dataset and want your workers to have 6 GB of RAM each? That can easily be specified in code, and GKE node auto-provisioning will make it happen.
So let’s change our code to:
from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(name='cluster2', resources={"requests": {"memory": "6Gi"}, "limits": {"memory": "6Gi"}} )
#cluster = KubeCluster.from_name(name="foo")
#cluster.adapt(minimum=4, maximum=8)
cluster.scale(2)
client = Client(cluster)
client
This is the same as before, except now we are telling K8s that we want our workers and scheduler to have 6 GB of RAM each.
But if you used the e2-small instance for your default node pool, which only has 2 GB of RAM, this request cannot be fulfilled. So when this is run, GKE will realize there are insufficient resources in the cluster and automatically create a new node pool with enough memory. This process takes about 5 minutes, and unfortunately the code does not like that wait.
My best solution is to run the code, monitor the cloud console until the scheduler is ready, then run it again. Once the scheduler is ready, the client should be able to connect and all will be good.
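One way to script that two-step dance, sketched below, is to let the first run provision the cluster and then reattach to it by name on the second run instead of creating it again; this is what the commented-out from_name call in the earlier snippets is for:
from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster
# Second run: the DaskCluster resource 'cluster2' already exists in K8s,
# so reattach to it rather than re-creating it.
cluster = KubeCluster.from_name(name='cluster2')
client = Client(cluster)
client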
Once it fully runs, you should see the three workloads as before, but if you navigate to the Clusters page and then to the Node Pools tab, you should see a new node pool and one or two new nodes created.
Now, as before, close the cluster and client.
The next step
Great, we’ve setup GKE and dask-kubernetes. We can create a cluster with the resources we need, and it all works! What is the next step? Can’t we just convert our pandas dataframes to Dask dataframes and be off to the races?
Well, maybe.
There are a few restrictions when running a distributed deployment of Dask:
- The Python software environment needs to match exactly. Every package, the Python version itself, and Dask all need to be the same. Any library used locally will also need to be present on the workers.
- The workers need to be able to directly access the data.
Now there are quick and easy solutions to both of these, but they are not ideal:
- We can specify the needed extra packages with a variable passed to the cluster:
cluster = KubeCluster(name='cluster2', env={'EXTRA_CONDA_PACKAGES': 'YOUR_PACKAGES'})
The downside is that these packages have to be installed every time the scheduler or a worker starts, adding significant start-up time. Additionally, it does nothing about the Python version.
- For our data, we can host it in a public, internet-accessible location and accept those security concerns, with no extra work required here (see the short example after this list).
- Alternatively, the data can be hosted in Google Cloud Storage buckets, which keeps the data private. This requires the gcsfs Python package and additional system configuration, which brings us back to the first issue.
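As a quick illustration of the public-hosting option, Dask can read straight from a URL; the address below is a placeholder, and this assumes fsspec’s HTTP support (aiohttp) is installed:
import dask.dataframe as dd
# Anyone on the internet can read this file, but no credentials or
# extra configuration are needed on the workers.
df = dd.read_csv("https://example.com/path/to/data.csv")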
However, there is an elegant solution to these issues: Docker!
Dask + K8s + Docker = 😀
Since our setup runs in K8s, and K8s pods are essentially Docker containers, we can create a custom Docker container that comes with our environment pre-configured. The dask-kubernetes developers even included a built-in way to specify these images: by supplying an image parameter to the KubeCluster definition, we can specify the image to use for the scheduler and workers. For ultimate compatibility, we can set up the local client to use a Docker image as well, so that everything is always on the same image build.
Creating our Docker images
By the end of this, we will want two separate images: one for our workers and scheduler, and one for our client. The difference is that the client image will have Jupyter and the appropriate extensions in addition to everything else.
To start, fork my demo repo here: https://github.com/patmagauran/mldocker and then clone your copy to your local machine.
Edit both base/Dockerfile and jupyter/Dockerfile to include the packages you need. If you plan to use Google Cloud Storage at all, make sure gcsfs is included in the list of packages to install. Additionally, include any packages you think you might need later; it’s much easier to install them now than to redo the images later.
Both images pull from the 2022.10.2 Dask image and install a set of packages using EXTRA_CONDA_PACKAGES. The Jupyter image also sets up the gcloud and kubectl CLIs in the container in an image build step, rather than a run step.
I recommend test-building both images using docker build [folder] from the base directory, where [folder] is either base or jupyter. If everything passes, you should be good.
You can then commit and push your changes (you could have also made these changes using the GitHub editor or github.dev).
Then, on GitHub, you will have to enable GitHub Actions for your fork. This can be done under the Settings page. I already have the workflow set up to build and deploy the images as {repoName}-base and {repoName}-jupyter.
Once that is enabled and runs, you should see your two images in the Packages section of the GitHub repo (accessible from the main screen of the repo).
Using our new images
On Dask
Using the images with KubeCluster is very simple: just add an image key to the constructor as follows:
cluster = KubeCluster(name='cluster2', image='YOUR_IMAGE_URL', env={'EXTRA_CONDA_PACKAGES': ''})
To get your image URL, click on the appropriate package in the GitHub Packages screen. You should see the docker pull command, which also contains the URL and tag. For example, mine is ghcr.io/patmagauran/mldocker-base:main
That’s all for the cluster.
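Putting the pieces together, a cluster definition with a custom image and explicit resources might look like the following sketch; the cluster name is arbitrary, the image URL is mine (substitute your own), and the memory values are just the ones used earlier:
from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster
# Custom image for scheduler and workers, plus the 6 GB memory request.
cluster = KubeCluster(
    name='cluster3',
    image='ghcr.io/patmagauran/mldocker-base:main',  # your image URL here
    resources={"requests": {"memory": "6Gi"}, "limits": {"memory": "6Gi"}},
)
cluster.scale(2)
client = Client(cluster)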
On the client
The client requires slightly more work.
I recommend starting with the docker compose file included in the repo, changing the image and environment variables to match your setup.
The tricky part is that you will need to generate a GCP secret (a service account key).
First, log in to the Google Cloud Console and navigate to the Service Accounts screen under IAM & Admin.
You will want to create a new service account and give it a name.
Then copy the email address it creates and go to the Permissions tab.
Click on Grant Access and paste the email address you copied into the Principal field.
Under Assign Roles, you will need to determine what it should have access to. At a minimum, it should have access to Storage, Compute Engine, and Kubernetes Engine. I set mine as a project Owner, since my project is a low-security personal project.
Then navigate to the Keys page and create a JSON key. Locate the downloaded file and move it to a secure location in your Unix filesystem.
Then edit the Docker Compose file so that the gcp-key.json volume mount points to the key file you downloaded.
Testing it out
Now you should be able to test it the same way we did before, and ensure the cluster starts up fully. You should also try some basic data processing.
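For the basic data processing check, something as small as the following sketch is enough to confirm that work is actually being shipped to the workers; the array size and chunking are arbitrary:
import dask.array as da
# Build a chunked random array; each worker computes partial results,
# which are then combined into a single mean.
arr = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
print(arr.mean().compute())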
A problem, and a workaround
One thing you may have noticed is that the scheduler dashboard is inaccessible when using this Docker image. That is because, by default, the code forwards a random port from the K8s cluster to the local machine. I have recently submitted a PR to the package to allow manual specification, so this should be configurable soon. If needed, you can use my fork by adding EXTRA_PIP_PACKAGES=git+https://github.com/patmagauran/dask-kubernetes.git as an environment variable in your Docker Compose file, and then configure the port via the DASK_KUBERNETES__SCHEDULER_FORWARD_PORT environment variable.
For example, here is my full Docker Compose file:
version: "3.1"
services:
jupyter:
image: ghcr.io/patmagauran/mldocker-jupyter:main
ports:
- "8888:8888"
- "8786:8786"
volumes:
- /home/pat/DS352-DataMining:/home/jovyan/DS352
- /home/pat/.secrets/gcp-key.json:/home/jovyan/gcloud-service-key.json
environment:
- GCP_PROJECT_ID=ds352-368215
- CLUSTER_NAME=dask-jupyter-clone-1
- CLUSTER_ZONE=us-west1-a
- DASK_KUBERNETES__SCHEDULER_FORWARD_PORT=8786
- EXTRA_PIP_PACKAGES=git+https://github.com/patmagauran/dask-kubernetes.git
Final Steps
Using Google Cloud Storage (Optional)
There are a few options available to ensure your whole cluster has access to the data. If all you need is to read a public data file, the solution is as simple as using that URL for your file path. But if you need privacy or writability, setting up Cloud Storage is one of the easier methods.
You will first want to create a storage bucket. To do so, log in to the Google Cloud Console and head to the Cloud Storage Buckets page. Create a bucket, selecting the options that make the most sense for your application. Most likely that means setting the data location to the same region as your cluster, with the Standard storage class, uniform access control, and no data protection.
Once the bucket is created, you will want to grant access to the service account used by the client image we created earlier. For the role, you will want Storage Legacy Bucket Owner.
You can then upload your data to it via the cloud console or other methods.
To use Google Cloud Storage with Dask, you will want to have the gcsfs library installed on both your client and the cluster. You can then access files like so:
import dask.dataframe as dd
df = dd.read_csv("gcs://bucket/path/file.csv")
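If the credentials baked into the container are not picked up automatically, gcsfs also accepts an explicit service account key through storage_options; the bucket and path below are placeholders, and the key path matches the volume mount in my Docker Compose file:
import dask.dataframe as dd
# 'token' can be a path to a service account JSON key; gcsfs also accepts
# values like 'google_default' or 'cloud' to use ambient credentials.
df = dd.read_csv(
    "gcs://your-bucket/path/data-*.csv",
    storage_options={"token": "/home/jovyan/gcloud-service-key.json"},
)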
Common Troubleshooting Steps
Occasionally there may be an issue with the operator or cluster that causes problems. I have found the best solutions to be deleting the cluster objects via kubectl and restarting the operator pod.
Deleting the cluster objects
First get the list of DaskClusters K8s thinks exist: kubectl get DaskCluster
Then delete each entry in the list with the command kubectl delete DaskCluster/ClusterName, where ClusterName is the name of the cluster.
Don’t worry, they will be recreated the next time you run the constructor.
Restarting the pod
Simply running kubectl delete pod -n dask-operator --all will delete the operator pod, forcing it to be recreated. Don’t worry, nothing will be lost, as the operator is designed to recover from this.
Conclusion
Now that your environment is fully set up, don’t forget to read the Dask documentation to better understand how to leverage your new toolkit.