Updating to Spark 3.0 in production
Breaking changes and expected improvements: a production point of view
With more than 70 jobs running with Spark and hundreds of gigabytes of data processed per day, Spark is a critical piece of our data pipelines.
At Teads, we use the official open-source Spark package and spawn AWS EMR clusters to run our jobs. Hence, it is essential to optimize our jobs to reduce billing. With an exciting two-fold speed-up promise, we had to give Spark 3 a try.
In this article, we first cover the main breaking changes we had to take into account to make our code compile with Spark 3. Then, we go through the new features and their performance impact on our production environment.
Let’s first discuss the four main breaking changes we faced when updating our codebase to Spark 3:
- Deprecation of UserDefinedAggregateFunction
- Deprecation of untyped UserDefinedFunction, a.k.a. udf
- Change in behavior of spark.emptyDataFrame
- Cassandra driver incompatibilities with third-party libraries.
Deprecation of UserDefinedAggregateFunction
UserDefinedAggregateFunction, commonly called UDAF, is particularly useful to define custom aggregations, such as averaging sparse arrays.
In Spark 3, the sql.expressions.UserDefinedAggregateFunction API has been deprecated in favor of the Aggregator API (based on algebird Aggregator). The former suffers from a serialization and deserialization overhead with each merge into the buffer (merge of rows or temporary buffer). In this gist experiment, aggregating 1000 rows triggers 1006 serializations/deserializations. The newly introduced API aims to remove this cost.
On top of the fact that it is supposedly faster, this new API is also easier to write, read, and maintain. That makes it preferable from an engineering point of view.
As an example, here is a gist of a Mean aggregation function with both UDAF and Aggregator.
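For readers without access to the gist, here is a minimal sketch of what the Aggregator side of a mean could look like. The names MeanBuffer, MeanAggregator and MeanAggregatorExample are hypothetical and chosen for illustration; the sketch assumes Spark 3.0 or later, where functions.udaf registers an Aggregator as an untyped aggregate function.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{col, udaf}

// Hypothetical intermediate buffer: running sum and number of values seen.
case class MeanBuffer(sum: Double, count: Long)

// Typed aggregator: Double input, MeanBuffer buffer, Double output.
object MeanAggregator extends Aggregator[Double, MeanBuffer, Double] {
  // Empty buffer used as the starting point of each aggregation.
  def zero: MeanBuffer = MeanBuffer(0.0, 0L)

  // Fold one input value into the buffer.
  def reduce(buffer: MeanBuffer, value: Double): MeanBuffer =
    MeanBuffer(buffer.sum + value, buffer.count + 1)

  // Combine two partial buffers (for instance from two partitions).
  def merge(left: MeanBuffer, right: MeanBuffer): MeanBuffer =
    MeanBuffer(left.sum + right.sum, left.count + right.count)

  // Turn the final buffer into the result; NaN when no value was seen.
  def finish(buffer: MeanBuffer): Double =
    if (buffer.count == 0) Double.NaN else buffer.sum / buffer.count

  def bufferEncoder: Encoder[MeanBuffer] = Encoders.product[MeanBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object MeanAggregatorExample {
  // Computes the mean of `values` through the DataFrame API.
  def meanOf(spark: SparkSession, values: Seq[Double]): Double = {
    import spark.implicits._
    val mean = udaf(MeanAggregator) // wraps the Aggregator as a column function
    values.toDF("value").agg(mean(col("value"))).first.getDouble(0)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("mean-aggregator").getOrCreate()
    println(meanOf(spark, Seq(1.0, 2.0, 6.0)))
    spark.stop()
  }
}
```

Compared with a UserDefinedAggregateFunction, the buffer here is a plain case class, so reduce and merge work on regular Scala values instead of Row objects.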
Concrete impacts of this change observed in production are presented later in this post.
Deprecation of untyped UserDefinedFunction
We also had to face a change in the default behavior of untyped UserDefinedFunction (a.k.a. udf) between Spark 2.4 and Spark 3.0. This change is explained in this PR and can be illustrated as follows.
Given a UDF defined this way: val f = udf((x: Int) => x, IntegerType). For a null value, it returns null in Spark 2, whereas in Spark 3 it returns the default value of the Java type, in this case 0 for an Int.
One should then be especially careful and rely on thorough tests, as this difference of behavior can change the results. For instance, this code would behave differently depending on the Spark version:
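import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import spark.implicits._

val f = udf((x: Int) => x + 1, IntegerType)
// Assumed reconstruction of the middle of the chain: sum the UDF output over the column.
Seq((None), (Some(0)))
  .toDF("x")
  .select(sum(f($"x")))
  .first.getLong(0) // 1 in Spark2, 2 in Spark3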
In Spark 3, using the deprecated untyped variant raises an exception by default. This exception can be lifted (if you are sure you want the new behavior) by setting a dedicated configuration.
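A minimal sketch of the alternative: the typed variant of udf, without an explicit return DataType, keeps the Spark 2 null handling for primitive arguments. The object and method names are hypothetical. To our knowledge, the flag that the Spark 3.0 migration guide documents for re-enabling the untyped variant is spark.sql.legacy.allowUntypedScalaUDF; it is not needed here.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, sum, udf}

object TypedUdfNullHandling {
  // Sums plusOne(x) over a column holding one null and one 0.
  def sumOfIncrements(spark: SparkSession): Long = {
    import spark.implicits._

    // Typed variant: the return type is inferred, no DataType argument.
    // Spark adds a null check for the primitive Int argument, so a null
    // input gives a null output instead of 0 + 1.
    val plusOne = udf((x: Int) => x + 1)

    Seq[Option[Int]](None, Some(0))
      .toDF("x")
      .select(sum(plusOne(col("x")))) // sum ignores null values
      .first
      .getLong(0)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("typed-udf").getOrCreate()
    println(sumOfIncrements(spark))
    spark.stop()
  }
}
```

With the typed variant, the null row does not contribute to the sum, which matches the Spark 2 result of the example above.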
Change in behavior of `spark.emptyDataFrame`
There is a known hack in Spark 2 that allows you to make a side effect on each executor of a cluster at runtime. The idea of this hack is to create an empty DataFrame, repartition it by the number of executors and use foreachPartition to trigger your side effect, as in the following example:
val nbExecutors = sparkSession.sparkContext.getConf.get("spark.executor.instances").toInt
sparkSession
  .emptyDataFrame
  .repartition(nbExecutors)
  .foreachPartition(
    (_: Iterator[Row]) => idempotentSideEffect()
  )
Unfortunately, this hack does not work anymore in Spark 3. To understand why, let’s compare the physical plans in both versions.
In Spark 2:
spark.emptyDataFrame.repartition(8).explain(true)
== Optimized Logical Plan ==
Repartition 8, true
+- LogicalRDD false
== Physical Plan ==
+- Scan ExistingRDD empty
In Spark 3:
== Optimized Logical Plan ==
LocalRelation <empty>
== Physical Plan ==
We can observe in the Optimized Logical Plan that the exchange on an empty DataFrame is now optimized away in Spark 3, making the trick ineffective.
In a few words, the optimization comes from the inner representation of an empty DataFrame. It used to be represented with an empty RDD, which prevents optimizations such as PropagateEmptyRelation. In Spark 3, it is now using an empty LocalRelation. You can dig more with this commit.
As a workaround, one can now use spark.range(nbExecutors).repartition(nbExecutors) instead of spark.emptyDataFrame.repartition(nbExecutors) to trigger a side effect on each machine of a cluster at runtime.
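The workaround can be sketched as follows. The object name, the method name and the accumulator are hypothetical additions used to count how many partitions actually ran the side effect. Note that Spark schedules one task per partition, and the sketch does not by itself guarantee that each task lands on a distinct executor.

```scala
import org.apache.spark.sql.SparkSession

object SideEffectPerPartition {
  // Runs `sideEffect` once per partition and returns the number of calls.
  def runOnPartitions(spark: SparkSession, nbExecutors: Int)(sideEffect: () => Unit): Long = {
    // Accumulator used only to observe how many times the side effect ran.
    val calls = spark.sparkContext.longAccumulator("sideEffectCalls")

    spark
      .range(nbExecutors)       // non-empty input, so the plan is not optimized away
      .repartition(nbExecutors) // one partition, hence one task, per expected executor
      .foreachPartition { (_: Iterator[java.lang.Long]) =>
        sideEffect()
        calls.add(1L)
      }

    calls.value.longValue
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("side-effect").getOrCreate()
    val ran = runOnPartitions(spark, 4)(() => println("idempotent side effect"))
    println(s"side effect ran $ran times")
    spark.stop()
  }
}
```

The explicit Iterator[java.lang.Long] parameter type follows the same pattern as the Iterator[Row] annotation in the original hack: it selects the Scala overload of foreachPartition.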
Cassandra driver incompatibilities between third-party libraries
In the latest version of spark-cassandra-connector (3.0), the Java driver was bumped to version 4.7.
Before spending time on this bump, make sure that all your third-party libraries using Spark and Cassandra are up to date with version 4 of the Java driver.
For instance, the latest release (November 2020) of cassandra-all still uses version 3 of the Cassandra driver, which makes libraries based on cassandra-all incompatible with spark-cassandra-connector 3.0.
Performance impacts observed in production
With this new Spark version, we observed some interesting speed-ups that are described in the following subsections.
Adaptive Query Execution
Adaptive Query Execution (AQE) is the main feature of Spark 3. In a few words, AQE comes with three new features for optimizing queries at runtime:
- Dynamically coalescing shuffle partitions (coalesce contiguous small tasks)
- Dynamically switching join strategies (change the join strategy based on runtime statistics)
- Dynamically optimizing skew joins (optimize joins with unbalanced data)
For more details about the new features, this blog post seems to be a good starting point.
Sadly, despite the interesting promises related to AQE, we have not observed a huge improvement on our side yet. So far in production, we have only seen the impact of the dynamic coalescing of small partitions, which saved us up to 5% of the runtime. As an example, we studied the size in bytes of the tasks and their performance.
[Screenshots: task metrics of the same stage in Spark 2 and in Spark 3]
We can see the difference in behavior between Spark 2 and Spark 3 on a given stage of one of our jobs.
In Spark 2, the stage has 200 tasks (default number of tasks after a shuffle), 170 KB per task, and lasts 18 seconds.
In Spark 3, the stage has 50 tasks, 1450 KB per task, and lasts 5 seconds: a saving of about 70% on this stage.
It is worth mentioning that the newly created stages can sometimes be harder to read and to follow because each stage resubmits its optimized DAG as a new job. For instance, for a single job with 4 stages, you might see 10 stages, 6 skipped and 4 jobs instead of 4 stages, 0 skipped, and 1 job.
It can be quite confusing when you want to track down your job.
One possible explanation of the very limited impacts that were observed can be found in the specific use cases we have, the fact that our jobs are hand-optimized, and the nature of our data.
- We have barely any skew joins.
- Our data load is not variable. We might see a factor 0.5 or 2 depending on the time of the year, but we never see an unexpected 10X load. Hence, we can choose our join strategies and the number of partitions during development.
Nevertheless, this is worth trying and digging into. Do not hesitate to set spark.sql.adaptive.enabled to true and share the performance improvement you get.
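A minimal sketch of how AQE could be switched on for an existing session. The object and method names are hypothetical; the three keys are the Spark 3.0 SQL configurations for AQE itself, partition coalescing and skew join handling, and the last two only take effect when the first one is enabled.

```scala
import org.apache.spark.sql.SparkSession

object AdaptiveQueryExecutionConfig {
  // Enables AQE and two of its sub-features on the given session.
  def enableAqe(spark: SparkSession): Unit = {
    // Master switch for Adaptive Query Execution.
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    // Coalesce contiguous small shuffle partitions at runtime.
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
    // Split skewed partitions in sort-merge joins at runtime.
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("aqe").getOrCreate()
    enableAqe(spark)
    println(spark.conf.get("spark.sql.adaptive.enabled"))
    spark.stop()
  }
}
```

Setting these at session level makes it easy to run the same job with and without AQE and compare stage durations.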
The main improvement we have seen comes from the new UDAF API. We observed a speedup between 10% and 15% for jobs impacted by this enhancement. This performance gain can be explained in two ways:
- No serialization and deserialization of the rows while aggregating (which also slightly reduces the GC pressure).
- Most of the SortAggregate physical operators turn into ObjectHashAggregate, which saves us a sort.
[Screenshots: the same stage in Spark 2 and in Spark 3]
In this example, we can observe a 20% decrease in the duration of one stage, which is a pretty interesting gain. If you want to dig more into the differences between these two physical operators, I suggest you have a look at this detailed blog post.
Nested schema pruning
The spark.sql.optimizer.nestedSchemaPruning.enabled configuration was available in Spark 2.4.1 and is now enabled by default in Spark 3 (see commit). This setting enables column pruning on nested fields for supported formats, so that only the nested fields a query actually uses are read. It can improve read performance by up to 70% in our case.
Whether this parameter is applied can be checked in the SQL tab: in the Scan parquet node, we only read user_context.vid instead of the whole struct.
We do not see any reason why this parameter was set to false in the first place. Do not hesitate to enlighten us if you have more insights.
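The same check can be sketched outside the UI by inspecting the schema that the Parquet scan actually requests. The Event and UserContext case classes and the country field are hypothetical; FileSourceScanExec is an internal Spark class, so relying on it is an assumption that holds for the default Parquet reader in Spark 3.0 but may change between versions.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.FileSourceScanExec

// Hypothetical nested schema, loosely inspired by user_context.vid.
case class UserContext(vid: String, country: String)
case class Event(id: Long, user_context: UserContext)

object NestedSchemaPruningCheck {
  // Writes a small Parquet dataset, selects one nested field and returns
  // the schema requested by the file scan, as a string.
  def readSchemaFor(spark: SparkSession): String = {
    import spark.implicits._

    val path = Files.createTempDirectory("nested-pruning").resolve("events").toString
    Seq(Event(1L, UserContext("v1", "FR")), Event(2L, UserContext("v2", "US")))
      .toDF()
      .write
      .parquet(path)

    val onlyVid = spark.read.parquet(path).select("user_context.vid")
    onlyVid.explain() // the ReadSchema entry of the scan shows the pruned schema

    // Collect the schema requested by every file scan of the physical plan.
    onlyVid.queryExecution.executedPlan
      .collect { case scan: FileSourceScanExec => scan.requiredSchema.catalogString }
      .mkString(",")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("nested-pruning").getOrCreate()
    println(readSchemaFor(spark))
    spark.stop()
  }
}
```

When pruning is active, the returned schema mentions vid but not the sibling country field.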
Pushdown predicates on CSV
As we barely use CSV at Teads, the following optimization had almost no impact on our production jobs.
However, it is worth mentioning that Spark 3 now supports pushdown predicates on CSV with the spark.sql.csv.filterPushdown.enabled configuration (set to true by default).
If you want to check which format supports pushdown predicates, do not let the query plan trick you. For instance, if we have a CSV with only one column containing id, you will see that this query plan remains unchanged between the two Spark versions:
In Spark 2:
spark.read.option("header", "true").option("inferSchema", "true").csv("test.csv").filter($"id" > 5).explain()
== Physical Plan ==
*(1) Project [id#55]
+- *(1) Filter (isnotnull(id#55) && (id#55 > 5))
   +- *(1) FileScan csv [id#55] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/louis.fruleux/Software/spark-3.0.0-bin-hadoop2.7/test.csv], PartitionFilters: , PushedFilters: [IsNotNull(id), GreaterThan(id,5)], ReadSchema: struct<id:int>
In Spark 3:
spark.read.option("header", "true").option("inferSchema", "true").csv("test.csv").filter($"id" > 5).explain(true)
== Physical Plan ==
*(1) Project [id#25]
+- *(1) Filter (isnotnull(id#25) && (id#25 > 5))
   +- *(1) FileScan csv [id#25] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/louis.fruleux/Software/spark-3.0.0-bin-hadoop2.7/test.csv], PartitionFilters: , PushedFilters: [IsNotNull(id), GreaterThan(id,5)], ReadSchema: struct<id:int>
In Spark 2, it seems like the filters are pushed down to the source level. However, the filter is never used in the source code. For implementation details, do not hesitate to check the source code, and more specifically this commit. You will notice that filters: Seq[Filter] is an argument of buildReader, but is never used in Spark 2.
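Since the plan looks the same either way, one way to measure the actual effect is to run the same query with the flag on and off and compare run times. A minimal sketch, assuming Spark 3.0 or later; the object name, the method name and the temporary CSV file are hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object CsvFilterPushdownToggle {
  // Counts the rows with id > 5 in a ten-line CSV, with pushdown on or off.
  def countAboveFive(spark: SparkSession, pushdown: Boolean): Long = {
    spark.conf.set("spark.sql.csv.filterPushdown.enabled", pushdown.toString)

    // One-column CSV with a header and the ids 1 to 10.
    val file = Files.createTempFile("test", ".csv")
    Files.write(file, ("id\n" + (1 to 10).mkString("\n")).getBytes("UTF-8"))

    spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(file.toString)
      .filter(col("id") > 5)
      .count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("csv-pushdown").getOrCreate()
    println(countAboveFive(spark, pushdown = true))
    println(countAboveFive(spark, pushdown = false))
    spark.stop()
  }
}
```

The result is identical in both modes; only the amount of parsing work done by the CSV reader is expected to differ, which becomes visible on larger files.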
Mapreduce fileoutputcommitter algorithm version
mapreduce.fileoutputcommitter.algorithm.version is a parameter that controls the algorithm used to write data. A known trick is to use version 2 to drastically improve write performance.
Nothing new on the implementation side with Spark 3. However, the documentation about the default value has been fixed in this commit: the default does not come from Spark but is inherited from the Hadoop version. So for Hadoop version >= 3.0.1, the default version is 2; otherwise, it is 1. This might change in the future, as there are some ongoing discussions.
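Because the default depends on the Hadoop version, setting the value explicitly avoids surprises. A minimal sketch, in which the object name and the local master are hypothetical choices for a quick trial; properties prefixed with spark.hadoop. are copied into the Hadoop configuration when the Spark context is created, so the setting has to be passed before that point.

```scala
import org.apache.spark.sql.SparkSession

object CommitterVersionConfig {
  // Builds a session with the committer algorithm version pinned to 2.
  def buildSession(): SparkSession =
    SparkSession.builder()
      .master("local[*]") // local master, only for trying the sketch out
      .appName("committer-version")
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
      .getOrCreate()

  // Reads back the value seen by the Hadoop configuration of the context.
  def committerVersion(spark: SparkSession): String =
    spark.sparkContext.hadoopConfiguration.get("mapreduce.fileoutputcommitter.algorithm.version")

  def main(args: Array[String]): Unit = {
    val spark = buildSession()
    println(committerVersion(spark))
    spark.stop()
  }
}
```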
We successfully migrated our whole stack to Spark 3, including data pipelines, machine learning training, and almond kernel for notebooks.
At first, we were afraid of bumping a big framework that impacts our production a lot. But with an in-house job deployment system that enables us to roll out smoothly, and only a few breaking changes in the code, the bump was easier than it seemed.
The gain is noticeable and our AWS billing has been reduced, but unfortunately not as much as we were expecting after reading some benchmarks: in the Databricks runtime, some queries from the TPC-DS benchmark are sped up by a factor of 18.
However, with the newly introduced AQE, we expect to see more and more optimizations at runtime. We cannot wait to see the next Spark releases and optimizations, and to try AQE again.
If you want some tips on Spark performance improvements, do not hesitate to have a look at our other articles:
Spark performance tuning from the trenches
A collection of best practices and optimization tips for Spark 2.2.0
Spark troubleshooting from the trenches
Troubleshooting tricks and external data source management
Thanks to all those who reviewed this article, especially Yann Moisan, Joseph Rocca, Benjamin Davy, and Han Ju.
- Waiting for code physical aggregators operators
- Adaptive Query Execution introduction by Databricks
- Benchmarks of Spark 3 in Databricks runtime
- Use case of a user-defined aggregate function at Teads
- Gist to count the number of ser/deser in the old UDAF
- Gist with the syntaxes for the old UDAF and the Aggregator UDAF
- UserDefinedAggregateFunction deprecation documentation
- UserDefinedFunction breaking change commit
- File output committer algorithm version documentation fix
- Nested schema pruning commit
- Spark-cassandra-connector GitHub
- Spark configurations
- TPC-DS benchmark
- CSV pushdown predicates supported