Published in RAPIDS AI

Scheduling & Optimizing RAPIDS Workflows with Dask and Prefect

Pipelines are Complex

Real-world data flows have complex dependency graphs. They are hard to manage not only because of the limited speeds and feeds of the computer systems they run on, but often because of the way ETL developers design them. A given dataset may be processed into the same, or nearly the same, intermediate outputs many times in distinct processes, either because developers on separate teams lack visibility into duplicate processing or because dataflow documentation rarely makes the duplication obvious. We live in a world where pipelines are complex and things are recomputed needlessly.

With the advent of lazily evaluated, DAG-based data processing tools like Dask & Apache Spark, we move a step towards allowing declarative data processing to “hold onto” reusable tasks and datasets instead of re-computing them. However, organizational complexity usually dictates that such tools create a multitude of separate DAGs with complex pipelines. In the GPU-accelerated world of RAPIDS, we have this problem too.
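To make the idea of "holding onto" reusable work concrete, here is a toy, pure-Python sketch of a lazily evaluated task graph. It is not the Dask or Spark API; the `LazyTask` class and the `load` and `clean` functions are hypothetical names invented for illustration. It only shows the principle: when two consumers depend on the same node of one graph, the shared intermediate is computed once and reused.

```python
class LazyTask:
    """A node in a tiny task graph: nothing runs until compute() is called."""

    def __init__(self, func, *deps):
        self.func = func
        self.deps = deps
        self._done = False
        self._result = None

    def compute(self):
        # Evaluate dependencies first, then cache this node's result so
        # every downstream consumer reuses it instead of recomputing it.
        if not self._done:
            inputs = [dep.compute() for dep in self.deps]
            self._result = self.func(*inputs)
            self._done = True
        return self._result


calls = {"clean": 0}


def load():
    # Stand-in for reading a raw dataset.
    return [3, 1, 2, 3]


def clean(rows):
    calls["clean"] += 1  # count how often the shared step really runs
    return sorted(set(rows))


raw = LazyTask(load)
cleaned = LazyTask(clean, raw)    # shared intermediate output
total = LazyTask(sum, cleaned)    # first consumer
largest = LazyTask(max, cleaned)  # second consumer

results = (total.compute(), largest.compute())
print(results, calls)
```

If `total` and `largest` lived in two separate graphs, each would run `clean` on its own, which is the duplicated work described above.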

Tools like Prefect and Apache Airflow aim to reduce this complexity by allowing us to easily implement workflow management and scheduling. Prefect’s tight integration with Dask and its flexible API make it extremely easy to use with RAPIDS. In fact, the only changes needed to use Prefect with RAPIDS cuDF are that we call cuDF functions inside tasks and use a dask-cuda based cluster as Prefect’s execution engine.

Setting up a Prefect Flow with RAPIDS

A simple Prefect Flow that utilizes cuDF and scales to multiple GPUs could look like the following:

from prefect import task, Flow, Parameter
from prefect.engine.executors import DaskExecutor
import cudf

# Creating a Prefect Task that uses cuDF
@task()
def read_data(file_path):
    df = cudf.read_csv(file_path)
    return df

# Creating a Prefect Flow
with Flow("Read Data") as cudf_read_data_flow:
    file_list = Parameter("file_list")
    dfs = read_data.map(file_list)

# Setting the Prefect execution engine to a dask_cuda based cluster
executor = DaskExecutor(
    cluster_class="dask_cuda.LocalCUDACluster",
)

# Running the Prefect Flow with desired Params and executor
cudf_read_data_flow.run(
    file_list=["/path/to/file1.csv", "/path/to/file2.csv", ...],
    executor=executor,
)

Here is a tutorial that uses Prefect to demonstrate how to set up an ETL pipeline consisting of tasks such as downloading, reading and performing groupby operations on some data using the cuDF and dask-cuDF API. It also shows how to run and scale this pipeline with a LocalCUDACluster.
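As a rough idea of what the read and groupby steps of such a pipeline might look like, here is a minimal sketch. It is not taken from the tutorial: the function `total_by_key`, the column names `key` and `value`, and the sample data are all hypothetical. The pandas fallback is also an assumption added here so the sketch runs on a machine without a GPU; it relies on cuDF mirroring the pandas API for `read_csv` and `groupby`. In a Prefect flow, each function would be decorated with `@task` as in the example above.

```python
import os
import tempfile

try:
    import cudf as xdf  # GPU path, as used in the article
except ImportError:
    import pandas as xdf  # assumption: CPU stand-in with a matching API


def read_data(file_path):
    # Same call as the read_data task in the flow above.
    return xdf.read_csv(file_path)


def total_by_key(df, key="key", value="value"):
    # Hypothetical groupby step: sum one column per group.
    return df.groupby(key)[value].sum()


def run_demo():
    # Write a small hypothetical CSV, then read and aggregate it.
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "sample.csv")
        with open(path, "w") as f:
            f.write("key,value\na,1\nb,2\na,3\n")
        totals = total_by_key(read_data(path))
    return {k: int(totals[k]) for k in ("a", "b")}


print(run_demo())
```

Keeping each step a plain function makes it easy to test on a CPU first and then hand the same logic to a GPU-backed executor.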

With RAPIDS, Dask and Prefect, it’s now easier than ever to accelerate, scale and orchestrate existing workflows. For more information on getting started with RAPIDS, visit rapids.ai; for more information on Prefect, visit prefect.io.


RAPIDS is a suite of software libraries for executing end-to-end data science & analytics pipelines entirely on GPUs.

Ayush Dattagupta
