Apache Spark and Predicate Pushdown

Deepa Vasanthkumar
5 min read · Dec 13, 2021


Predicate Pushdown & Spark

What Is Predicate Logic
A predicate is a statement or mathematical assertion that contains variables, sometimes referred to as predicate variables, and is true or false depending on the values of those variables. For example, the predicate age > 30 is true when age is 45 and false when age is 20.

Predicate push down
Predicate push down is a feature of Spark and Parquet that can improve query performance by reducing the amount of data read from Parquet files. It works by evaluating the filtering predicates in the query against metadata stored in the Parquet files. Parquet can optionally store statistics (in particular, the minimum and maximum values for each column chunk) in the metadata section of its files, and the reader can use that information to decide, for example, to skip reading chunks of data when the filter value in the query falls outside the range of values stored for a given column. This is a simplified explanation; there are many more details and exceptions it does not cover, but it should give you the gist of what is happening under the hood.

Predicate pushdown refers to the filter conditions, typically the WHERE clause, which determine the number of rows returned. It relates to which rows are filtered, not which columns.

Pushdown has two aspects. A partition filter enables partition pruning: only the matching partitions are read at all, which saves scanning. Then, within the selected partition or partitions, the remaining row-level filter (for example, on a city column) is applied. "Push down" means these filters are sent to the data source rather than being applied after the data has been brought into Spark, although this behaviour can be disabled. Parquet, being a columnar format, is well suited to both optimizations.
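Here is a minimal sketch of the two aspects working together (the dataset, the country partition column, the city column, and the output path are all hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PartitionAndPredicatePushdown").getOrCreate()

# Hypothetical data: country is the partition column, city and age are ordinary columns.
people = spark.createDataFrame(
    [("IN", "Kochi", 34), ("IN", "Delhi", 29), ("US", "Austin", 41)],
    ["country", "city", "age"],
)
people.write.mode("overwrite").partitionBy("country").parquet("/tmp/people_by_country")

df = spark.read.parquet("/tmp/people_by_country")

# country == 'IN' prunes partitions (only the country=IN directory is listed),
# while city == 'Kochi' is a row-level predicate pushed down to the Parquet reader.
df.filter((df["country"] == "IN") & (df["city"] == "Kochi")).show()

In the physical plan for this query, the partition condition typically appears under PartitionFilters and the city condition under PushedFilters.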

How Predicate Pushdown Works
1. Initial Query Planning: When a query is submitted to Spark, it goes through a planning phase in which Spark's Catalyst optimizer analyzes the query and attempts to optimize it.
2. Filter Identification: During this analysis, the Catalyst optimizer identifies the filter conditions (predicates) specified in the query.
3. Pushdown to Data Source: Spark attempts to push these filter predicates down to the data source. The filtering logic is then applied at the data source level (for example, inside a database, or by a file format such as Parquet or ORC) instead of after the data has been read into Spark.
4. Data Source Execution: The data source applies the filter conditions and returns only the filtered data to Spark.
5. Reduced Data Transfer: Since only the filtered data is transferred from the data source to Spark, the volume of data movement is significantly reduced.
6. Further Processing in Spark: Spark continues to process the reduced dataset according to the rest of the query.
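You can watch these phases yourself by printing the full set of plans. This is a minimal sketch (the Parquet path and the age column are placeholders); in the physical plan at the end of the output, a successfully pushed predicate shows up in the scan node's PushedFilters list:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("InspectPushdown").getOrCreate()

df = spark.read.parquet("path/to/parquet/file")

# Prints the parsed, analyzed, and optimized logical plans plus the physical plan.
# In the physical plan, look for the FileScan node and its PushedFilters entry.
df.filter(df["age"] > 30).explain(True)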

Filter push down implementation
In a SQL statement, a filter is often used to choose the rows that meet given criteria. In Spark, a filter is pushed down to the data source layer using the following logic:
👉 The filter in the logical plan is represented as a Catalyst expression.
👉 The Catalyst expression is translated into a data source Filter.
👉 If the Catalyst expression cannot be translated into a data source Filter, or is not supported by the data source, the filter is handled at the Spark layer.
👉 Otherwise, it is pushed down to the data source layer.
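To see both outcomes, compare a plain comparison, which translates cleanly into a data source Filter, with a Python UDF, which has no data source equivalent and is therefore evaluated inside Spark. This is a minimal sketch (the path and the age column are placeholders), and the exact plan output depends on your Spark version:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

spark = SparkSession.builder.appName("PushdownVsUdf").getOrCreate()

df = spark.read.parquet("path/to/parquet/file")

# A plain comparison can be translated into a data source Filter and pushed down.
df.filter(df["age"] > 30).explain()

# A Python UDF cannot be translated, so this filter is applied at the Spark layer.
is_over_30 = udf(lambda age: age is not None and age > 30, BooleanType())
df.filter(is_over_30(df["age"])).explain()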

Example:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PredicatePushdown").getOrCreate()

# Read data from a Parquet file
df = spark.read.parquet("path/to/parquet/file")

# Apply a filter (predicate)
filtered_df = df.filter(df["age"] > 30)

# Perform some action (e.g., count the rows)
count = filtered_df.count()

# Stop the Spark session
spark.stop()

In this example, the filter condition df["age"] > 30 is pushed down to the Parquet data source, so the reader can skip chunks of data whose statistics show no values greater than 30, and far fewer records are read into Spark.

Aggregate push down
Aggregate functions are often used in SQL to compute a single result from a set of input values. The most commonly used aggregate functions are AVG, COUNT, MAX, MIN and SUM. If the aggregates in the SQL statement are supported by the data source with exactly the same semantics as in Spark, they can be pushed down to the data source level to improve performance (a sketch follows the list below).

The performance gains are mainly in two areas:
👉 Network IO between Spark and the data source is dramatically reduced.
👉 The aggregate computation can be faster in the data source because of the presence of indexes.
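Here is a rough sketch of what this can look like through the DataSource V2 JDBC path. The catalog name postgresql, the connection URL, the credentials, and the people table are placeholders, and JDBC aggregate push-down requires a reasonably recent Spark version (roughly 3.2 and later):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("AggregatePushdown")
    # Spark's built-in DataSource V2 JDBC catalog; connection details are placeholders.
    .config("spark.sql.catalog.postgresql",
            "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
    .config("spark.sql.catalog.postgresql.url",
            "jdbc:postgresql://localhost:5432/mydb?user=spark&password=secret")
    # Ask Spark to push supported aggregates down to PostgreSQL.
    .config("spark.sql.catalog.postgresql.pushDownAggregate", "true")
    .getOrCreate()
)

# MAX, MIN and COUNT have the same semantics in PostgreSQL as in Spark, so they are
# candidates for push-down; look for a PushedAggregates entry in the scan node.
spark.sql("SELECT MAX(age), MIN(age), COUNT(*) FROM postgresql.public.people").explain()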

Data sources that support predicate pushdown

Parquet

An open-source columnar storage format that supports efficient compression and encoding schemes.

✅ spark.sql.parquet.filterPushdown → true

ORC

A columnar storage format that provides high-performance read and write operations.

✅ spark.sql.orc.filterPushdown → true
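Both flags are ordinary Spark SQL configurations (and default to true in recent Spark versions), so you can inspect or set them at runtime, for example:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PushdownConfig").getOrCreate()

# Inspect the current values.
print(spark.conf.get("spark.sql.parquet.filterPushdown"))
print(spark.conf.get("spark.sql.orc.filterPushdown"))

# Explicitly enable them (or set them to "false" to disable pushdown and compare query plans).
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.orc.filterPushdown", "true")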

And the surprise part (at least for me):

JDBC (PostgreSQL)

Relational databases accessed through JDBC can often push down predicates to the database server.

✅ spark.sql.catalog.postgresql.pushDownTableSample → true

The option to enable or disable TABLESAMPLE push-down into the V2 JDBC data source. The default value is false, in which case Spark does not push down TABLESAMPLE to the JDBC data source. If it is set to true, TABLESAMPLE is pushed down to the JDBC data source.

✅ spark.sql.catalog.postgresql.pushDownLimit → true

The option to enable or disable LIMIT push-down into the V2 JDBC data source. The default value is false, in which case Spark does not push down LIMIT to the JDBC data source. If it is set to true, LIMIT is pushed down to the JDBC data source. Spark still applies LIMIT on the result returned from the data source even when LIMIT is pushed down.
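Here is a rough sketch of how such a catalog might be wired up. The catalog name postgresql, the connection URL, the credentials, and the people table are placeholders; JDBCTableCatalog is Spark's built-in DataSource V2 JDBC catalog implementation:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("JdbcLimitPushdown")
    # Register a DataSource V2 JDBC catalog named "postgresql"; connection details are placeholders.
    .config("spark.sql.catalog.postgresql",
            "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
    .config("spark.sql.catalog.postgresql.url",
            "jdbc:postgresql://localhost:5432/mydb?user=spark&password=secret")
    # Opt in to TABLESAMPLE and LIMIT push-down for this catalog.
    .config("spark.sql.catalog.postgresql.pushDownTableSample", "true")
    .config("spark.sql.catalog.postgresql.pushDownLimit", "true")
    .getOrCreate()
)

# The LIMIT can now be pushed to PostgreSQL instead of being applied only on the Spark side.
spark.sql("SELECT * FROM postgresql.public.people LIMIT 10").show()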

And now, go ahead with your LIMIT clauses on JDBC sources.

Source Code Reference:

https://github.com/apache/spark/tree/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc

Thank you for reading and don't forget to 👏👏👏👏👏
