Spark 3.0 Feature — Dynamic Partition Pruning (DPP) to avoid scanning irrelevant Data

Prabhakaran Vijayanagulu
5 min readJul 28, 2020

--

Spark 3.0 has introduced multiple optimization features. Dynamic Partition Pruning (DPP) is one among them, which is an optimization on Star schema queries(data warehouse architecture model). DPP is implemented using Broadcast hashing under the hood, which we might have already seen in Broadcast Hash join(BHJ). To know DPP better, There are two basic factors to understand:

  1. Partition Pruning
  2. Broadcast hashing

Partition Pruning

It works based on the PushDownPredicate property. Using this, Spark can read the partitions only that are needed for the processing, rather than processing all the partitions. Processing the whole dataset and applying filtration is dilatory. Rather, pushing down the filter phase, before processing, will reduce the processing overhead.

For example, Consider Table-A with Person and Age joined with the Table-B of Age and Marks(with age as join-key).

Joining approach-1

In Joining Approach-1, It is noted that the processing will take complete data (partitions for which Age<25) for processing, even though the partitions are not going to be part of the join.

Joining approach-2 (With filter push down)

Joining approach-2 will be much faster and efficient, compared to Approach-1, as it joins only the relevant rows and filters out rows that cannot contain the join keys.

Broadcast Hashing

When one of your join tables is smaller, Spark itself opts for BHJ (only if threshold memory is under the limit provided). BHJ works with Broadcast hashing under the hood. Let’s understand the process of how Broadcast hashing works.

Broadcast hashing model
  1. Initially, the Table to be broadcasted is sent to the driver program. The driver program will create a Hash table out of it and getting it broadcasted across all the executors.
  2. This Hash table will have the information such as join keys and the memory addresses of where the corresponding rows are located in the memory.
  3. In this way, Join becomes significantly faster, as it is broadcasting only the key columns as a hash table.
  4. As it is broadcasted, I/O overhead is reduced, and join shuffles are significantly lowered.

Practical Demo

Demo

Dynamic partition pruning

As discussed earlier, DPP is implemented based on the above-mentioned partition pruning and Broadcast hashing. Star-schema tables are broadly classified into Fact and dimension tables, mostly where the dimension table is much smaller compared to the fact table. When joining these tables, DPP creates an internal subquery acquired out of the filter applied on the Dimension table, broadcasts it, makes a hash table out of it and internally applies it to the physical plan of the Fact table, before the scan phase. Let's understand this with a demo.

Demo

  1. Considering two tables one for the dimension(dimension_sales_price_df) and one for fact (fact_weekly_sales_df). Here, Fact table is partitioned on the Date, Store column. (This is one of the key factors of DPP, which we’ll discuss later).
a) Data Extraction
b) Partitioned fact table

2. Joining these tables now. But, Before joining, Let's add a filter to the dimension table, to check how it is affecting the fact table.

dimension_filtered_df = dimension_sales_price_df\
.filter(“Date>‘01–12–2010’”)

Joining both the tables, after the dimension filter.

fact_dimension_joined = fact_weekly_sales_df.join(dimension_filtered_df,\[‘Date’,’Store’],”inner”)

3. Printing the physical plan.

c) Physical plan for joined dataframe

4. Diving deep into the scan phase of both the tables.

Dimension table scan

Filter applied is pushed down before the scan process in dimension scan.

FileScan parquet default.dimension_price_and_cpi[Store#16,Date#17,Fuel_Price#18,CPI#19] Batched: true, DataFilters: [isnotnull(Date#17), (Date#17 > 01–12–2010), isnotnull(Store#16)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/dimension_price_and_cpi], PartitionFilters: [], PushedFilters: [IsNotNull(Date), GreaterThan(Date,01–12–2010), IsNotNull(Store)], ReadSchema: struct<Store:int,Date:string,Fuel_Price:double,CPI:string>

Fact table scan

Notice the partition filter applied, which got internally picked up from the dimension table filter. Also, A separate dynamic pruning expression is formed.

FileScan parquet default.fact_table_partitioned_by_date[Dept#24,Weekly_Sales#25,Store#26,Date#27] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/fact_table_partitioned_by_date/Store=1/Date=02–03–201…, PartitionFilters: [isnotnull(Date#27), (Date#27 > 01–12–2010), isnotnull(Store#26), dynamicpruningexpression(Date#2…, PushedFilters: [], ReadSchema: struct<Dept:int,Weekly_Sales:double>

5. The way, how the dynamic partition pruning happens is, Spark forms an inner subquery from the dimension table, which is broadcasted and hashed across all the executors(as discussed earlier in the broadcast hashing section). This subquery is meant for pruning out unwanted partitions from the fact table, which is carried out in its scanning phase.

6. Executing an action, to understand DPP from spark UI query plan. Following is the complete query execution when executed a “show” command on joined dataframe(fact_dimension_joined).

d) Spark UI

7. From the highlighted red area, we can infer two factors.

i) A subquery is formed from the dimension table and is broadcasted(Subquery Broadcast) to apply broadcast hashing.

ii) It is then applied to the fact table, in its scanning phase, so that it doesn’t carry any irrelevant data to the Join phase.

8. The detailed scanning phase will give more details as below, where the processing time for dynamic partition pruning can be noticed.

e) Expanded version of fact table scan-phase

Conclusion

There are certain limitations/factors for DPP to happen.

i) It doesn’t need any additional configuration to be set in the config.

ii) Tables that need to be pruned (larger table in most cases), must be partitioned with anyone of the join key columns.

iii) It works only with equi-joins i.e., joins with ‘=’ condition.

iv) It’s best suited for queries that follow the star-schema architectural model.

Reference links

--

--