Spark 3 — Understanding Explain Formatted

German Schiavon Matteo · Published in Analytics Vidhya · Jun 5, 2020 · 6 min read

In this post, I’ll break down the new output of the explain command. But first, let’s see how it was before 3.0.

Some people might have never used explain, but it is really useful when you want to know what’s really happening under the hood.

Sometimes we write beautiful code and expect Catalyst to solve it all for us, and sorry folks, but that’s not how it works. Catalyst makes our lives much easier, but unfortunately, it doesn’t do all the work. One of the best ways to know what it is actually doing is by calling explain on the DataFrame.

To print only the physical plan:

dataframe.explain()

And if you want to print the logical plan and the physical plan:

dataframe.explain(true)
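
If you want to follow along, here is a minimal, self-contained sketch; the session setup, the sample data and the column names are made up purely for illustration:

import org.apache.spark.sql.SparkSession

// Tiny, hypothetical setup just to have something to call explain on.
val spark = SparkSession.builder()
  .appName("explain-demo")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val dataframe = Seq(("f1", 1L), ("f2", 2L)).toDF("form_id", "creator_user_id")

dataframe.explain()     // physical plan only
dataframe.explain(true) // parsed, analyzed, optimized and physical plans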

Note that in this post I won't go any deeper into Catalyst or execution plans; there are already plenty of great articles about them.

Okay, let’s get into it.

Below is the code we are going to analyze:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{max, min}
import spark.implicits._ // needed for the 'symbol column syntax

def getLastStatus(df: DataFrame): DataFrame = {
  df.groupBy('form_id)
    .agg(
      max('creator_user_id) as 'creator_user_id,
      min('created_at) as 'created_at,
      max('updated_at) as 'updated_at,
      max('deleted_at) as 'deleted_at
    )
}

val resultDF = getLastStatus(df1.union(df2))
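
Note that df1 and df2 are not defined in the snippet above; judging by the plans further down (they show json relations), they were presumably read from JSON files, roughly like this (the paths are placeholders):

// Hypothetical setup: the physical plans below show JSON file scans,
// so df1 and df2 were presumably loaded along these lines.
val df1 = spark.read.json("/path/to/events/part1")
val df2 = spark.read.json("/path/to/events/part2")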

Both DataFrames have this schema:

 |-- form_id: string (nullable = true)
|-- event_id: string (nullable = true)
|-- event_version: long (nullable = true)
|-- type: string (nullable = true)
|-- creator_user_id: long (nullable = true)
|-- created_at: timestamp (nullable = true)
|-- updated_at: timestamp (nullable = true)
|-- deleted_at: timestamp (nullable = true)
|-- workspace_id: string (nullable = true)
|-- field_position: long (nullable = true)
|-- field_id: string (nullable = true)
|-- field_title: string (nullable = true)
|-- field_reference: string (nullable = true)
|-- field_type: string (nullable = true)
|-- group_reference: string (nullable = true)
|-- required: boolean (nullable = true)
|-- field_templated: boolean (nullable = true)
|-- field_attachment_type: string (nullable = true)
|-- layout_attachment_type: string (nullable = true)
|-- choice_position: long (nullable = true)
|-- choice_id: string (nullable = true)
|-- choice_label: string (nullable = true)
|-- received_at: date (nullable = true)
|-- last_updated_at: timestamp (nullable = true)

Spark 2.4.5

Let's call explain to see the execution plans:

resultDF.explain(true)

And finally, the enormous monster below is the result. Believe it or not, this is just a baby monster, produced by nothing more than the union of two DataFrames and a groupBy.

== Parsed Logical Plan ==
'Project [form_id#24, creator_user_id#193L, created_at#195, updated_at#197, deleted_at#199, coalesce('updated_at, 'created_at, 'deleted_at) AS last_event_timestamp#205]
+- Aggregate [form_id#24], [form_id#24, max(creator_user_id#28L) AS creator_user_id#193L, min(created_at#29) AS created_at#195, max(updated_at#30) AS updated_at#197, max(deleted_at#31) AS deleted_at#199]
   +- Union
      :- Relation[form_id#24,event_id#25,event_version#26L,type#27,creator_user_id#28L,created_at#29,updated_at#30,deleted_at#31,workspace_id#32,field_position#33L,field_id#34,field_title#35,field_reference#36,field_type#37,group_reference#38,required#39,field_templated#40,field_attachment_type#41,layout_attachment_type#42,choice_position#43L,choice_id#44,choice_label#45,received_at#46,last_updated_at#47] json
      +- Relation[form_id#96,event_id#97,event_version#98L,type#99,creator_user_id#100L,created_at#101,updated_at#102,deleted_at#103,workspace_id#104,field_position#105L,field_id#106,field_title#107,field_reference#108,field_type#109,group_reference#110,required#111,field_templated#112,field_attachment_type#113,layout_attachment_type#114,choice_position#115L,choice_id#116,choice_label#117,received_at#118,last_updated_at#119] json
== Analyzed Logical Plan ==
form_id: string, creator_user_id: bigint, created_at: timestamp, updated_at: timestamp, deleted_at: timestamp, last_event_timestamp: timestamp
Project [form_id#24, creator_user_id#193L, created_at#195, updated_at#197, deleted_at#199, coalesce(updated_at#197, created_at#195, deleted_at#199) AS last_event_timestamp#205]
+- Aggregate [form_id#24], [form_id#24, max(creator_user_id#28L) AS creator_user_id#193L, min(created_at#29) AS created_at#195, max(updated_at#30) AS updated_at#197, max(deleted_at#31) AS deleted_at#199]
   +- Union
      :- Relation[form_id#24,event_id#25,event_version#26L,type#27,creator_user_id#28L,created_at#29,updated_at#30,deleted_at#31,workspace_id#32,field_position#33L,field_id#34,field_title#35,field_reference#36,field_type#37,group_reference#38,required#39,field_templated#40,field_attachment_type#41,layout_attachment_type#42,choice_position#43L,choice_id#44,choice_label#45,received_at#46,last_updated_at#47] json
      +- Relation[form_id#96,event_id#97,event_version#98L,type#99,creator_user_id#100L,created_at#101,updated_at#102,deleted_at#103,workspace_id#104,field_position#105L,field_id#106,field_title#107,field_reference#108,field_type#109,group_reference#110,required#111,field_templated#112,field_attachment_type#113,layout_attachment_type#114,choice_position#115L,choice_id#116,choice_label#117,received_at#118,last_updated_at#119] json
== Optimized Logical Plan ==
Aggregate [form_id#24], [form_id#24, max(creator_user_id#28L) AS creator_user_id#193L, min(created_at#29) AS created_at#195, max(updated_at#30) AS updated_at#197, max(deleted_at#31) AS deleted_at#199, coalesce(max(updated_at#30), min(created_at#29), max(deleted_at#31)) AS last_event_timestamp#205]
+- Union
   :- Project [form_id#24, creator_user_id#28L, created_at#29, updated_at#30, deleted_at#31]
   :  +- Relation[form_id#24,event_id#25,event_version#26L,type#27,creator_user_id#28L,created_at#29,updated_at#30,deleted_at#31,workspace_id#32,field_position#33L,field_id#34,field_title#35,field_reference#36,field_type#37,group_reference#38,required#39,field_templated#40,field_attachment_type#41,layout_attachment_type#42,choice_position#43L,choice_id#44,choice_label#45,received_at#46,last_updated_at#47] json
   +- Project [form_id#96, creator_user_id#100L, created_at#101, updated_at#102, deleted_at#103]
      +- Relation[form_id#96,event_id#97,event_version#98L,type#99,creator_user_id#100L,created_at#101,updated_at#102,deleted_at#103,workspace_id#104,field_position#105L,field_id#106,field_title#107,field_reference#108,field_type#109,group_reference#110,required#111,field_templated#112,field_attachment_type#113,layout_attachment_type#114,choice_position#115L,choice_id#116,choice_label#117,received_at#118,last_updated_at#119] json
== Physical Plan ==
*(4) HashAggregate(keys=[form_id#24], functions=[max(creator_user_id#28L), min(created_at#29), max(updated_at#30), max(deleted_at#31)], output=[form_id#24, creator_user_id#193L, created_at#195, updated_at#197, deleted_at#199, last_event_timestamp#205])
+- Exchange hashpartitioning(form_id#24, 200)
   +- *(3) HashAggregate(keys=[form_id#24], functions=[partial_max(creator_user_id#28L), partial_min(created_at#29), partial_max(updated_at#30), partial_max(deleted_at#31)], output=[form_id#24, max#217L, min#218, max#219, max#220])
      +- Union
         :- *(1) FileScan json [form_id#24,creator_user_id#28L,created_at#29,updated_at#30,deleted_at#31] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/Users//desarrollo//dops_spark_jobs/target/scala-2...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<form_id:string,creator_user_id:bigint,created_at:timestamp,updated_at:timestamp,deleted_at...
         +- *(2) FileScan json [form_id#96,creator_user_id#100L,created_at#101,updated_at#102,deleted_at#103] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/Users//desarrollo//dops_spark_jobs/target/scala-2...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<form_id:string,creator_user_id:bigint,created_at:timestamp,updated_at:timestamp,deleted_at...

The plan is divided into four sections, and that may well be the easiest thing to extract from it:

== Parsed Logical Plan ==
== Analyzed Logical Plan ==
== Optimized Logical Plan ==
== Physical Plan ==

Then, you can see very useful stuff like this:

PartitionFilters: [], PushedFilters: []
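
They are empty here because the query has no filter and the input isn't partitioned. To see them populated, you need a filter on a source that supports pushdown; a hedged sketch, assuming the same data were stored as Parquet (the path and the value are made up):

import org.apache.spark.sql.functions.col

// Hypothetical example: with a columnar source such as Parquet,
// a simple filter typically shows up in the scan's PushedFilters.
val parquetDF = spark.read.parquet("/path/to/events_parquet") // placeholder path
parquetDF.filter(col("form_id") === "some_form").explain()
// Expected to print something like:
//   PushedFilters: [IsNotNull(form_id), EqualTo(form_id,some_form)]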

And unless you are Jacek Laskowski, I wish you the best of luck with the rest of the plan.

If you don’t believe me, go ahead and find Wally.

Spark 3.0

Probably what you all were waiting for.

The new explain method has the following signature:

def explain(mode: String): Unit

Where mode can be one of the following values:

simple, extended, codegen, cost or formatted
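
A quick way to compare them is simply to loop over the modes (a small sketch, reusing the resultDF defined earlier):

// Print the same query under every explain mode (Spark 3.0+).
Seq("simple", "extended", "codegen", "cost", "formatted").foreach { mode =>
  println(s"===== mode: $mode =====")
  resultDF.explain(mode)
}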

Simple and Extended mode

These two are straightforward if we compare them with what Spark 2.4.5 already offered:

df.explain("simple") == df.explain()
df.explain("extended") == df.explain(true)

Codegen and Cost Mode

These two are lower level, especially codegen.

df.explain("codegen")

This will print all of the Java code generated by Spark for the query, e.g.:

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator inputadapter_input_0;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] project_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 011 */
/* 012 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 013 */     this.references = references;
/* 014 */   }

The cost mode is interesting for getting statistics about the query, when they are available.

df.explain("cost")

This will print statistics such as the estimated size of the DataFrame, e.g.:

+- Union, Statistics(sizeInBytes=12.2 KiB)
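
If the statistics are missing, it is usually because Spark never computed them. One way to get them is to analyze the data as a table first; a hedged sketch, assuming you register the result under a table name of your choosing (events below is made up):

// Hypothetical: persist the result as a table, compute statistics for it,
// then inspect them through the cost mode.
resultDF.write.saveAsTable("events")                  // table name is an assumption
spark.sql("ANALYZE TABLE events COMPUTE STATISTICS")  // table-level stats (size, row count)
spark.table("events").explain("cost")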

Formatted

Probably the reason you are here.

Remember that we were analyzing this code:

def getLastStatus(df: DataFrame): DataFrame = {
  df.groupBy('form_id)
    .agg(
      max('creator_user_id) as 'creator_user_id,
      min('created_at) as 'created_at,
      max('updated_at) as 'updated_at,
      max('deleted_at) as 'deleted_at
    )
}

val resultDF = getLastStatus(df1.union(df2))

This is the fancy new way to print execution plans in Spark 3.0, and as you can see, it is immediately much easier to read (the full plan is at the bottom).

resultDF.explain("formatted")

All the steps are numbered, which makes the plan much easier to follow.

== Physical Plan ==
* HashAggregate (8)
+- Exchange (7)
   +- * HashAggregate (6)
      +- Union (5)
         :- * Project (2)
         :  +- BatchScan (1)
         +- * Project (4)
            +- BatchScan (3)

In steps (1), (2), (3) and (4), Spark reads both DataFrames and selects only the columns needed for the aggregation (6), as you can see in the Output field. If you remember from earlier, the DataFrames have more than 20 columns, so this column pruning matters.

(1) BatchScan 
Output: [updated_at#30, creator_user_id#28L, deleted_at#31, created_at#29, form_id#24]

(2) Project [codegen id : 1]
Output : [form_id#24, creator_user_id#28L, created_at#29, updated_at#30, deleted_at#31]
Input : [form_id#24, creator_user_id#28L, created_at#29, updated_at#30, deleted_at#31]

(3) BatchScan
Output: [updated_at#102, creator_user_id#100L, form_id#96, created_at#101, deleted_at#103]

(4) Project [codegen id : 2]
Output : [form_id#96, creator_user_id#100L, created_at#101, updated_at#102, deleted_at#103]
Input : [form_id#96, creator_user_id#100L, created_at#101, updated_at#102, deleted_at#103]

Then the Union (5) of the two DataFrames.

(5) Union

Then a partial aggregation (6), performed on each executor before the shuffle.

(6) HashAggregate [codegen id : 3]
Input: [form_id#24, creator_user_id#28L, created_at#29, updated_at#30, deleted_at#31]

Then the shuffle (7):

(7) Exchange 
Input: [form_id#24, max#226L, min#227, max#228, max#229]

And finally the last aggregation (8):

(8) HashAggregate [codegen id : 4]
Input: [form_id#24, max#226L, min#227, max#228, max#229]
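
One practical detail hiding behind the Exchange (7): in the Spark 2.4.5 plan you could see hashpartitioning(form_id#24, 200), and that 200 is simply the default value of spark.sql.shuffle.partitions. A small sketch of tuning it (the value is only an example, not a recommendation):

// spark.sql.shuffle.partitions controls how many partitions the Exchange
// (the shuffle) produces; the default is 200.
spark.conf.set("spark.sql.shuffle.partitions", "64")
resultDF.explain("formatted")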

Full plan

== Physical Plan ==
* HashAggregate (8)
+- Exchange (7)
   +- * HashAggregate (6)
      +- Union (5)
         :- * Project (2)
         :  +- BatchScan (1)
         +- * Project (4)
            +- BatchScan (3)

(1) BatchScan
Output: [updated_at#30, creator_user_id#28L, deleted_at#31, created_at#29, form_id#24]

(2) Project [codegen id : 1]
Output : [form_id#24, creator_user_id#28L, created_at#29, updated_at#30, deleted_at#31]
Input : [form_id#24, creator_user_id#28L, created_at#29, updated_at#30, deleted_at#31]

(3) BatchScan
Output: [updated_at#102, creator_user_id#100L, form_id#96, created_at#101, deleted_at#103]

(4) Project [codegen id : 2]
Output : [form_id#96, creator_user_id#100L, created_at#101, updated_at#102, deleted_at#103]
Input : [form_id#96, creator_user_id#100L, created_at#101, updated_at#102, deleted_at#103]

(5) Union

(6) HashAggregate [codegen id : 3]
Input: [form_id#24, creator_user_id#28L, created_at#29, updated_at#30, deleted_at#31]

(7) Exchange
Input: [form_id#24, max#226L, min#227, max#228, max#229]

(8) HashAggregate [codegen id : 4]
Input: [form_id#24, max#226L, min#227, max#228, max#229]

To summarize: Spark is very user friendly.

I've found that people with basically no experience can build their ETLs right away, but those jobs are most likely underperforming and wasting resources, and hence money.

Understanding the plans will help you optimize your workloads.

And if execution plans weren't important, why would Spark have improved the way they are displayed in the new version?
