Spark 2.4 was released recently, and it brings a number of interesting and promising features. This release adds major features to Spark SQL and Structured Streaming, new APIs and connectors, and additional support for Kubernetes. Here are some of the things that are exciting and worth looking forward to.
1) Experimental support for Scala 2.12
This is the result of a combined effort from Lightbend and the Apache Spark community. The support is still experimental, but Scala 2.12 will become the default in the upcoming Spark 3.0.0 release, and 2.11 support is being removed alongside the ongoing work to support Java 11.
What does this mean to you?
- With Spark supporting Scala 2.12, it is now easier to combine Spark with other libraries that already support it, such as Kafka and Akka. Scala 2.12 was released close to two years ago and most other projects added support for it long ago; only Spark was lagging behind, because some Spark internals rely on the details of how Scala translates language constructs to JVM bytecode.
- Spark can now leverage Scala's ability to compile every closure to a native Java 8 lambda, instead of the space-inefficient anonymous inner class it previously generated in most cases. This means better serialization of lambda functions and can produce cleaner stack traces for errors.
2) Built-in support for Apache Avro
Apache Avro is the most popular data serialization format when it comes to Kafka and Structured Streaming, and Spark now provides built-in support for reading and writing Avro data. This functionality previously lived in the external spark-avro package.
// Read an Avro file into a DataFrame
val userDF = spark.read.format("avro").load("src/main/resources/userdata1.avro")
// Project a few columns and write them back out as Avro
val basicUserDetails = userDF.select("id", "first_name", "last_name", "email", "ip_address")
basicUserDetails.write.format("avro").save("src/main/resources/basicuserdata.avro")
Note: build.sbt needs to be updated with the spark-avro dependency:
"org.apache.spark" %% "spark-avro" % sparkVersion
Apart from basic reading and writing, there are handy to_avro() and from_avro() functions to encode and decode Avro data, given the Avro schema as a JSON string. These come in handy when reading data off a Kafka topic or producing to a Kafka streaming sink.
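For example, here is a minimal sketch of decoding Avro records read from a Kafka topic and encoding them again for a Kafka sink. The broker address, topic names, checkpoint path and the User schema below are illustrative assumptions, not something prescribed by the release notes:
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.col

// Avro schema of the Kafka message value, as a JSON string (hypothetical schema)
val jsonFormatSchema =
  """{
    |  "type": "record",
    |  "name": "User",
    |  "fields": [
    |    {"name": "id", "type": "int"},
    |    {"name": "email", "type": "string"}
    |  ]
    |}""".stripMargin

// Decode the binary Avro payload coming off a Kafka topic into a struct column
val users = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "users")
  .load()
  .select(from_avro(col("value"), jsonFormatSchema).as("user"))

// Encode the struct back into Avro bytes before producing to a Kafka sink
users.select(to_avro(col("user")).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "users-out")
  .option("checkpointLocation", "/tmp/checkpoints/users-out")
  .start()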
Data source tables that were previously created using com.databricks.spark.avro can be loaded and written with this built-in Avro module without any code changes. Also, the SQL configuration spark.sql.legacy.replaceDatabricksSparkAvro.enabled is enabled by default, which maps the com.databricks.spark.avro provider to the built-in Avro module.
Benchmarks suggest roughly a 2x performance improvement in reads and an 8% improvement in writes. This is the result of eliminating the intermediate conversion of Avro data through Row; the data is now serialized and deserialized directly between InternalRow and the Avro format.
3) Built-in Higher-order Functions (HOFs)
Prior to 2.4, it was tricky to deal with complex data such as arrays and maps. The general approach was to use explode() or to write UDFs.
With the new built-in HOFs, it is easier to work with arrays and maps without losing performance. HOFs manipulate complex data structures with an anonymous lambda function, similar to UDFs, but with much better performance (especially in Python).
Let’s say we have two columns in our dataframe, ids and values, and for the sake of simplicity we want to compute values = values + 1. We start with a dataframe like this:
val df = Seq(
  (1, Array(1, 2, 3)),
  (2, Array(4, 5, 6)),
  (1, Array(4, 4, 4))
).toDF("ids", "values")
+---+---------+
|ids|   values|
+---+---------+
|  1|[1, 2, 3]|
|  2|[4, 5, 6]|
|  1|[4, 4, 4]|
+---+---------+
Expected dataframe:
+---+-----------+
|ids|addedvalues|
+---+-----------+
|  1|  [2, 3, 4]|
|  2|  [5, 6, 7]|
|  1|  [5, 5, 5]|
+---+-----------+
In general, there are two approaches to doing even this simple operation, or any more complex one, on arrays.
1. Use explode(): This is the more SQL-ish approach, using explode() and collect_list(). The problem is that it involves a group by, which brings some inherent problems with it: there is a performance overhead, and there are no ordering guarantees, since the shuffle operation does not guarantee that the re-collected array keeps the element order of the original array. This approach is also error-prone with null values, and it assumes that the keys are unique.
import org.apache.spark.sql.functions._
// Explode the array into one row per element, add 1, then re-collect per id
val explodedDF = df.select(col("ids"), explode(col("values")).as("values"))
explodedDF.groupBy("ids").agg(collect_list(col("values") + 1).as("addedvalues"))
2. Use UDFs: I personally prefer this approach since it is more Scala-ish and there is no performance hit (at least in Scala), although UDFs are frowned upon by PySpark users. There are some issues with this approach as well:
i) There is a data serialization cost associated with UDFs.
ii) UDFs need to be registered per type, which can be tricky.
// A plain Scala function that adds one to every element
def addOne(values: Seq[Int]): Seq[Int] = {
  values.map(value => value + 1)
}
// Register it as a UDF for Seq[Int]; other element types need their own registration
val plusOneInt = spark.udf.register("plusOneInt", addOne(_: Seq[Int]): Seq[Int])
df.select(col("ids"), plusOneInt(col("values")).as("addedvalues"))
With the new HOFs, the same problem can be solved in a much simpler way:
val expectedDF = df.selectExpr("ids", "transform(values, v -> v + 1) as addedvalues")
+---+-----------+
|ids|addedvalues|
+---+-----------+
|  1|  [2, 3, 4]|
|  2|  [5, 6, 7]|
|  1|  [5, 5, 5]|
+---+-----------+
Some more HOF examples:
val starkDataset = Seq(Array("rob", "bran", "jon", "rickon")).toDF("names")
+------------------------+
|names                   |
+------------------------+
|[rob, bran, jon, rickon]|
+------------------------+
val filteredDF = starkDataset.selectExpr("names", "filter(names, name -> name not like 'r%') as filterednames")
+------------------------+-------------+
|names                   |filterednames|
+------------------------+-------------+
|[rob, bran, jon, rickon]|[bran, jon]  |
+------------------------+-------------+
val concatDF = starkDataset.selectExpr("names", "aggregate(names, 'Starks', (x, y) -> concat(x, '->', y)) as concatnames")
+------------------------+------------------------------+
|names |concatnames |
+------------------------+------------------------------+
|[rob, bran, jon, rickon]|Starks->rob->bran->jon->rickon|
+------------------------+------------------------------+
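Another of the new HOFs worth mentioning is exists(), which checks whether any element of an array satisfies a predicate. A quick sketch on the same starkDataset (the column name hasJName is just illustrative):
// jon matches 'j%', so this row yields true
val existsDF = starkDataset.selectExpr("names", "exists(names, name -> name like 'j%') as hasJName")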
Apart from these, there were also some other notable additions.
- Barrier Execution Mode: Support for barrier execution mode in the Spark scheduler is part of Project Hydrogen and is a step toward better integration with deep learning frameworks such as TensorFlow or Keras. It addresses Spark's default scheduling behavior, where a failed task is simply restarted so the computation can complete. That model does not fit deep learning workloads (don’t quote me on this), where tasks depend on each other; in barrier mode, all tasks of a stage are launched together and retried together if any of them fails. Barrier execution mode is an implementation of gang scheduling (a minimal sketch of the API follows after this list).
- Min and max watermark configuration for Structured Streaming, letting a query with multiple input streams choose which watermark policy to follow.
- ForeachBatchSink, a new Structured Streaming sink that exposes the output of each micro-batch as a DataFrame.
- Nested schema pruning for Parquet tables
- Bucket Pruning for filtering on a single bucketed column
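Here is a minimal sketch of what the new barrier API looks like on an RDD; the partition count and the training step are just placeholders. All tasks of a barrier stage are launched together, and context.barrier() blocks until every task reaches that point:
import org.apache.spark.BarrierTaskContext

val rdd = spark.sparkContext.parallelize(1 to 100, numSlices = 4)

rdd.barrier()
  .mapPartitions { iter =>
    val context = BarrierTaskContext.get()
    context.barrier() // wait here until every task in the stage reaches this point
    // ... run the distributed training step for this partition ...
    iter
  }
  .count()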
For more details: https://spark.apache.org/releases/spark-release-2-4-0.html
Thank you for reading this post. Hopefully, it gave you a brief introduction to what to expect in 2.4. This is my first ever online blog post and I am still learning, so please leave your comments and suggestions below so that I can improve in the future.