Prefect x Kubernetes x Ephemeral Dask: Power, without responsibility

George Coyne · SlateCo
Jul 19, 2021

Prefect is rapidly becoming the standard in dataflow automation. In our last blog we briefly ran through using Prefect Executors to parallelize on a single node. The question that usually follows single-node parallelization is:

How do we parallelize across multiple nodes?

The goal of this blog is to provide a minimal example of parallelizing across multiple nodes with an ephemeral Dask cluster, using as little external configuration as possible. In this example we will use Kubernetes for Flow scheduling, but the pattern is similar with ECS.

A few points to keep in mind:

  • We are using prefect > 0.14
  • We are using the Kubernetes Agent and therefore the KubernetesRun run_config
  • This example uses Docker storage, which is a great way to get up and running, but is not generally recommended at scale.
  • This pattern is a great way to get up and running with Dask before your organization adopts a more structured approach, with dedicated clusters. If you want to experiment with a managed Dask solution, we recommend Coiled.

The Agent

First things first: the Prefect Kubernetes Agent. The Prefect CLI provides everything we need to deploy a new Kubernetes Agent. The installation method I generally prescribe is:
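With a 0.14-era release this looks roughly like the following (on 0.15+ releases, agents authenticate with API keys rather than tokens, so the flag differs slightly):

```shell
# Generate an agent manifest, including RBAC resources, and write it
# to a file so it can be customized before applying.
prefect agent kubernetes install --rbac --token "$PREFECT_API_TOKEN" > agent.yaml
```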

This method will generate three Kubernetes resources:

  • A Prefect Agent deployment
  • A custom Role with the permissions required for general Prefect tasks: scheduling, monitoring, and deleting Kubernetes pods.
  • A RoleBinding that grants the default service account the permissions defined in the above Role.

The Role created by the --rbac option is suitable for nearly all Prefect implementations on k8s; however, we need a few additional permissions in order to create our temporary Dask clusters.

In order to spin up a temporary Dask cluster, we are going to use the dask-kubernetes library. Specifically, we want the KubeCluster class, which starts a local Dask scheduler and then dynamically launches Dask workers as pods on the Kubernetes cluster; it is designed for ad-hoc deployments. Dask provides a role document which we can reference for our agent.

Informed by the documentation from Dask, we come up with a new agent deployment manifest.
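The manifest is mostly the generated agent deployment plus the Role and RoleBinding from above; the important addition is a handful of extra rules on the Role. A sketch, following the role document in the dask-kubernetes docs (treat that document as authoritative):

```yaml
# Extra rules appended to the agent Role so that flow-run pods can
# manage the ephemeral Dask scheduler and worker pods KubeCluster creates.
- apiGroups: [""]   # core API group
  resources: ["pods"]
  verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: [""]
  resources: ["pods/log"]
  verbs: ["get", "list"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["get", "list", "watch", "create", "delete"]
```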

After making changes to the file to fit our deployment, adding labels, and most importantly adding an API token, we apply the manifest with kubectl apply -f agent.yaml.

Prefect Core

Now we are ready to get into the actual Prefect Flow. The implementation here can be as simple or as complex as we want it to be.

The key piece is going to be the Executor. In our previous post we talked about the three types of executors, but as a refresher:

Prefect Executors are responsible for running tasks in a flow. During execution of a flow run, a flow’s executor will be initialized, used to execute all tasks in the flow, then shutdown.
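Putting it together, a minimal sketch of such a flow (the registry URL, kubectl version, task bodies, and worker counts below are all placeholders):

```python
import prefect
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker
from dask_kubernetes import KubeCluster, make_pod_spec

# Docker storage: dask-kubernetes is pip-installed during the image
# build, and kubectl is installed via an extra Dockerfile RUN command.
storage = Docker(
    registry_url="YOUR_REGISTRY",  # placeholder
    python_dependencies=["dask-kubernetes"],
    extra_dockerfile_commands=[
        "RUN curl -sLO https://dl.k8s.io/release/v1.21.2/bin/linux/amd64/kubectl"
        " && chmod +x kubectl && mv kubectl /usr/local/bin/",
    ],
)

@task
def get_numbers():
    # Deliberately thin; what matters is that it returns an iterable.
    return list(range(100))

@task
def double(x):
    return x * 2

# An anonymous function creates the ephemeral KubeCluster; its worker
# pods run the image built by our Docker storage (prefect.context.image).
executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(make_pod_spec(image=prefect.context.image)),
    adapt_kwargs={"minimum": 2, "maximum": 10},
)

with Flow(
    "ephemeral-dask",
    storage=storage,
    run_config=KubernetesRun(),
    executor=executor,
) as flow:
    doubled = double.map(get_numbers())
```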

The breakdown:

  • First, define a Docker storage object. Adding dask-kubernetes as a dependency causes it to be installed with pip during the container build. The kubectl CLI is a dependency of dask-kubernetes; it is installed by the RUN command in the extra_dockerfile_commands argument.
  • Next we define a few tasks. These are incredibly thin, but the important part for the purpose of our example is that they return iterables.
  • In our executor, we pass an anonymous function that creates a KubeCluster Dask cluster manager.
  • KubeCluster requires a pod_template argument, which we generate with make_pod_spec.
  • When using Docker storage, the image produced is accessible through the Prefect context at prefect.context.image, which we pass into make_pod_spec so the worker pods run our new image.
  • The adapt_kwargs argument lets us set minimum and maximum worker counts for the autoscaling Dask cluster.
  • Finally, the Flow itself, where we bring it all together.

We are finally ready to register our flow. I prefer using the CLI for Flow registration. Let's register the flow with the command prefect flow register -p ephemeral_dask.py --project "YOUR_PROJECT".

Now that we have our Flow registered, run it from the Prefect Cloud UI, through the API, or put a schedule on it.
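For that last option, a schedule can be attached to the flow in code before registering it, e.g. an hourly IntervalSchedule:

```python
from datetime import timedelta

from prefect.schedules import IntervalSchedule

# Run the registered flow once an hour.
flow.schedule = IntervalSchedule(interval=timedelta(hours=1))
```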

I hope this serves to illustrate one of the many powerful patterns that Prefect can take advantage of. Feel free to reach out with any questions, and follow me anywhere @gabcoyne.

SLATE is a multi-disciplined engineering firm with a focus on modern approaches to data architecture and software development. SLATE delivers custom solutions that allow organizations to gain insights to drive effective decision making.
