Filtered Reading with RAPIDS & Dask to Optimize ETL

Published in RAPIDS AI · Feb 8, 2021

By Ayush Dattagupta and Caleb Winston

What is Filtered Reading?

A major portion of ETL job time is spent decoding and encoding data, and the problem compounds as data sizes grow. A very common pattern is subsetting a dataset by columns or selecting row slices based on logical conditions (typically a “WHERE” predicate clause in SQL). Pushing these filtering operations down to the read phase of the workflow has several advantages (a short sketch follows the list):

  1. It significantly reduces I/O time, especially if the data is stored optimally, where values are ordered or even indexed by ranges.
  2. It reduces the amount of memory consumed — very important for GPUs, where memory can be a limiting factor.
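
To make the contrast concrete, here is a minimal sketch of reading everything and filtering afterwards versus pushing the same column selection and predicate into the read call (the dataset path and the threshold value 100 are placeholders):

import cudf

# Read everything, then filter: every column and every row group is decoded first.
df = cudf.read_parquet("path/to/parquet_dataset")
df = df[df["col1"] < 100][["col1", "col2"]]

# Push the same column selection and predicate into the read call:
# only the requested columns are decoded, and row groups whose statistics
# fall entirely outside the predicate can be skipped.
df = cudf.read_parquet(
    "path/to/parquet_dataset",
    columns=["col1", "col2"],
    filters=[("col1", "<", 100)],
)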

In the database world, implementing this type of read optimization is usually referred to as “predicate pushdown”. Ideally, the logical predicate is inferred from the query or DAG logic. Since cuDF is a lower-level acceleration library used by lazily evaluated systems (such as Dask), it instead exposes explicit APIs for this kind of conditional filtering.

Filtered Reading in RAPIDS cuDF

The two main methods of filtering data during reads with cuDF are column selection and statistics-based predicate filtering with the Apache Parquet and Apache ORC data formats. These are “big data” optimized storage formats that provide fast “row group” and “column stripe” oriented data access. Both formats store statistical metadata (such as minimum and maximum values) for each “chunk” of records. This lets us skip blocks of data by comparing the range of values in the metadata to the provided predicates, which is especially useful when the data is sorted in an order that allows many row groups/column stripes to be skipped entirely.

Rough Representation of a Parquet Dataset
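
To get a feel for the metadata that makes this skipping possible, you can inspect a Parquet file's per-row-group statistics with PyArrow (a minimal sketch; the file path and column index are placeholders):

import pyarrow.parquet as pq

# Each row group stores min/max statistics per column; readers compare these
# ranges against the supplied predicates to decide which row groups to skip.
meta = pq.ParquetFile("path/to/parquet_dataset/part.0.parquet").metadata
for i in range(meta.num_row_groups):
    stats = meta.row_group(i).column(0).statistics  # statistics for the first column
    if stats is not None:
        print(f"row group {i}: min={stats.min}, max={stats.max}")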

Using Predicate Filtering in Workflow Code

In cuDF (and Dask-cuDF), column-based filtering is done with the columns argument, while predicate-based filtering is done with the filters argument. The filters argument accepts predicates in Disjunctive Normal Form (DNF), represented as follows:

Note: The format is similar to the filters argument exposed by PyArrow.

Filters -> List[Tuple] or List[List[Tuple]]
  • Each tuple represents a predicate on a single column, of the form (column_name, operator, value).
  • Tuples within a list are combined as a conjunction (AND) of predicates.
  • Each list of tuples is combined with the other lists as a disjunction (OR).

Here is an example of how a predicate condition can be transformed into a filter to pass to cuDF:

Initial Expression

(col1 < A | col1 > B) & (col2 < C)

Distributing the conjunction over the inner disjunction

(col1 < A & col2 < C) | (col1 > B & col2 < C)

The expression above, now in its DNF, would translate to the following filter:

[
    [("col1", "<", A), ("col2", "<", C)],
    [("col1", ">", B), ("col2", "<", C)],
]
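
For intuition, this filter expresses roughly the same condition as the following post-read boolean mask in cuDF (a sketch; df, A, B, and C are placeholders). Keep in mind that statistics-based filtering skips whole blocks, so a pushed-down read may still return some rows that do not satisfy the predicate:

# Conceptually equivalent post-read filter (A, B, C are placeholder values):
mask = ((df["col1"] < A) & (df["col2"] < C)) | ((df["col1"] > B) & (df["col2"] < C))
filtered = df[mask]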

Here is an example of how to use such filters while reading Apache Parquet and ORC datasets with cuDF and Dask-cuDF:

import cudf
import dask_cudf

# A, B, and C are placeholder filter values for illustration.
# Column selection (columns=) and predicate pushdown (filters=) both
# happen during the read, so skipped row groups are never decoded.
df = cudf.read_parquet(
    "path/to/parquet_dataset",
    columns=["col1", "col2", "col3"],
    filters=[
        [("col1", "<", A), ("col2", "<", C)],
        [("col1", ">", B), ("col2", "<", C)],
    ],
)

# The same arguments work for ORC datasets read with Dask-cuDF.
ddf = dask_cudf.read_orc(
    "path/to/orc_dataset",
    columns=["col1", "col2", "col3"],
    filters=[
        [("col1", "<", A), ("col2", "<", C)],
        [("col1", ">", B), ("col2", "<", C)],
    ],
)
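
Note that the cuDF call above returns a materialized GPU DataFrame, while the Dask-cuDF call builds a lazy task graph; the filtered reads happen per partition once the result is computed (a minimal usage sketch):

# Trigger the (filtered) reads across the partitions of the Dask-cuDF DataFrame.
result = ddf.compute()
print(len(result))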

Here is a more comprehensive notebook that demonstrates filtered reading on the NY-Taxi dataset:

Filtered Reading Example on the NY-Taxi dataset

Conclusion

For the NY-Taxi dataset, filtering down to a single month of data was ~7x faster and had ~4.8x lower peak memory usage. These numbers may vary based on factors such as the amount of data that can be skipped, row-group sizes, and hardware, but the key takeaway is to push filtering operations down to the read calls whenever possible.

A big shoutout to Caleb Winston who led the implementation of the Filtered Reading logic and API in cuDF during his NVIDIA internship.
