Demystifying Apache Spark 3.0

Embrace new Apache Spark features such as Adaptive Query Execution, Dynamic Partition Pruning, Structured Streaming UI, Accelerator-aware Scheduler and more

Şeref Acet
Jul 13, 2020

Apache Spark 3.0 was released on June 18, 2020, with new features and improvements. In this blog post, I will summarize the Apache Spark 3.0 features that excite me the most. You can find further details in the release notes.

Apache Spark 3.0 Features on Databricks

Adaptive Query Execution (AQE)

Adaptive Query Execution (AQE) allows Spark to change the execution plan at runtime, automatically choosing the number of reducers and the join algorithms. This change marks a new era in Apache Spark.

Typically, Spark 2.4 can only optimize a query during the planning phase, whereas Spark 3.0 can re-optimize it at runtime thanks to this brand-new adaptive query execution.

Just a kind reminder: you need to set spark.sql.adaptive.enabled to true in order to activate adaptive query execution.
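
For example, in a spark-shell session this is enough to switch it on (a minimal sketch, using the predefined SparkSession named spark):

// Enable Adaptive Query Execution (it is disabled by default in Spark 3.0)
spark.conf.set("spark.sql.adaptive.enabled", "true")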

Convert Sort Merge Join to Broadcast Hash Join at Runtime

Basically, Adaptive Query Execution (AQE) can:

  • Convert sort-merge join to broadcast-hash join at runtime.
  • Dynamically coalesce shuffle partitions, even when the user sets a large number of them. For instance, the default value of spark.sql.shuffle.partitions is 200, and AQE can coalesce this partition count after the first stage.
  • Dynamically optimize skew joins, which are detected from shuffle file statistics (see the configuration sketch below).
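
These sub-features have their own switches on top of spark.sql.adaptive.enabled. The following sketch uses the configuration names from the Spark 3.0 SQL configuration reference; the advisory size value is just an illustrative choice:

// Coalesce small shuffle partitions after each stage (on by default once AQE is enabled)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
// Advisory target size used when coalescing or splitting partitions
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
// Rebalance skewed partitions detected from shuffle file statistics
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")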

Dynamic Partition Pruning (DPP)

Before introducing dynamic partition pruning, we should first talk about static partition pruning in order to understand the dynamic variant properly.

Static partition pruning selects partitions at compile time and avoids scanning data that is irrelevant to the query. If we can prune unnecessary partitions from the query execution plan, the query uses fewer resources and becomes proportionally faster and more scalable. This approach is used by most query optimizers.

For instance, let’s say we are running the following query:

SELECT * FROM PageViewEvent WHERE year='2020'

Intuitively, you might expect it to read the entire dataset and then filter it with the year='2020' predicate. However, this query does not scan the entire table. Instead, Spark pushes the filter down and reads only the partitions that belong to year='2020'. Keep in mind that this approach only works with partitioned tables.
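
As a quick illustration (the DataFrame name pageViews and its columns are made up), writing the data with partitionBy creates one directory per year, which is exactly what allows the query above to skip everything except year='2020':

// pageViews is assumed to be an existing DataFrame with a "year" column
pageViews.write
  .partitionBy("year")       // one directory per year value
  .mode("overwrite")
  .saveAsTable("PageViewEvent")

// Only the year=2020 partition is scanned, thanks to static partition pruning
spark.sql("SELECT * FROM PageViewEvent WHERE year = '2020'").show()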

Static Partition Pruning

Dynamic partition pruning kicks in when the optimizer is unable to identify, at parse time, the partitions it can skip. The important point is that dynamic partition pruning is applied at execution time. This requirement is quite common for star-schema queries, which consist of one or more fact tables referencing any number of dimension tables. In such join operations, we can prune the partitions the join reads from a fact table by identifying the partitions that result from filtering the dimension tables.

Dynamic Partition Pruning

The main purpose of dynamic partition pruning is to read only as much data as you actually need. This feature therefore reduces I/O and speeds up queries significantly. The good thing is that this optimization works out of the box; you don't have to do anything to activate it as long as your query meets the following conditions:

  • The fact-table data must be partitioned by the join key of the dimension.
  • The optimization is only triggered when the join is planned as a broadcast-hash join. If your query performs a sort-merge join, it can't leverage dynamic partition pruning because both sides of the join run at the same time.
  • Dynamic partition pruning only works for equi-joins (=). It doesn't work for non-equi-join conditions (>, <, >=, <=, !=, <>).
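
To make this concrete, here is a hedged star-schema sketch; the fact_page_views and dim_dates tables and their columns are invented for illustration. Dynamic partition pruning is controlled by spark.sql.optimizer.dynamicPartitionPruning.enabled, which is on by default:

// Enabled out of the box; shown here only for completeness
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")

// fact_page_views is partitioned by date_id; dim_dates is small enough to broadcast
spark.sql("""
  SELECT f.page_id, f.views
  FROM fact_page_views f
  JOIN dim_dates d ON f.date_id = d.date_id   -- equi-join on the partition key
  WHERE d.year = 2020                          -- filter sits on the dimension side
""").explain()
// When DPP kicks in, the fact-table scan's partition filters should contain
// a dynamicpruningexpression(...) entry in the physical plan.

At runtime, Spark evaluates the filtered dimension side first and reuses its broadcast result to skip the fact partitions that cannot match.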

Structured Streaming UI

As of Spark 3.0, there is a “Structured Streaming” tab in the Spark UI. It lets you easily monitor your Structured Streaming jobs and view statistics for running and completed streaming queries, as shown in the screenshot below.

Structured Streaming Queries

In addition to that, if you click a Run ID, you can see detailed information for that particular streaming query, including Input Rate, Process Rate, Input Rows, Batch Duration, and Operation Duration.

Streaming Query Statistics

With this release, we can monitor Structured Streaming jobs easily and track their metrics without any hassle.
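
If you want something to show up on the new tab, a throwaway query against the built-in rate source is enough; the query name and trigger interval below are arbitrary choices of mine:

import org.apache.spark.sql.streaming.Trigger

// A toy streaming query; it appears on the Structured Streaming tab while it runs
val query = spark.readStream
  .format("rate")                  // test source that emits (timestamp, value) rows
  .option("rowsPerSecond", "5")
  .load()
  .writeStream
  .queryName("rate-demo")          // the name displayed in the UI
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

// Stop it when you are done: query.stop()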

Better ANSI SQL Compliance

Spark SQL is a convenient way of turning SQL queries into Spark jobs without any hassle. Spark 3.0 adds two experimental options (spark.sql.ansi.enabled and spark.sql.storeAssignmentPolicy) to improve compliance with the SQL standard.

When spark.sql.ansi.enabled is set to true, Spark SQL complies with the standard in basic behaviors (e.g., arithmetic operations, type conversion, SQL functions, and SQL parsing).

When spark.sql.storeAssignmentPolicy is set to ANSI, Spark SQL complies with the ANSI store assignment rules. This is a separate configuration because, even though the default value of spark.sql.storeAssignmentPolicy is ANSI, spark.sql.ansi.enabled is disabled by default.

For instance, in Spark SQL, arithmetic operations on numeric types (with the exception of decimal) are not checked for overflow by default, while decimal overflows simply return null. With ANSI SQL compliance enabled, Spark throws an arithmetic exception at runtime when an overflow occurs in numeric or interval arithmetic operations.

-- 'spark.sql.ansi.enabled=false'
SELECT 2147483647 + 1;
+----------------+
|(2147483647 + 1)|
+----------------+
|     -2147483648|
+----------------+
-- 'spark.sql.ansi.enabled=true'
SELECT 2147483647 + 1;
java.lang.ArithmeticException: integer overflow

With this compliance:

  • Spark will throw a runtime exception if an overflow occurs in any operation on an integral/decimal field.
  • Spark will forbid using the reserved keywords of ANSI SQL as identifiers in the SQL parser.
  • Spark SQL can control implicit casting behaviors when inserting rows into a table. These casting behaviors are defined as store assignment rules in the standard (a small sketch follows below).
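
Here is a hedged sketch of the store assignment rules in action; the ints table is mine, and the exact error message can differ between builds:

spark.sql("CREATE TABLE ints (i INT) USING parquet")

// Pre-3.0 (LEGACY) behavior: the string is force-cast to INT and a NULL gets stored
spark.conf.set("spark.sql.storeAssignmentPolicy", "LEGACY")
spark.sql("INSERT INTO ints VALUES ('oops')")

// Spark 3.0 default (ANSI): the write is rejected at analysis time
spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")
spark.sql("INSERT INTO ints VALUES ('oops')")
// => AnalysisException: the string value cannot be safely cast to the INT column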

Accelerator-aware Scheduler

Graphics Processing Units (GPU), Field-Programmable Gate Arrays (FPGA), and Tensor Processing Units (TPU) have been widely used for accelerating deep learning workloads. In order to leverage hardware accelerators in Spark applications, Spark 3.0 enhances the existing scheduler to make the cluster manager accelerator-aware.

For now, the accelerator-aware scheduler can only utilize GPUs; FPGA and TPU support isn't implemented yet. The accelerator-aware scheduler works with the standalone, YARN, and Kubernetes cluster managers, but not with Apache Mesos.

If you want to try out the accelerator-aware scheduler, you need to specify the required resources via configuration and request accelerators at the application level. Job/stage/task-level support will be implemented in future releases.
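
Here is a rough sketch of what such an application-level request looks like. The property names come from the Spark 3.0 resource scheduling configuration, while the discovery script path, the amounts, and the tiny job body are placeholders of mine:

import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("gpu-demo")
  // One GPU per executor and one GPU per task (illustrative amounts)
  .config("spark.executor.resource.gpu.amount", "1")
  .config("spark.task.resource.gpu.amount", "1")
  // Script that prints the GPU addresses available on the host (placeholder path)
  .config("spark.executor.resource.gpu.discoveryScript", "/opt/spark/scripts/getGpus.sh")
  .getOrCreate()

// Inside a task, the assigned GPU addresses are exposed through TaskContext
spark.sparkContext.parallelize(1 to 2, 2).foreach { _ =>
  val gpuAddresses = TaskContext.get().resources()("gpu").addresses
  println(s"This task was given GPUs: ${gpuAddresses.mkString(",")}")
}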

Native Spark Application in Kubernetes

Spark has supported Kubernetes as a cluster manager since Apache Spark 2.3. From my perspective, if Kubernetes is enough for scheduling, why do we need to maintain an additional YARN cluster? It would be easier to manage one cluster manager instead of two.

Unfortunately, some parts, such as the external shuffle service, are still not available in the Kubernetes scheduler. However, this release adds support for dynamic allocation without an external shuffle service.
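
As a hedged sketch (the master URL, container image, and app name are placeholders of mine), the new shuffle-tracking flag is what removes the external shuffle service requirement for dynamic allocation:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("k8s-dynamic-allocation-demo")
  .master("k8s://https://<kubernetes-api-server>:6443")             // placeholder
  .config("spark.kubernetes.container.image", "myrepo/spark:3.0.0") // placeholder
  .config("spark.dynamicAllocation.enabled", "true")
  // Added in Spark 3.0: track shuffle files on executors instead of relying on
  // an external shuffle service
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .getOrCreate()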

It is good to remember that the Kubernetes scheduler is still marked as experimental, so relying on it in production is not yet advisable.

Java 11 Support

As is known, Spark 2.4.6 can only run on top of Java 8. Fortunately, Spark 3.0 adds JDK 11 support and is now built only with Scala 2.12.

With this migration, we can write Spark driver code against JDK 11 and leverage the module system, local variable type inference (var), the improved streams/predicate/optional APIs, the new unmodifiable collections APIs, the improved process API, the async HTTP client, the improved files API, and so on.

From the Spark executor perspective, thanks to JRE 11, we can exploit the Z Garbage Collector (ZGC), the low-overhead Flight Recorder and heap profiler, and parallel full garbage collection for G1. These are pretty useful for improving workload performance and troubleshooting problems.
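
For example (a minimal sketch; ZGC is still experimental on JDK 11, hence the unlock flag), the executor JVM flags go through the usual extraJavaOptions property:

import org.apache.spark.SparkConf

// Run executors with ZGC on JDK 11; it must be unlocked because ZGC is experimental there
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
       "-XX:+UnlockExperimentalVMOptions -XX:+UseZGC")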

At last, we no longer need to keep Java 8 around in our environment just to run Spark :)

Hadoop 3 Support

Although Hadoop 3 is generally available, it is still not considered stable for production usage, and I guess that is why AWS does not support Hadoop 3.0 in their Amazon EMR stack. The most recent Hadoop version supported by Amazon EMR is 2.8.5.

In this part, I just want to let you know that Spark 3.0 has started to support Hadoop 3, and we will eventually be able to exploit it in production.

Conclusion

To sum up, Apache Spark 3.0 announces a bunch of promising features, including Adaptive Query Execution, Dynamic Partition Pruning, the Accelerator-aware Scheduler, the Structured Streaming UI, ANSI SQL compliance, Java 11 support, and native Spark applications on Kubernetes. I just wanted to share my insights on these features before using them in production.

There are various other features that I didn't mention. Most of them are related to the machine learning library (MLlib), the R API, the Python API, the pandas API, and ecosystem projects.

References

https://spark.apache.org/releases/spark-release-3-0-0.html

https://databricks.com/blog/2020/06/18/introducing-apache-spark-3-0-now-available-in-databricks-runtime-7-0.html

https://databricks.com/sparkaisummit/north-america-2020
