Spark SQL: Adaptive Query Execution
Altering the physical execution plan at runtime.
--
The level of parallelism and the choice of join strategy have proven to be the key factors for complex query performance in large clusters.
Even though Spark 2.x already exposed a few parameters to tweak this behaviour, tuning them by hand was impractical in many production scenarios. Besides, a static configuration may not suit every stage of a job: stages closer to the final output usually have to process far less data than earlier ones, thanks to the filters and aggregations applied along the pipeline.
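To make this concrete, here is the kind of static, one-size-fits-all tuning Spark 2.x called for (a sketch assuming an existing SparkSession named spark; the values are illustrative, not recommendations):

// One value for every shuffle in the job, regardless of how much data each stage moves.
spark.conf.set("spark.sql.shuffle.partitions", "400")
// Broadcast a join side only if its estimated size stays under this threshold (here, 50 MB).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)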
Adaptive Query Execution (SPARK-31412) is a new enhancement included in Spark 3 (announced by Databricks just a few days ago) that radically changes this mindset. The framework is now responsible for improving these decisions at runtime, altering the physical execution plan dynamically whenever it is convenient.
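Turning it on boils down to a single flag, plus a couple of optional features shipped alongside it. A minimal sketch, again assuming a SparkSession named spark:

// Master switch for Adaptive Query Execution (off by default in Spark 3.0).
spark.conf.set("spark.sql.adaptive.enabled", "true")
// Coalesce small shuffle partitions once the actual map output sizes are known.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
// Split heavily skewed partitions when performing sort-merge joins.
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")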
Modern distributed systems are designed around the principle of decoupling the compute and storage layers, that is, storing persistent data on remote storage and using local storage only for transient data. Spark was built following this idea (and Hadoop 3 has recently adopted it). While this design brings benefits in terms of scalability and availability (as well as cost reductions), it also hinders the query optimization process, since unpredictable data arrival times make cost estimates less reliable.
Fig. 1 depicts the internals of the Spark SQL engine. Very basically, a logical plan of operations (coming from parsing a SQL statement or from the lineage of transformations applied to a DataFrame or Dataset) is (1) resolved against the catalog, (2) optimized on a rule basis (constant folding, predicate and projection pushdown…) and (3) translated into a physical plan of Spark operators (actually into multiple candidate plans, from which one is selected on a cost basis).
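You can inspect these plans yourself: calling explain(true) on any DataFrame prints the parsed and analyzed logical plans, the optimized logical plan and the selected physical plan. A minimal, self-contained sketch:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("plans").getOrCreate()

val df = spark.range(1000)
  .selectExpr("id % 10 as key", "id as value")
  .groupBy("key")
  .avg("value")

// Prints the parsed/analyzed/optimized logical plans and the physical plan.
df.explain(true)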
For the sake of the explanation, let’s take this sample query, which reads two tables, filters one of them, joins the results on a common key, and finally performs an aggregation on another field.
select a1, avg(b1)
from A join B on A.pk = B.pk
where b1 <= 10000
group by a1
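For reference, the same query could be expressed with the DataFrame API; a sketch assuming tables A and B are already registered in the catalog:

import org.apache.spark.sql.functions.{avg, col}

val a = spark.table("A")
// Applying the filter before the join mirrors what the predicate pushdown rule produces.
val b = spark.table("B").filter(col("b1") <= 10000)

val result = a.join(b, a("pk") === b("pk"))
  .groupBy("a1")
  .agg(avg("b1"))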