Spark Session and the singleton misconception
What structured streaming reveals about Spark Session
Why even care?
In the old days of Spark, the entry point to a Spark application was SparkContext, which, as per the Spark source code comments, “represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster”. Since Spark 2.0, a new entry point called SparkSession has been introduced. Spark session is now the preferred way of accessing most Spark functionality, especially since the focus has shifted towards high-level APIs like the SQL API and away from the low-level RDD API.
At first glance, Spark session looks like a singleton: there seems to be a single instance for the whole application regardless of how you grab a pointer to that session. Sometimes you use the spark variable directly and sometimes you get the session from a DataFrame variable, but in all cases it appears to be the same object.
As you can see from the above screenshot, the variable address and the sessionUUID property are identical whether you use the famous spark variable or grab the session from a DataFrame.
Do you think this pattern is universal? In other words, is Spark session a singleton?
For too long I thought it was, and in most cases you don’t even need to care about its nature. But I recently hit a case that proved me wrong: Spark structured streaming.
Let’s try this snippet, which uses foreachBatch to run batch operations on a streaming micro-batch.
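For reference, a minimal sketch of such a snippet might look like the following. This is a reconstruction, not the exact code from the screenshot: the rate source and the way the session UUIDs are read (through the private `_jsparkSession`/`_jdf` handles, since PySpark 3.2 exposes no public equivalent) are my assumptions.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def compare_sessions(batch_df, batch_id):
    # The session behind the micro-batch DataFrame, via the JVM handle.
    micro_batch_session = batch_df._jdf.sparkSession()
    print("global session UUID:     ", spark._jsparkSession.sessionUUID())
    print("micro-batch session UUID:", micro_batch_session.sessionUUID())

# The built-in rate source just generates rows, which is enough for the demo.
query = (spark.readStream.format("rate").load()
         .writeStream.foreachBatch(compare_sessions).start())
query.awaitTermination(10)
```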
Here is the result.
The session memory address and the sessionUUID property differ between the global spark variable and the Spark session attached to the micro-batch DataFrame. But why does this happen, and how might it impact your code?
The why first!
After a bit of digging in the source code (of the Spark version I use, 3.2.1), I found this line in a file named StreamExecution.scala.
/** Isolated spark session to run the batches with. */
private val sparkSessionForStream = sparkSession.cloneSession()
Tip: If you use Databricks then expect that their Spark implementation is not 100% identical to the open source Spark repo on GitHub.
I checked the SparkSession class first to find out how it can be constructed, and then spotted a cloneSession method which seems to be called from a single location: the stream execution file. Cloning here means there will be an isolated session running the Spark SQL code of the streaming query. That session inherits the state of the root Spark session but works in isolation, because it needs to enforce some behaviour that could differ from the configuration of the root session.
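These clone semantics can be modelled in a few lines of plain Python (a toy, not Spark code): the clone starts from a copy of the parent’s configuration, after which changes to either session no longer affect the other, while both keep sharing a single context.

```python
class Context:
    """Stands in for the single SparkContext per application."""

class Session:
    def __init__(self, context, conf=None):
        self.context = context        # shared with every session
        self.conf = dict(conf or {})  # per-session state

    def clone(self):
        # Inherit the current state, then diverge independently.
        return Session(self.context, self.conf)

root = Session(Context(), {"spark.sql.adaptive.enabled": "true"})
child = root.clone()
child.conf["spark.sql.adaptive.enabled"] = "false"  # isolated change

assert root.context is child.context                      # same context
assert root.conf["spark.sql.adaptive.enabled"] == "true"  # root untouched
```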
For example, it seems that Adaptive Query Execution has to be disabled for streaming.
// Adaptive execution can change num shuffle partitions, disallow
sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
// Disable cost-based join optimization as we do not want stateful operations
// to be rearranged
sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
Cool, now we understand that there can be multiple sessions per application, each with its own configuration. This mechanism is used by the structured streaming API, but it can also be used in user application code, as there are two public APIs (newSession and cloneSession) that allow you to create a new session. Multiple sessions can live happily in the same Spark application, which maps to a single SparkContext.
The two sessions shown above have different memory addresses but are tied to the same SparkContext. They can still be used to create DataFrames and, generally speaking, behave like the root spark session, although they can have different configuration settings if needed.
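A quick sketch of the public API (assuming a local PySpark installation; the configuration key is just an example): two sessions, one SparkContext, independent runtime configuration.

```python
from pyspark.sql import SparkSession

root = SparkSession.builder.master("local[1]").getOrCreate()
other = root.newSession()  # fresh session state, same SparkContext

# Changing a runtime setting on one session does not leak into the other.
other.conf.set("spark.sql.shuffle.partitions", "4")
print(other.conf.get("spark.sql.shuffle.partitions"))  # the value set above
print(root.conf.get("spark.sql.shuffle.partitions"))   # unchanged in root

# Both sessions are backed by the one and only SparkContext.
print(root.sparkContext is other.sparkContext)
```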
But why do I need to care?
Well, sometimes you may need to care. First, it’s nice to know something new, have a new tool in your toolbox or understand how things work under the hood. If that’s not really enough, let’s look at this case, which is written in PySpark for a reason I will mention shortly.
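Here is a minimal reconstruction of the failing pattern (the rate source is a placeholder; the original snippet is in the screenshot). The temp view lands on the micro-batch session, while the SQL statement runs on the root session, so the view cannot be found there.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def process_batch(df, batch_id):                                    # line 1
    df.createOrReplaceTempView("microBatch")                        # line 2
    count = df.sql_ctx.sql("select * from microBatch").count()      # line 3
    print(batch_id, count)

(spark.readStream.format("rate").load()
 .writeStream.foreachBatch(process_batch).start())
```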
The above will fail with the error Table or view not found: microBatch. In the snippet, say we need a SQL statement to run some logic, for example a MERGE statement. Yes, it can be done using the functional API, but sometimes SQL code is more readable.
In PySpark there is no sparkSession attribute on a DataFrame instance, at least up to version 3.2.1 (Scala has it, by the way). So the other option is the df.sql_ctx property, which can point to a Spark session, but unfortunately it seems to point to the root Spark session. That’s why we get that error message: the microBatch temporary view is registered on the session of the streaming micro-batch, while the SQL statement on line 3 runs against the root Spark session, so the view is not visible there.
If you google the words “spark streaming foreachBatch createOrReplaceTempView”, you will probably get a result from the Databricks website with a notebook that has code like this.
# NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF._jdf.sparkSession().sql("""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Well, that solves the problem using the microBatchOutputDF._jdf.sparkSession() approach. The comment talks about using the SparkSession that was used to define the updates DataFrame, but it is pretty vague. Plus, it is not idiomatic (or recommended) in Python to use attributes starting with an underscore, because they are considered private.
Actually, if you check the PySpark documentation of DataFrame, you will not find a public property called _jdf, nor will you find sql_ctx. I still prefer the sql_ctx approach because it behaves in a natural Spark manner: if you collect some records from a SQL statement, you get an array of Row elements.
Doing the same through _jdf produces Java objects, which is quite weird, or at least not so natural if you use type annotations and develop in an IDE like VS Code.
Short term solution
Back to our streaming issue, what would be the solution to run SQL statements against the right Spark session?
The easiest solution is to use global temporary views. Such a view can be accessed from any Spark session as long as its name is prefixed with global_temp. in the SQL statement.
The major difference here is on lines 2 and 3. The view is created as a global temp view on line 2. On line 3, the view is accessed from the root Spark session using the global_temp. prefix. Once that’s done, things work fine without any errors.
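A sketch of the workaround (again a reconstruction, with the same placeholder names as before): only lines 2 and 3 change compared to the failing version.

```python
def process_batch(df, batch_id):                                    # line 1
    # Global temp views live in the global_temp database and are
    # visible from every session tied to the same SparkContext.
    df.createOrReplaceGlobalTempView("microBatch")                  # line 2
    count = df.sql_ctx.sql(
        "select * from global_temp.microBatch").count()             # line 3
    print(batch_id, count)
```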
Using sql_ctx is not really optimal, but it is way better than the _jdf approach.
Long term solution
The community behind Spark is aware of these limitations in the PySpark DataFrame interface, of how they make developers’ lives harder, and of the mismatch with the equivalent Scala API, which provides easy access to the Spark session. That’s why there is a long-term solution, although it will probably come with the next (hopefully minor) Spark version.
There is a JIRA issue named Use SparkSession instead of SQLContext inside PySpark. This issue has a recently merged PR with the following description:
This PR proposes to use SparkSession within PySpark. This is a base work for respecting runtime configurations, etc. Currently, we rely on old deprecated SQLContext internally that doesn’t respect Spark session’s runtime configurations correctly. This PR also contains related changes (and a bit of refactoring in the code this PR touches) as below:
- Expose DataFrame.sparkSession like Scala API does.
- Move SQLContext._conf -> SparkSession._jconf.
- Rename rdd_array to df_array at DataFrame.randomSplit.
- Issue warnings to discourage to use DataFrame.sql_ctx and DataFrame(..., sql_ctx).
Once this PR is available in whatever Spark installation you use, it will be easy to update the code, remove the dependency on sql_ctx and drop the global temp views. The below will work smoothly (fingers crossed).
df.createOrReplaceTempView("microBatch")
count = df.sparkSession.sql("select * from microBatch").count()
Update 16/06/2022: Spark 3.3 has been released on Databricks runtime 11.0 beta, so a PySpark DataFrame now has a sparkSession attribute.
That’s all folks
It might not be a very common use case or something that makes a huge difference in how you write Spark applications. Nonetheless, knowing a bit more about Spark session and how it behaves in different situations is quite useful. It may help you diagnose a silly problem or even give you an idea for an innovative solution to a hard task.
Hope that helps! 🙂