Scheduling & Optimizing RAPIDS Workflows with Dask and Prefect
Pipelines are Complex
Real-world data flows have complex dependency graphs. They are hard to manage not just because of the limited speeds and feeds of the computer systems they run on, but often because of the way ETL developers design them. A given dataset may be processed into the exact same, or mostly the same, intermediate outputs many times in distinct processes. Sometimes developers on separate teams have no visibility into the duplicated processing; sometimes dataflow documentation rarely makes it obvious. We live in a world where pipelines are complex and things are recomputed needlessly.
With the advent of lazily evaluated, DAG-based data processing tools like Dask & Apache Spark, we move a step towards letting declarative data processing “hold onto” reusable tasks and datasets instead of recomputing them. However, organizational complexity usually means these tools end up building many separate DAGs, each with its own complex pipeline. In the GPU-accelerated world of RAPIDS, we have this problem too.
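As a small CPU-only illustration of that reuse, the sketch below uses `dask.delayed` (no RAPIDS required). When two results that share an intermediate step are computed together in one `dask.compute` call, Dask merges their graphs and runs the shared step only once. The functions `load_and_clean`, `total` and `count` are hypothetical stand-ins for real ETL steps.

```python
import dask

calls = []  # records every execution of the shared intermediate step


@dask.delayed
def load_and_clean(n):
    # Hypothetical stand-in for an expensive ETL step (e.g. reading and cleaning a file)
    calls.append(n)
    return list(range(n))


@dask.delayed
def total(values):
    return sum(values)


@dask.delayed
def count(values):
    return len(values)


cleaned = load_and_clean(10)  # builds a lazy task; nothing runs yet

# Both outputs depend on the same `cleaned` node, so it executes only once
s, c = dask.compute(total(cleaned), count(cleaned))
print(s, c, len(calls))  # 45 10 1
```

Calling `total(cleaned).compute()` and `count(cleaned).compute()` separately would run `load_and_clean` twice, which is the same kind of duplicated work that separately built pipelines tend to incur.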
Tools like Prefect and Apache Airflow aim to reduce this complexity by making workflow management and scheduling easy to implement. Prefect’s tight integration with Dask and its flexible API make it extremely easy to use with RAPIDS. In fact, the only changes needed to use Prefect with RAPIDS cuDF are to call cuDF functions inside tasks and to use a dask-cuda based cluster as Prefect’s execution engine.
Setting up a Prefect Flow with RAPIDS
A simple Prefect Flow that utilizes cuDF and scales to multiple GPUs could look like the following:
from prefect import task, Flow, Parameter
from prefect.engine.executors import DaskExecutor
import cudf

# Creating a Prefect Task that uses cuDF
@task()
def read_data(file_path):
    df = cudf.read_csv(file_path)
    return df

# Creating a Prefect Flow
with Flow("Read Data") as cudf_read_data_flow:
    file_list = Parameter("file_list")
    dfs = read_data.map(file_list)

# Setting the Prefect execution engine to a dask_cuda based cluster
executor = DaskExecutor(
    cluster_class="dask_cuda.LocalCUDACluster",
)

# Running the Prefect Flow with desired Params and executor
cudf_read_data_flow.run(
    file_list=["/path/to/file1.csv", "/path/to/file2.csv", ...],
    executor=executor,
)
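In the flow above, `read_data.map(file_list)` creates one `read_data` task per file, and the `DaskExecutor` hands those independent tasks to the cluster's workers. The fan-out itself can be sketched without Prefect or a GPU by using `dask.delayed`, with pandas standing in for cuDF. This is an illustrative analogue, not how Prefect implements mapping, and the temporary `part{i}.csv` files are made-up inputs.

```python
import os
import tempfile

import dask
import pandas as pd  # CPU stand-in for cuDF, which mirrors much of the pandas API


@dask.delayed
def read_data(file_path):
    # One lazy task per file, analogous to what read_data.map(file_list) produces
    return pd.read_csv(file_path)


# Hypothetical input: write three small CSV files to a temporary directory
tmpdir = tempfile.mkdtemp()
file_list = []
for i in range(3):
    path = os.path.join(tmpdir, f"part{i}.csv")
    pd.DataFrame({"key": ["a", "b"], "value": [i, i * 10]}).to_csv(path, index=False)
    file_list.append(path)

# Fan out: build one task per file, then execute them all in a single compute call
tasks = [read_data(path) for path in file_list]
dfs = dask.compute(*tasks)
combined = pd.concat(dfs, ignore_index=True)
print(len(dfs), len(combined))  # 3 6
```

Because the per-file tasks do not depend on each other, the scheduler is free to run them in parallel; on a dask-cuda cluster that parallelism is spread across GPU workers.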
Here is a tutorial that uses Prefect to demonstrate how to set up an ETL pipeline consisting of tasks such as downloading, reading and performing groupby operations on some data using the cuDF and dask-cuDF APIs. It also shows how to run and scale this pipeline with a LocalCUDACluster.
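The groupby stage of such a pipeline is ordinary DataFrame code. Because cuDF follows the pandas API for operations like `groupby(...).agg(...)`, a minimal sketch can fall back to pandas when cuDF is not installed. The `summarize` function, the `xdf` alias and the `key`/`value` column names are hypothetical; inside a Prefect flow the function would be decorated with `@task` like `read_data` above.

```python
try:
    import cudf as xdf  # GPU DataFrame library from RAPIDS
except ImportError:
    import pandas as xdf  # CPU fallback with a largely matching API


def summarize(df, key="key", value="value"):
    # Per-group sum and mean of one column; works on cuDF and pandas DataFrames
    out = df.groupby(key).agg({value: ["sum", "mean"]})
    out.columns = [f"{value}_{stat}" for stat in ("sum", "mean")]
    # cuDF does not sort groups by default, so sort explicitly for stable output
    return out.sort_index()


df = xdf.DataFrame({"key": ["a", "b", "a", "b"], "value": [1, 2, 3, 4]})
result = summarize(df)
print(result)
```

Keeping the aggregation in a plain function like this makes it easy to unit-test on a CPU before wiring it into a GPU-backed flow.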