NVTabular is All-in on Dask

A highly-efficient multi-GPU backend for Scaling Recommender Pipelines

Rick Zamora
RAPIDS AI
Dec 11, 2020


By: Rick Zamora, Ben Frederickson, Even Oldridge

Introduction

Recommender systems (RecSys) are the engine of the modern internet. While surfing the web or shopping online, RecSys models dictate a large fraction of the content we encounter. When streaming media, RecSys models suggest related titles to keep us happy and engaged. These models are a significant benefit to users and internet companies alike, but they also require the processing of vast quantities of data.

In contrast to the field of computer vision, which has already fully adopted GPU acceleration for both the processing and training of deep-learning (DL) models, the RecSys field is still predominantly powered by CPUs. This is due to the tabular nature of the underlying data, which comprises a combination of categorical and continuous features. In the early days of general-purpose GPU programming (i.e. the early days of CUDA), it was believed that GPUs could offer little-to-no benefit for the categorical encoding and embedding tasks known to dominate the computational expense of tabular data processing and RecSys-model training. However, the recent introduction of RAPIDS into the PyData ecosystem has turned this misguided conclusion on its head. GPU acceleration is now known to offer significant performance advantages for both tabular-data preprocessing and RecSys training.

The goal of NVTabular is to both simplify and accelerate the tabular-data processing pipeline needed to train (and deploy) RecSys models on GPUs. Although NVTabular has always offered an efficient ETL solution, earlier versions relied on an iteration-based approach for larger-than-memory (LTM) processing. The specific details of the original API are nicely documented in an earlier blog post. While the initial design was simple and effective, it prevented native multi-GPU support and complicated most global-statistics logic. These drawbacks ultimately motivated the adoption of a new Dask backend for NVTabular-0.2. In this recent release, all IO and pre-processing operations are mapped onto an internal Dask-CuDF DataFrame collection, which automatically enables both LTM and data-distributed processing. By executing all ETL pipelines in this way, users gain full access to the feature-rich Dask ecosystem, including the diagnostics dashboard and JupyterLab extension (illustrated in Figure 1).

In this article, we will dive into the high-level details of NVTabular’s Dask-CuDF backend, and explain how the new design enables highly-efficient multi-GPU scaling and simplifies both the ETL programming model and RAPIDS (CuDF/Dask-CuDF) interoperability. Integration with Dask-CuDF improves the performance of NVTabular on a single machine and can offer a near-linear speedup for multi-GPU machines. Check out the NVIDIA Developer Blog for a more general overview of the 0.2 NVTabular release.

Figure 1. One advantage of the Dask ecosystem is the convenient Dask JupyterLab extension. This animation shows several lab-extension widgets in action during the preprocessing of the Criteo dataset on an NVIDIA DGX-1 machine. The GPU “Memory” and “Utilization” widgets in the upper right-hand side are available through NVDashboard.

Dask-CuDF in NVTabular

A key goal of NVTabular, besides the obvious GPU acceleration, is to make big-data preprocessing as painless as possible for the typical data scientist or engineer. That is, while raw performance is certainly a high priority, simplicity and usability are also key. In order to conserve (and perhaps improve) usability, while enabling NVTabular to scale on multi-GPU systems, we decided to leverage the existing Dask ecosystem.

Dask is a fast and flexible Python library for parallel computing. The flexibility comes from modular organization: Dask comprises both a task-scheduling core module and a variety of “Big Data” collection modules. One of these specialized modules, Dask-Dataframe, provides a Pandas-like API for the processing of large tabular datasets. Under the hood, a Dask DataFrame is simply a lazily-evaluated collection of multiple Pandas DataFrame objects. The Dask API handles the mapping of global operations onto each distinct partition (Pandas DataFrame) by constructing and executing a directed acyclic graph (DAG) of tasks.
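To make the idea of a lazily-evaluated, partitioned collection concrete, here is a minimal pure-Python sketch. It is a toy stand-in and not Dask itself: the LazyPartitions class and its methods are hypothetical names chosen for illustration, and real Dask builds a full task graph rather than a simple list of functions.

```python
class LazyPartitions:
    """Toy stand-in for a partitioned, lazily evaluated collection.

    Hypothetical illustration only; this is not the Dask API.
    """

    def __init__(self, partitions, tasks=None):
        self.partitions = partitions      # raw per-partition data
        self.tasks = list(tasks or [])    # pending partition-wise functions

    def map_partitions(self, func):
        # Only record the operation; no data is touched yet.
        return LazyPartitions(self.partitions, self.tasks + [func])

    def compute(self):
        # Run the recorded chain of functions on each partition independently.
        results = []
        for part in self.partitions:
            for func in self.tasks:
                part = func(part)
            results.append(part)
        return results


calls = []  # records when the partition-wise function actually runs


def double(part):
    calls.append(len(part))
    return [2 * x for x in part]


lazy = LazyPartitions([[1, 2], [3, 4, 5]]).map_partitions(double)
result = lazy.compute()  # work happens here, one partition at a time
```

Because each partition is processed independently, a scheduler is free to run the per-partition tasks on different workers, which is the property NVTabular relies on.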

Since NVTabular already needs to decompose large datasets into bite-sized chunks (or partitions) for GPU-accelerated processing, the general approach translates naturally to the Dask-Dataframe API. For this reason, the library was largely redesigned for the 0.2 release to depend on the RAPIDS Dask-CuDF library, which is essentially a CuDF-backed implementation of the Dask-Dataframe module. Due to the natural mapping between GPU-memory limitations and Dask-Dataframe partitioning, NVTabular now represents all data (internally) as a Dask-CuDF DataFrame (illustrated in Figure 2). This simple approach allows users to operate on large datasets using an arbitrary number of GPU resources. It also enables users to (optionally) feed in and/or extract Dask-Dataframe collections at either end of an NVTabular Workflow.

Figure 2. Cartoon illustration of a Dask-CuDF DataFrame object (a CuDF DataFrame collection).

The Dataset

Although the move to a Dask-CuDF backend required a significant overhaul of NVTabular, the migration resulted in relatively minor changes to the 0.1.1 API. The largest exception to this trend is the deprecation of the single-GPU dataset API, and the introduction of a new (Dask-based) Dataset API. Since the internal representation of any dataset is now a Dask-CuDF DataFrame, a universal Dataset object was introduced to effectively guarantee this translation. In other words, the purpose of this new “wrapper” class is to provide the internals of NVTabular with a reliable mechanism to translate raw (unprocessed) data into a Dask collection with appropriately-sized partitions.

Figure 3. Diagram of NVTabular integration with RAPIDS/Dask.

As illustrated in Figure 3, the new Dataset class represents a critical component of the larger NVTabular library. In fact, the library can be summarized as a composition of three general APIs:

  1. Workflow: Used to define the desired data-transformation pipeline.
  2. DataLoader: Used to feed a tabular data source to a DL training/inference framework (accessed through a PyTorch- or TensorFlow-based frontend).
  3. Dataset: Used to specify the data source for (1) and (2).

Clearly, the Dataset is special, because it is a required input for the other two components. For both the Workflow and DataLoader, the underlying Dask-CuDF DataFrame (ddf) of the Dataset is used to process the data in a scalable way. In both cases, a Dask “task graph” is constructed to specify the various operations that need to be performed on each partition of the data (and the dependencies between these operations). None of the raw data is actually read into GPU memory, or transformed, until absolutely necessary. For a pure Workflow, this means the data will only be read into memory when statistics are needed to complete the construction of the full task graph, or when a partition needs to be persisted to disk. For a DataLoader application, the data is only processed when a specific ddf partition is fed into the DL backend. This makes it possible to perform out-of-core processing on an arbitrary number of GPUs. In fact, even on a single-GPU system, the Dask backend leads to a performance bump over NVTabular-0.1 (a testament to the quality of task schedulers available in Dask and dask.distributed).

Important Dataset Considerations

A Dataset can be initialized with the following objects:

  • 1+ file/directory paths: An engine argument is required to specify the file format (unless the file names end in “csv” or “parquet”).
  • cudf.DataFrame: Internal ddf will have 1 partition.
  • pandas.DataFrame: Internal ddf will have 1 partition.
  • pyarrow.Table: Internal ddf will have 1 partition.
  • dask_cudf.DataFrame: Internal ddf will be a shallow copy of the input.
  • dask.dataframe.DataFrame: Internal ddf will be a direct Pandas-to-CuDF conversion of the input.

For file-based data initialization, the size of the internal ddf partitions will be chosen according to the following arguments (in order of precedence):

  • part_size: Desired maximum size of each partition in bytes. Note that you can also pass a string here, like “2GB”.
  • part_mem_fraction: Desired maximum size of each partition as a fraction of total GPU memory.
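The precedence between these two arguments can be sketched in a few lines of Python. This is only an illustration of the rule described above, not NVTabular's actual implementation: the helper names, the decimal unit table, and the default_fraction value are all assumptions made for the example.

```python
import re

# Assumption: decimal units (1 GB = 10**9 bytes).
_UNITS = {"B": 1, "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12}


def parse_bytes(size):
    """Convert an int or a string such as '2GB' into a byte count."""
    if isinstance(size, int):
        return size
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([KMGT]?B)\s*", size.upper())
    if match is None:
        raise ValueError(f"Cannot parse size: {size!r}")
    number, unit = match.groups()
    return int(float(number) * _UNITS[unit])


def resolve_part_size(device_mem_bytes, part_size=None,
                      part_mem_fraction=None, default_fraction=0.125):
    """Hypothetical helper: pick a maximum partition size in bytes."""
    # part_size takes precedence when it is given.
    if part_size is not None:
        return parse_bytes(part_size)
    # Otherwise fall back to a fraction of the total device memory.
    # default_fraction is an illustrative value, not a documented default.
    if part_mem_fraction is None:
        part_mem_fraction = default_fraction
    return int(device_mem_bytes * part_mem_fraction)


device_mem = 32 * 10**9  # e.g. a 32GB GPU
explicit = resolve_part_size(device_mem, part_size="2GB", part_mem_fraction=0.5)
fractional = resolve_part_size(device_mem, part_mem_fraction=0.25)
```

Here explicit ignores the fraction because part_size was supplied, while fractional is derived from the device memory.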

Global Statistics

While the original version of NVTabular required all global-statistics (StatOperator) definitions to include both per-chunk and finalization logic, the move to a Dask-based backend means that all new and existing operators can be simplified to use the corresponding Dask-CuDF DataFrame API. For example, a simple operation like mean becomes a one-liner (i.e. ddf[cols].mean()). For more complicated operations, like groupby aggregations, writing custom logic can sometimes improve performance. Since the Dask-CuDF API is designed to handle a wide spectrum of use cases, it is not particularly well-optimized for the specific applications of categorical encoding and/or category-based feature engineering. For this reason, the GroupbyStatistics operator in NVTabular implements an alternative solution to Dask-CuDF’s DataFrame.groupby.
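For readers unfamiliar with the older pattern, the sketch below shows what "per-chunk and finalization logic" means for a mean, using plain Python lists in place of GPU DataFrames. The function names are hypothetical and are not taken from the NVTabular code base.

```python
def chunk_stats(chunk):
    """Per-chunk logic: return a partial sum and a row count."""
    return sum(chunk), len(chunk)


def finalize_mean(partials):
    """Finalization logic: combine the partial results into a global mean."""
    total = sum(s for s, _ in partials)
    count = sum(n for _, n in partials)
    return total / count


chunks = [[1.0, 2.0, 3.0], [4.0, 5.0], [6.0]]
mean = finalize_mean([chunk_stats(c) for c in chunks])

# With a Dask-backed DataFrame the same reduction is expressed declaratively,
# e.g. ddf[cols].mean().compute(), and Dask generates the partial/final steps.
```

The manual version has to be written and maintained for every statistic, whereas the DataFrame API hides the same two-stage reduction behind a single call.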

The Dask Cluster

Since Dask-CuDF and NVTabular are now tightly integrated, NVTabular will always use Dask for data partitioning and pre-processing. With that said, the default behavior is to utilize a “synchronous” task scheduler, which precludes distributed processing. In order to properly utilize a multi-GPU system with NVTabular, there must be a dask.distributed cluster deployed on the system, and a corresponding Client object must be passed to the Workflow.

There are many different ways to create a distributed Dask cluster (and corresponding Client object). This blog article provides an excellent overview of the various cluster-deployment utilities available. For multi-node users, the Dask cluster is often deployed through a script or command-line interface, and the NVTabular Dask client can be defined by specifying the address and port of the scheduler.
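For a cluster that is already running, the connection step might look like the following sketch. The helper names and the example host are placeholders; the port 8786 is Dask's default scheduler port. The dask.distributed import is deferred so that the address helper can be used on a machine without Dask installed.

```python
def scheduler_address(host, port, protocol="tcp"):
    """Build the address string expected by dask.distributed.Client."""
    return f"{protocol}://{host}:{port}"


def connect_to_cluster(host, port):
    """Return a Client connected to an already-running Dask scheduler."""
    # Imported lazily; requires the `distributed` package at call time.
    from dask.distributed import Client

    return Client(scheduler_address(host, port))


# Placeholder host; 8786 is the default port of the Dask scheduler.
address = scheduler_address("127.0.0.1", 8786)
```

The resulting client is then handed to the NVTabular Workflow. We assume here that this is done through a client keyword argument when the Workflow is constructed; check the API reference of your NVTabular version for the exact signature.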

When using a single node/server with multiple GPUs, the LocalCUDACluster API (provided by the Dask-CUDA library) is particularly convenient. This class makes it easy to deploy a local GPU-worker cluster with automated device-to-host memory spilling, and (optionally) enables the use of NVLink and Infiniband-based inter-process communication via UCX.
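A single-node deployment can be sketched as follows. The cluster_options and start_local_cluster helpers are hypothetical wrappers written for this example. The keyword arguments they forward (CUDA_VISIBLE_DEVICES, protocol, enable_nvlink, device_memory_limit) are LocalCUDACluster options, but the values are illustrative and should be tuned for your hardware.

```python
def cluster_options(visible_devices, use_ucx=False, device_memory_limit=None):
    """Build keyword arguments for dask_cuda.LocalCUDACluster."""
    options = {
        # One Dask worker is created per visible GPU.
        "CUDA_VISIBLE_DEVICES": ",".join(str(d) for d in visible_devices),
        "protocol": "ucx" if use_ucx else "tcp",
    }
    if use_ucx:
        # NVLink communication is only available with the UCX protocol.
        options["enable_nvlink"] = True
    if device_memory_limit is not None:
        # Workers spill device memory to host once this limit is exceeded.
        options["device_memory_limit"] = device_memory_limit
    return options


def start_local_cluster(**options):
    """Deploy a local GPU cluster and return a connected Client."""
    # Imported lazily; requires dask_cuda and a CUDA-capable machine.
    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    cluster = LocalCUDACluster(**options)
    return Client(cluster)


options = cluster_options([0, 1, 2, 3], use_ucx=True, device_memory_limit="24GB")
# client = start_local_cluster(**options)  # run on a multi-GPU machine
```

Separating the option-building step from the deployment keeps the configuration easy to inspect and log before any GPU resources are claimed.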

Shuffling to Disk

NVTabular currently supports two shuffling options when writing output to disk (with an additional FULL option planned for a future release):

  • PER_PARTITION
  • PER_WORKER

In both cases, the partitions of the underlying dataset/ddf are randomly ordered before any processing is performed. If PER_PARTITION is specified, each worker/process will also shuffle the rows within each partition before splitting and appending the data to a configurable number of output files. Output files are distinctly mapped to each worker process. If PER_WORKER is specified, each worker will follow the same procedure as PER_PARTITION, but will re-shuffle each file after all data is persisted. This results in a full shuffle of the data processed by each worker. To improve performance, this option currently uses host-memory BytesIO objects for the intermediate persist stage. The general PER_WORKER algorithm is illustrated in Figure 4.

Figure 4. Illustration of the PER_WORKER shuffling procedure.
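The two shuffling options can be mimicked with ordinary Python lists. The sketch below is a simplified model of the procedure described above, not NVTabular's implementation: the function name, the round-robin assignment of partitions to workers, and the use of in-memory lists instead of files are all assumptions.

```python
import random


def shuffle_to_files(partitions, n_workers, files_per_worker,
                     per_worker=False, seed=0):
    """Toy model of the PER_PARTITION and PER_WORKER shuffle options."""
    rng = random.Random(seed)

    # Both options start by randomly ordering the partitions.
    partitions = list(partitions)
    rng.shuffle(partitions)

    # Each worker owns its own distinct set of output "files".
    files = {(w, f): [] for w in range(n_workers)
             for f in range(files_per_worker)}

    for i, part in enumerate(partitions):
        worker = i % n_workers  # assumption: round-robin assignment
        rows = list(part)
        rng.shuffle(rows)  # PER_PARTITION: shuffle rows within the partition
        # Split the shuffled rows and append a slice to each output file.
        for f in range(files_per_worker):
            files[(worker, f)].extend(rows[f::files_per_worker])

    if per_worker:
        # PER_WORKER: re-shuffle every file once all data has been appended.
        for rows in files.values():
            rng.shuffle(rows)
    return files


partitions = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
files = shuffle_to_files(partitions, n_workers=2, files_per_worker=2,
                         per_worker=True)
```

Note that rows never cross worker boundaries in either mode, which is why neither option amounts to a global shuffle of the dataset.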

Workflow Execution

All of the changes described thus far allow the Dask scheduler, either synchronous or distributed, to orchestrate the preprocessing workflow at runtime (i.e. when the Workflow.apply method is finally executed). This method is designed with the following procedure in mind:

Step 1 — Convert an input Dataset to an internal ddf

Step 2 — Apply Workflow Phases. For each phase:

  • Compute statistical dependencies on ddf (eager)
  • Transform ddf (lazy)

Step 3 — Write to Disk (Optional)

  • Shuffle transformed ddf partitions
  • Append shuffled partitions into out_files_per_proc parquet files

Note that, while global statistics are immediately gathered during the appropriate workflow phase, transform operations are always lazy. The motivation for this distinction is the fact that the dependency graph for a global-statistics operation is typically much more complex than that of a partition-wise transformation. Therefore, both scheduling and memory management become less complicated when these complex dependencies are excluded from the internal DAG of ddf.
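The eager-statistics, lazy-transform pattern can be illustrated with a small pure-Python model. Everything here is hypothetical (the run_phases function and the two example phases); it only mirrors the control flow of the procedure above, with lists standing in for ddf partitions.

```python
def run_phases(partitions, phases):
    """Toy model: gather statistics eagerly, record transforms lazily."""
    transforms = []  # partition-wise functions recorded so far

    def apply_all(part):
        for func in transforms:
            part = func(part)
        return part

    for compute_stat, make_transform in phases:
        # Eager: the statistic needs a pass over the data as transformed so far.
        current = [apply_all(p) for p in partitions]
        stat = compute_stat(current)
        # Lazy: the transform that depends on the statistic is only recorded.
        transforms.append(make_transform(stat))

    # Nothing is transformed for output until this callable is invoked.
    return lambda: [apply_all(p) for p in partitions]


def global_mean(parts):
    values = [x for part in parts for x in part]
    return sum(values) / len(values)


def global_max_abs(parts):
    return max(abs(x) for part in parts for x in part)


phases = [
    # Phase 1: center the data on the global mean.
    (global_mean, lambda mean: (lambda part: [x - mean for x in part])),
    # Phase 2: scale by the largest absolute value of the centered data.
    (global_max_abs, lambda scale: (lambda part: [x / scale for x in part])),
]

compute = run_phases([[1.0, 2.0], [3.0, 6.0]], phases)
result = compute()
```

The second phase depends on the output of the first, which is why statistics have to be resolved phase by phase while the transforms themselves can stay deferred.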

A Simple Example: Criteo Preprocessing for DLRM

Now that we have established the necessary high-level changes in NVTabular-0.2 needed for multi-GPU scaling, we are ready to talk about performance with real data: DLRM-preprocessing of the Criteo dataset. We strongly encourage you to check out this tutorial to explore multi-GPU scaling of Criteo/DLRM for yourself. The referenced notebook was specifically designed to complement this article, and was used to collect the numbers presented below. For those who wish to explore NVTabular with a smaller dataset, we recommend the multi-gpu_dask example notebook included in the NVTabular repository.

Performance Results

As shown in Figure 5, the Criteo/DLRM workflow scales well to all eight GPUs on an NVIDIA DGX-1 system (8x 32GB V100 GPUs). For this (partially optimized) example, the 8-GPU performance reflects a parallel efficiency of roughly 80%. However, it is important to recognize that significantly larger data, and/or more-complex statistical dependencies, can quickly lead to memory-spilling bottlenecks on systems with limited device memory. In cases where spilling is required for workflow execution to complete without error, moving to a many-GPU system can easily lead to super-linear scaling (an apparent >100% parallel efficiency). In fact, recent TPCx-BB benchmarking studies have clearly demonstrated that NVTabular’s parallel backend, Dask-CuDF, can effectively scale to many V100 or A100-based nodes (utilizing more than 100 GPUs).

Figure 5. Scaling of the example workflow on a DGX-1 system (8x 32GB V100 GPUs). Note that the results were obtained with special options for the 4- and 8-GPU runs. For these cases, there is enough global device memory available to avoid device-to-host spilling during the reduction of global uniques. This allows us to use on_host=False for Categorify, but requires us to use a slightly smaller partition size for Dataset (i.e. the default size). For all cases, precise tuning of these parameters (and others) can lead to additional improvements over the numbers in this figure.
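For reference, the parallel efficiency quoted above follows the usual definition, written here with T_1 for the single-GPU runtime and T_N for the runtime on N GPUs (these symbols are our notation, not taken from the figure):

```latex
E(N) = \frac{T_1}{N \, T_N}, \qquad S(N) = \frac{T_1}{T_N} = N \, E(N)
```

With E(8) of roughly 0.8, the corresponding speedup is S(8) ≈ 8 × 0.8 = 6.4 relative to a single GPU. Super-linear scaling corresponds to E(N) > 1, which can appear when T_1 is inflated by device-to-host spilling that the larger system avoids.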

Conclusion

In summary, the 0.2 release of NVTabular introduced a new Dask-CuDF backend that simplifies the programming model while enabling performant preprocessing that scales. We’ve continued improving upon it with the recent 0.3 release and are working to integrate even more closely with our upcoming API changes. We strongly encourage you to download the package, and let us know what you think. Since NVTabular is an open-source project, submitting a feature request and/or bug report is as simple as filing an issue on GitHub.

Thanks for reading!
