Filtered Reading with RAPIDS & Dask to Optimize ETL
By Ayush Dattagupta and Caleb Winston
What is Filtered Reading?
A major portion of ETL job time is spent decoding and encoding data, and as data sizes grow the problem compounds. Given that a very common pattern is subsetting a dataset by columns or by selecting row slices based on logical conditions (typically with a predicate “WHERE” clause in SQL), moving these filtering operations down to the read phase of the workflow has several advantages:
- It significantly reduces I/O time, especially if the data is stored optimally, where values are ordered or even indexed by ranges.
- It reduces the amount of memory consumed — very important for GPUs, where memory can be a limiting factor.
In the database world, implementing this type of read optimization is usually referred to as “predicate pushdown”. Ideally, the logical predicate is inferred from query or DAG logic. Since cuDF is a lower level acceleration library used by lazily evaluated systems (such as Dask), it exposes APIs for conditional logical filtering.
Filtered Reading in RAPIDS cuDF
The two main methods of filtering data during reads with cuDF are column selection and statistics-based predicate filtering with the Apache Parquet and Apache ORC data formats. These are “big data”-optimized storage formats providing fast “row group” and “column stripe” oriented data access. Both formats store statistical metadata for each “chunk” of records. This lets us skip blocks of data by comparing the range of values in the metadata to the provided predicates, which can be especially useful when the data is sorted in an order that allows omitting the read of many row groups/column stripes.
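To make the skipping logic concrete, here is a minimal pure-Python sketch (no cuDF or Parquet reader involved; the chunk layout and min/max fields are illustrative assumptions) of how per-chunk min/max statistics let a reader decide which chunks can be skipped before decoding any data:

```python
# Each "chunk" mimics a Parquet row group / ORC stripe: it carries
# min/max statistics alongside its (here, synthetic) row data.
chunks = [
    {"min": 0,   "max": 99,  "rows": list(range(0, 100))},
    {"min": 100, "max": 199, "rows": list(range(100, 200))},
    {"min": 200, "max": 299, "rows": list(range(200, 300))},
]

def chunk_may_match(chunk, lo, hi):
    # A chunk can be skipped when its [min, max] range cannot
    # intersect the predicate range lo <= value < hi.
    return chunk["max"] >= lo and chunk["min"] < hi

# Predicate: 120 <= value < 180. Only the middle chunk's range
# overlaps, so only 1 of 3 chunks needs to be read and decoded.
selected = [c for c in chunks if chunk_may_match(c, 120, 180)]
print(len(selected))  # → 1
```

Note that the statistics only prove a chunk *cannot* match; rows inside a selected chunk may still need row-level filtering afterwards.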
Using Predicate Filtering in Workflow Code
In cuDF (and Dask-cuDF), column-based filtering is done with the columns argument, while predicate-based filtering is done with the filters argument. The filters argument accepts predicates in Disjunctive Normal Form (DNF), represented as follows:
Filters -> List[Tuple] or List[List[Tuple]]
Note: The format is similar to the filters argument exposed by PyArrow.
- Each tuple represents a predicate for a specific column.
- Tuples within a list are considered to be conjunctions (AND) of predicates.
- A list of tuples is considered to be a disjunction (OR) with other lists of tuples.
Here is an example of how a predicate condition can be transformed into a filter to pass to cuDF:
Initial Expression
(col1 < A | col1 > B) & (col2 < C)
Distributing the conjunction over the inner disjunction
(col1 < A & col2 < C) | (col1 > B & col2 < C)
The expression above, in its DNF, would translate to the following filter:
[
    [("col1", "<", A), ("col2", "<", C)],
    [("col1", ">", B), ("col2", "<", C)],
]
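To sanity-check the transformation, the DNF filter semantics (outer list = OR, inner list = AND) can be evaluated in plain Python and compared against the original expression over a grid of sample values. The evaluator below and the concrete values for A, B, and C are illustrative, not part of the cuDF API:

```python
import operator

OPS = {"<": operator.lt, ">": operator.gt, "<=": operator.le,
       ">=": operator.ge, "==": operator.eq, "!=": operator.ne}

def matches(row, dnf):
    # DNF semantics: any inner list (conjunction) may match (disjunction).
    return any(all(OPS[op](row[col], val) for col, op, val in group)
               for group in dnf)

A, B, C = 10, 20, 5  # arbitrary sample thresholds
dnf = [
    [("col1", "<", A), ("col2", "<", C)],
    [("col1", ">", B), ("col2", "<", C)],
]

# Brute-force check: the DNF agrees with the original expression
# (col1 < A | col1 > B) & (col2 < C) at every sampled point.
for c1 in range(0, 31):
    for c2 in range(0, 11):
        row = {"col1": c1, "col2": c2}
        original = (c1 < A or c1 > B) and (c2 < C)
        assert matches(row, dnf) == original
```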
Here is an example of how to use such filters while reading Apache Parquet and ORC datasets with cuDF and Dask-cuDF:
import cudf
import dask_cudf

df = cudf.read_parquet(
    "path/to/parquet_dataset",
    columns=["col1", "col2", "col3"],
    filters=[
        [("col1", "<", A), ("col2", "<", C)],
        [("col1", ">", B), ("col2", "<", C)],
    ],
)

ddf = dask_cudf.read_orc(
    "path/to/orc_dataset",
    columns=["col1", "col2", "col3"],
    filters=[
        [("col1", "<", A), ("col2", "<", C)],
        [("col1", ">", B), ("col2", "<", C)],
    ],
)
Here is a more comprehensive notebook that demonstrates filtered reading on the NY-Taxi dataset.
Conclusion
For the NY-Taxi dataset, filtering down to a single month of data was ~7x faster and had ~4.8x lower peak memory usage. These numbers may vary based on factors such as the amount of data that can be skipped, row-group sizes, and hardware, but the key takeaway is to push the filtering operation down to the read calls whenever possible.
A big shoutout to Caleb Winston who led the implementation of the Filtered Reading logic and API in cuDF during his NVIDIA internship.