Updating to Spark 3.0 in production

Breaking changes and expected improvements: a production point of view

Louis Fruleux
Dec 3, 2020 · 10 min read

With more than 70 jobs running with Spark and hundreds of gigabytes of data processed per day, Spark is a critical piece of our data pipelines.

At Teads, we use the official open-source Spark package and spawn AWS EMR clusters to run our jobs. Hence, it is essential to optimize our jobs to reduce billing. With an exciting two-fold speed-up promise, we had to give Spark 3 a try.

In this article, we first cover the main breaking changes we had to take into account to make our code compile with Spark 3. Then, we go through the new features and their performance impact on our production environment.

Breaking Changes

Let’s first discuss the four main breaking changes we faced when updating our codebase to Spark 3:

  • Deprecation of UserDefinedAggregateFunction
  • Deprecation of untyped UserDefinedFunction, a.k.a. udf(AnyRef, DataType)
  • Change in behavior of spark.emptyDataFrame
  • Cassandra driver incompatibilities with third-party libraries

Deprecation of UserDefinedAggregateFunction

A UserDefinedAggregateFunction, commonly called UDAF, is particularly useful to define custom aggregations, such as averaging sparse arrays.

In Spark 3, the sql.expressions.UserDefinedAggregateFunction API has been deprecated in favor of the Aggregator API (based on algebird Aggregator). The former suffers from a serialization and deserialization overhead each time a row or a partial buffer is merged into the aggregation buffer. In this gist experiment, 1,000 rows trigger 1,006 serialization/deserialization operations. The new API aims to remove this cost.

On top of the fact that it is supposedly faster, this new API is also easier to write, read, and maintain. That makes it preferable from an engineering point of view.

As an example, here is a gist of a Mean aggregation function with both UDAF and Aggregator.
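As a rough equivalent of the Aggregator half of that gist, here is a minimal sketch of our own (not the original gist). MeanBuffer, Mean and MeanUsage are illustrative names; functions.udaf, available from Spark 3.0, wraps the Aggregator so that it can be applied to untyped DataFrame columns.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions

// Mutable buffer holding the running sum and count (illustrative name).
case class MeanBuffer(var sum: Double, var count: Long)

// Mean of Double values written with the Spark 3 Aggregator API.
object Mean extends Aggregator[Double, MeanBuffer, Double] {
  // Neutral element: a fresh, empty buffer.
  def zero: MeanBuffer = MeanBuffer(0.0, 0L)

  // Adds one input value to the buffer, mutating and returning it.
  def reduce(buffer: MeanBuffer, value: Double): MeanBuffer = {
    buffer.sum += value
    buffer.count += 1
    buffer
  }

  // Merges two partial buffers (e.g. computed on different partitions).
  def merge(b1: MeanBuffer, b2: MeanBuffer): MeanBuffer = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }

  // Turns the final buffer into the result (NaN when there is no input).
  def finish(buffer: MeanBuffer): Double =
    if (buffer.count == 0) Double.NaN else buffer.sum / buffer.count

  def bufferEncoder: Encoder[MeanBuffer] = Encoders.product[MeanBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object MeanUsage {
  // Applies Mean to a DataFrame column through functions.udaf (Spark 3.0+).
  def meanOf(spark: SparkSession, values: Seq[Double]): Double = {
    import spark.implicits._
    val meanUdaf = functions.udaf(Mean)
    values.toDF("value").agg(meanUdaf($"value")).first().getDouble(0)
  }
}
```

Compared to a UserDefinedAggregateFunction, there is no untyped buffer Row to read from and write to: the buffer is a plain Scala object, which is what makes this version easier to read and maintain.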

Concrete impacts of this change observed in production are presented later in this post.

Deprecation of untyped UserDefinedFunction

We also had to face a change in the default behavior of untyped UserDefinedFunction (aka udf) between Spark 2.4 and Spark 3.0. This change is explained in this PR and can be illustrated as follows.

Given a UDF defined as val f = udf((x: Int) => x, IntegerType), a null input returns null in Spark 2, whereas in Spark 3 it returns the default value of the Java type: 0 for an Int.

One should therefore be especially careful and rely on thorough tests, as this difference in behavior can silently change results. For instance, the following code behaves differently depending on the Spark version:

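import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._ // for toDF and $"..." (already imported in spark-shell)

// Untyped UDF: the return type is given explicitly, the input type is lost.
// In Spark 3, this call fails unless spark.sql.legacy.allowUntypedScalaUDF=true.
val f = udf((x: Int) => x + 1, IntegerType)

Seq(None, Some(0)).
  toDF("value").
  withColumn("incremented", f($"value")).
  agg(sum("incremented")).
  first.getLong(0) // 1 in Spark 2, 2 in Spark 3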

In Spark 3, using this untyped udf throws an exception by default. If you are sure you want the new behavior, the check can be disabled by setting the configuration spark.sql.legacy.allowUntypedScalaUDF=true.
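The alternative recommended by the Spark 3.0 migration guide is to drop the explicit return type, so that udf becomes a typed Scala UDF. The sketch below reuses the example above with illustrative object and method names. As we understand it, a typed UDF with a primitive Int input returns null when the input is null, so the sum should be 1 again, as in Spark 2.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{sum, udf}

object TypedUdfExample {
  // Typed Scala UDF: no explicit DataType, so Spark keeps the input type
  // information and skips the function (returning null) for a null Int input.
  val increment = udf((x: Int) => x + 1)

  def sumIncremented(spark: SparkSession): Long = {
    import spark.implicits._
    Seq(None, Some(0))
      .toDF("value")
      .withColumn("incremented", increment($"value"))
      .agg(sum("incremented")) // sum of an Int column is a Long
      .first()
      .getLong(0)
  }
}
```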

Change in behavior of `spark.emptyDataFrame`

There is a known hack in Spark 2 that lets you trigger a side effect on each executor of a cluster at runtime. The idea is to create an empty DataFrame, repartition it by the number of executors, and use foreachPartition to trigger the side effect, as in the following example:

import org.apache.spark.sql.Row

val nbExecutors = sparkSession.sparkContext.getConf.get("spark.executor.instances").toInt

sparkSession.
  emptyDataFrame.
  toDF.
  repartition(nbExecutors).
  foreachPartition {
    (_: Iterator[Row]) => idempotentSideEffect()
  }

Unfortunately, this hack does not work anymore in Spark 3. To understand why, let’s compare the query plans in both versions.

In Spark 2:

spark.emptyDataFrame.repartition(8).explain(true)

== Optimized Logical Plan ==
Repartition 8, true
+- LogicalRDD false

== Physical Plan ==
Exchange RoundRobinPartitioning(8)
+- Scan ExistingRDD empty[]

In Spark 3:

== Optimized Logical Plan ==
LocalRelation <empty>
== Physical Plan ==
LocalTableScan <empty>

The optimized logical plans show that Spark 3 now optimizes away the exchange on an empty DataFrame, which makes the trick ineffective: the side effect is no longer triggered on each executor.

In short, the optimization comes from the internal representation of an empty DataFrame. It used to be represented by an empty RDD, which prevents optimizations such as PropagateEmptyRelation. In Spark 3, it is now an empty LocalRelation. See this commit for more details.

As a workaround, one can now use spark.range(nbExecutors).repartition(nbExecutors) instead of spark.emptyDataFrame.repartition(nbExecutors) to trigger a side effect on each machine of a cluster at runtime.
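A minimal sketch of this workaround, assuming a SparkSession is available. idempotentSideEffect() is a hypothetical placeholder, and the accumulator only counts how many partitions ran the side effect. Like the original hack, this is best effort: Spark does not guarantee that each partition lands on a different executor.

```scala
import org.apache.spark.sql.{Row, SparkSession}

object SideEffectPerExecutor {
  // Hypothetical placeholder for the real side effect (e.g. warming a local cache).
  def idempotentSideEffect(): Unit = ()

  // Runs the side effect once per partition and returns how many times it ran.
  def run(spark: SparkSession, nbExecutors: Int): Long = {
    val calls = spark.sparkContext.longAccumulator("sideEffectCalls")
    spark
      .range(nbExecutors)       // non-empty, so the exchange is not pruned
      .toDF()
      .repartition(nbExecutors) // one task per partition, even if a partition is empty
      .foreachPartition { (_: Iterator[Row]) =>
        idempotentSideEffect()
        calls.add(1)
      }
    calls.sum
  }
}
```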

Cassandra driver incompatibilities between third-party libraries

In the latest version of spark-cassandra-connector (3.0), the Java driver was bumped to version 4.7.

Before spending time on this upgrade, make sure that all your third-party libraries using Spark and Cassandra are compatible with version 4 of the Java driver.

For instance, the latest release (November 2020) of cassandra-all still uses version 3 of the Cassandra driver, which makes libraries based on cassandra-all incompatible with spark-cassandra-connector 3.0.

Performance impacts observed in production

With this new Spark version, we observed some interesting speed-ups that are described in the following subsections.

Adaptive Query Execution

Adaptive Query Execution (AQE) is the main feature of Spark 3. In a few words, AQE comes with three new features for optimizing queries at runtime:

  • Dynamically coalescing shuffle partitions (coalesce contiguous small tasks)
  • Dynamically switching join strategies (change the join strategy based on runtime statistics)
  • Dynamically optimizing skew joins (optimize joins with unbalanced data)
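To illustrate the first bullet, here is a simplified, Spark-free sketch of coalescing contiguous shuffle partitions. It only models the idea under our own assumptions: Spark's actual implementation differs in its details, and its target size comes from spark.sql.adaptive.advisoryPartitionSizeInBytes.

```scala
// Simplified model of AQE partition coalescing (NOT Spark's implementation).
object CoalesceSketch {
  /**
   * Greedily groups contiguous shuffle partitions so that each group stays
   * within targetSize (a partition larger than the target stays alone).
   * Returns the index of the first partition of each group.
   */
  def coalesce(sizes: Seq[Long], targetSize: Long): List[Int] = {
    val starts = List.newBuilder[Int]
    var current = 0L
    sizes.zipWithIndex.foreach { case (size, i) =>
      // Start a new group when adding this partition would exceed the target.
      if (i == 0 || current + size > targetSize) {
        starts += i
        current = 0L
      }
      current += size
    }
    starts.result()
  }
}
```

For example, coalesce(Seq(10L, 10L, 10L, 50L, 5L, 5L), 30L) returns List(0, 3, 4): three tasks instead of six.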

For more details about the new features, this blog post seems to be a good starting point.

Sadly, despite AQE's promises, we have not yet observed a large improvement on our side. So far in production, only the dynamic coalescing of small partitions has had a visible impact, saving up to 5% of runtime. To illustrate, we compared the size in bytes and the duration of the tasks of one stage.

[Screenshots: task size in bytes and duration for the same stage, in Spark 2 and in Spark 3]

We can see the difference in behavior between Spark 2 and Spark 3 on a given stage of one of our jobs.

In Spark 2, the stage has 200 tasks (the default number of partitions after a shuffle), 170 KB per task, and lasts 18 seconds.
In Spark 3, the stage has 50 tasks, 1450 KB per task, and lasts 5 seconds: a saving of about 70% on this stage.

It is worth mentioning that, with AQE, stages can sometimes be harder to read and follow in the Spark UI, because each stage resubmits its optimized DAG as a new job. For instance, for a single job with 4 stages, you might see 10 stages (6 of them skipped) and 4 jobs, instead of 4 stages, 0 skipped, and 1 job.

It can be quite confusing when you want to track down your job.

The very limited impact we observed can probably be explained by our specific use cases, the fact that our jobs are hand-optimized, and the nature of our data.

Indeed,

  • We have barely any skew joins.
  • Our data load is fairly stable. It may vary by a factor of 0.5 to 2 depending on the time of year, but we never see an unexpected 10x load. Hence, we can choose our join strategies and the number of partitions during development.

Nevertheless, AQE is worth trying and investigating. Do not hesitate to set spark.sql.adaptive.enabled to true (it is disabled by default in Spark 3.0) and share the performance improvements you get.
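A minimal sketch of a session with AQE turned on. Only spark.sql.adaptive.enabled differs from the Spark 3.0 defaults as far as we know; the other three AQE settings are spelled out just to show where each feature is controlled. The app name and local master are placeholders for trying it out.

```scala
import org.apache.spark.sql.SparkSession

object AqeSession {
  // Builds a session with Adaptive Query Execution enabled.
  def build(): SparkSession =
    SparkSession.builder()
      .appName("aqe-trial")  // placeholder app name
      .master("local[*]")    // placeholder; usually set by spark-submit on a cluster
      .config("spark.sql.adaptive.enabled", "true")
      // Coalesce small contiguous shuffle partitions after each stage.
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      // Target size used when coalescing partitions.
      .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
      // Split skewed partitions in sort-merge joins.
      .config("spark.sql.adaptive.skewJoin.enabled", "true")
      .getOrCreate()
}
```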

UDAF Aggregator

The main improvement we have seen comes from the new UDAF API. We observed a speedup of 10% to 15% on the jobs impacted by this change. This performance gain can be explained by two factors:

  • No serialization and deserialization of the rows while aggregating (which also slightly reduces GC pressure).
  • Most SortAggregate physical operators turn into ObjectHashAggregate, which saves a sort.

[Screenshots: the same aggregation stage in Spark 2 and in Spark 3]

In this example, we observe a 20% decrease in the duration of one stage, which is a pretty interesting gain. If you want to dig deeper into the differences between these two physical operators, I suggest having a look at this detailed blog post.
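To check which operator your own aggregation gets, you can inspect the physical plan. The sketch below uses a tiny illustrative Aggregator (LongSum, a hypothetical name) registered with functions.udaf; with the default spark.sql.execution.useObjectHashAggregateExec=true, we expect it to be planned as ObjectHashAggregate.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// Minimal Aggregator (sum of Longs), only used to inspect the physical plan.
object LongSum extends Aggregator[Long, Long, Long] {
  def zero: Long = 0L
  def reduce(acc: Long, x: Long): Long = acc + x
  def merge(a: Long, b: Long): Long = a + b
  def finish(acc: Long): Long = acc
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

object PlanCheck {
  // Returns the physical plan of a grouped aggregation using LongSum.
  def physicalPlan(spark: SparkSession): String = {
    import spark.implicits._
    val longSum = udaf(LongSum)
    Seq(("a", 1L), ("a", 2L), ("b", 3L))
      .toDF("key", "value")
      .groupBy($"key")
      .agg(longSum($"value"))
      .queryExecution
      .executedPlan
      .toString
  }
}
```

Calling df.explain() on the same query prints the same information to the console.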

Nested schema pruning

The spark.sql.optimizer.nestedSchemaPruning.enabled configuration was available in Spark 2.4.1 and is now enabled by default in Spark 3 (see this commit). This setting enables column pruning on nested fields for supported formats: only the nested fields actually needed by the query are read. In our case, it improved read performance by up to 70%.

You can check that this parameter is applied in the SQL tab of the Spark UI:

[Screenshot: details of the Scan parquet node in the SQL tab]

In our Scan parquet, we only read user_context.vid instead of the whole user_context structure.
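The same check can be done programmatically. In this sketch, the Event/UserContext schema is hypothetical and only mirrors the user_context.vid example; it reads back the requiredSchema of the FileSourceScanExec node, an internal Spark class that may change between versions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.types.StructType

object NestedPruningCheck {
  // Hypothetical schema mirroring the user_context.vid example.
  case class UserContext(vid: String, country: String, device: String)
  case class Event(id: Long, user_context: UserContext)

  // Writes a tiny Parquet file, selects one nested field, and returns
  // the schema the Parquet scan actually reads.
  def readSchema(spark: SparkSession, path: String): StructType = {
    import spark.implicits._
    Seq(Event(1L, UserContext("v1", "FR", "mobile")))
      .toDS()
      .write.mode("overwrite").parquet(path)

    val df = spark.read.parquet(path).select($"user_context.vid")
    df.queryExecution.executedPlan
      .collect { case scan: FileSourceScanExec => scan.requiredSchema }
      .head
  }
}
```

With nested schema pruning enabled, the returned schema should contain user_context with only its vid field.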

We do not see any reason why this parameter was set to false in the first place. Do not hesitate to enlighten us if you have more insights.

Pushdown predicates on CSV

As we barely use CSV at Teads, the following optimization had almost no impact on our production jobs.

However, it is worth mentioning that Spark 3 now supports predicate pushdown on CSV through the spark.sql.csv.filterPushdown.enabled configuration (set to true by default).

If you want to check which formats support predicate pushdown, do not let the query plan trick you. For instance, with a CSV file containing a single id column, the query plan remains unchanged between the two Spark versions:

In Spark 2:

spark.read.option("header", "true").option("inferSchema", "true").csv("test.csv").filter($"id" > 5).explain()

== Physical Plan ==
*(1) Project [id#55]
+- *(1) Filter (isnotnull(id#55) && (id#55 > 5))
   +- *(1) FileScan csv [id#55] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/louis.fruleux/Software/spark-3.0.0-bin-hadoop2.7/test.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,5)], ReadSchema: struct<id:int>

In Spark 3:

spark.read.option("header", "true").option("inferSchema", "true").csv("test.csv").filter($"id" > 5).explain(true)

== Physical Plan ==
*(1) Project [id#25]
+- *(1) Filter (isnotnull(id#25) && (id#25 > 5))
   +- *(1) FileScan csv [id#25] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/louis.fruleux/Software/spark-3.0.0-bin-hadoop2.7/test.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,5)], ReadSchema: struct<id:int>

In Spark 2, the plan suggests that the filters are pushed down to the source (PushedFilters is populated). However, the CSV reader never actually uses them. For implementation details, check the source code, in particular this commit: you will notice that filters: Seq[Filter] is an argument of buildReader but is never used in Spark 2.

Mapreduce fileoutputcommitter algorithm version

mapreduce.fileoutputcommitter.algorithm.version is a Hadoop parameter that controls the algorithm used to commit written files. A known trick is to use version 2 to drastically improve write performance.
Nothing has changed on the implementation side with Spark 3. However, the documentation has been fixed.

In this commit, the documentation about the default value has been corrected: the default does not come from Spark but is inherited from the Hadoop version. For Hadoop versions >= 3.0.1, the default is 2; otherwise, it is 1. This might change in the future, as discussions are ongoing.
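To pin the algorithm explicitly rather than depend on the Hadoop version, one option is to pass it through Spark's spark.hadoop.* prefix, which copies the property (without the prefix) into the Hadoop configuration. A minimal sketch, with a local master as a placeholder:

```scala
import org.apache.spark.sql.SparkSession

object CommitterV2Session {
  // Properties prefixed with "spark.hadoop." end up, without the prefix,
  // in the Hadoop Configuration used by Spark's file writers.
  def build(): SparkSession =
    SparkSession.builder()
      .master("local[*]") // placeholder; usually set by spark-submit on a cluster
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
      .getOrCreate()
}
```

The same setting can be passed on the command line with --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2.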

Conclusion

We successfully migrated our whole stack to Spark 3, including data pipelines, machine learning training, and the almond kernel for notebooks.

At first, we were wary of upgrading a framework with such a large impact on our production. However, thanks to an in-house job deployment system that lets us roll out smoothly, and with only a few breaking changes in the code, the upgrade was easier than it seemed.

The gain is noticeable and our AWS bill has been reduced, but unfortunately not as much as we expected after reading some benchmarks: on the Databricks Runtime, some queries from the TPC-DS benchmark are sped up by up to 18 times.

However, with the newly introduced AQE, we expect to see more and more runtime optimizations. We look forward to the next Spark releases and to trying AQE again.

If you want some tips on Spark performance improvements, do not hesitate to have a look at our other articles:

Teads Engineering

150+ innovators building the future of digital advertising

Louis Fruleux

Written by

Passionate software developer and data engineer at Teads. Data enthusiast. LinkedIn: https://www.linkedin.com/in/louis-fruleux-04b48a127/
