Performing Delta Table operations in PySpark with Spark Connect

Sebastian Daberdaku
Towards Data Engineering
4 min read · Jun 7, 2024

Introduced in Spark 3.4, Spark Connect provides a decoupled client-server architecture that allows remote connectivity to Spark clusters through the DataFrame API. It lets users run PySpark or Scala code against a remote Spark Connect server, which is especially useful when we want to avoid the cold start of a new Spark session (which can take minutes).

The Spark Connect client is a thin API built on top of Spark’s DataFrame API that uses unresolved logical plans as a language-agnostic protocol between the client and the Spark driver. The client translates DataFrame operations into unresolved logical query plans, encodes them with protocol buffers, and sends them to the server over gRPC. From there, the standard Spark execution process kicks in, so Spark Connect benefits from all of Spark’s optimizations and enhancements. Results are streamed back to the client through gRPC as Apache Arrow-encoded row batches.
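To make the client side concrete, here is a minimal sketch of opening a session against a Spark Connect server. The helper names `spark_connect_url` and `connect` are my own, the host is a placeholder, and the sketch assumes the server listens on the default Spark Connect port 15002 and that PySpark is installed with its Connect dependencies.

```python
def spark_connect_url(host: str, port: int = 15002) -> str:
    """Build a Spark Connect connection string of the form sc://host:port."""
    return f"sc://{host}:{port}"


def connect(host: str, port: int = 15002):
    """Return a SparkSession backed by a remote Spark Connect server."""
    # Imported lazily so the URL helper stays usable without PySpark installed.
    from pyspark.sql import SparkSession

    # builder.remote(...) selects the Spark Connect client instead of a local driver.
    return SparkSession.builder.remote(spark_connect_url(host, port)).getOrCreate()
```

Calling `spark = connect("localhost")` would then give us a `spark` object that can be used like a regular session for DataFrame and SQL operations.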

Delta Table API limitations

The Delta Table Python API allows users to interact with Delta tables through the DeltaTable.forPath and DeltaTable.forName methods. Even in the latest version of Delta Lake at the time of writing, v4.0.0-preview-rc1 (June 7th, 2024), both of these methods access the Spark Context object inside the current Spark Session, which is not supported when using Spark Connect.

This can be an issue when we need to perform Delta Table specific operations like vacuums, optimizations and merges.

Spark SQL to the rescue!

Fortunately, Spark Connect fully supports the Delta Table SQL API.
For instance, we can optimize a Delta Table located at a given path by running the following SQL statement from PySpark:

sql_expr = f"OPTIMIZE delta.`{path}` ZORDER BY (col1, col2, col3)"
spark.sql(sql_expr)
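Since the path and column names are interpolated straight into the SQL string, it can help to build the statement in one place. The following is only a sketch: `quote_identifier` and `build_optimize_sql` are hypothetical helpers of mine, and the quoting assumes top-level column names (a nested column such as `a.b` would be treated as a single name).

```python
from typing import Sequence


def quote_identifier(name: str) -> str:
    """Wrap a name in backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


def build_optimize_sql(path: str, zorder_columns: Sequence[str] = ()) -> str:
    """Return an OPTIMIZE statement for the Delta table stored at `path`."""
    statement = f"OPTIMIZE delta.{quote_identifier(path)}"
    if zorder_columns:
        # ZORDER BY is optional; without it OPTIMIZE only compacts small files.
        columns = ", ".join(quote_identifier(c) for c in zorder_columns)
        statement += f" ZORDER BY ({columns})"
    return statement
```

The result can be passed to `spark.sql` as before, for example `spark.sql(build_optimize_sql(path, ["col1", "col2", "col3"]))`.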

Similarly, we can perform vacuum operations:

sql_expr = f"VACUUM delta.`{path}` RETAIN 168 HOURS"
spark.sql(sql_expr)
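The 168 hours above correspond to Delta Lake's default retention threshold of 7 days. As a sketch, a small builder can make that default explicit and guard against accidentally short retention periods. The function name and its `allow_short_retention` flag are assumptions of mine, not part of any Delta API; Delta Lake applies its own safety check on the server side as well.

```python
DEFAULT_RETENTION_HOURS = 168  # 7 days, Delta Lake's default retention threshold


def build_vacuum_sql(
    path: str,
    retain_hours: int = DEFAULT_RETENTION_HOURS,
    dry_run: bool = False,
    allow_short_retention: bool = False,
) -> str:
    """Return a VACUUM statement for the Delta table stored at `path`."""
    if retain_hours < 0:
        raise ValueError("retain_hours must not be negative")
    if retain_hours < DEFAULT_RETENTION_HOURS and not allow_short_retention:
        # Delta Lake also rejects this unless
        # spark.databricks.delta.retentionDurationCheck.enabled is set to false.
        raise ValueError(
            f"retain_hours={retain_hours} is below the default of {DEFAULT_RETENTION_HOURS}"
        )
    quoted_path = "`" + path.replace("`", "``") + "`"
    statement = f"VACUUM delta.{quoted_path} RETAIN {retain_hours} HOURS"
    if dry_run:
        # DRY RUN lists the files that would be deleted without removing them.
        statement += " DRY RUN"
    return statement
```

Running `spark.sql(build_vacuum_sql(path, dry_run=True))` first is a cheap way to inspect what a vacuum would delete before running it for real.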

What about merge operations?

Performing merge operations can be a little more complicated, but I have had success with the following workaround.

Given a PySpark DataFrame I want to upsert into an existing Delta Table, I first register it as a temporary view, and then perform the merge operation using the SQL API.

The following utility functions can come in handy when doing so:

import logging
import random
import string
from contextlib import contextmanager
from typing import Generator

from pyspark.errors import TempTableAlreadyExistsException
from pyspark.sql import DataFrame

logger = logging.getLogger(__name__)


class MultipleFailuresException(Exception):
    """Custom exception raised when an operation has failed after several retries."""


def generate_random_temp_view_name(length: int) -> str:
    """
    Generates a random temp view name consisting of lowercase letters of specified length using the
    'string.ascii_lowercase' constant and the 'random.choice' function.

    :param length: Length of the random temp view name to be generated.
    :return: Random temp view name as a string.
    """
    # Use string.ascii_lowercase to get all lowercase letters
    lowercase_letters = string.ascii_lowercase
    # Use random.choice to randomly select characters from the set of lowercase letters
    random_temp_view_name = ''.join(random.choice(lowercase_letters) for _ in range(length))
    return random_temp_view_name


@contextmanager
def create_temp_view(df: DataFrame, max_attempts: int = 5) -> Generator[str, None, None]:
    """
    Create a temporary view for a given DataFrame and drop it when the context exits.

    :param df: The DataFrame to be used for creating the temporary view.
    :param max_attempts: The maximum number of attempts to create the temporary view if it already exists. Default is 5.
    :return: The name of the created temporary view.

    :raises MultipleFailuresException: If the temporary view creation fails after the maximum number of attempts.
    """
    temp_view_name = None
    left_attempts = max_attempts
    try:
        while temp_view_name is None and left_attempts > 0:
            try:
                temp_view_name = generate_random_temp_view_name(length=12)
                df.createTempView(temp_view_name)
                logger.info(f"Created temp view named '{temp_view_name}'!")
            except TempTableAlreadyExistsException as e:
                # Count the failed attempt first so the message reports what is actually left.
                left_attempts -= 1
                logger.warning(
                    msg=f"Temp view named '{temp_view_name}' already exists! Attempts left: {left_attempts}.",
                    exc_info=e
                )
                temp_view_name = None
        if temp_view_name is None:
            raise MultipleFailuresException(f"Temp view creation failed after {max_attempts} attempts!")
        yield temp_view_name
    finally:
        # Runs whether the body of the `with` block succeeded or raised.
        if temp_view_name is None:
            logger.warning("No temp view created so nothing to drop!")
        elif df.sparkSession.catalog.dropTempView(temp_view_name):
            logger.info(f"Dropped temp view named '{temp_view_name}'!")
        else:
            logger.warning(f"Failed to drop temp view named '{temp_view_name}'!")
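Twelve random lowercase letters already give 26^12 (roughly 9.5 × 10^16) possible names, so clashes should be rare. As an alternative sketch, the name could also be derived from a UUID. The function name and the `tmp_view_` prefix below are my own choices, not part of the utilities above.

```python
import uuid


def generate_unique_temp_view_name(prefix: str = "tmp_view_") -> str:
    """Return a temp view name made of a prefix and 32 random hex characters."""
    # uuid4().hex is 32 lowercase hex characters built from 122 random bits;
    # the letter-only prefix keeps the name from starting with a digit.
    return f"{prefix}{uuid.uuid4().hex}"
```

I would still keep the retry loop around view creation: it costs nothing and covers the unlikely case where a name is already taken.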

Now, say to_upsert is the PySpark DataFrame we want to upsert into the Delta Table located at path. We can use the context manager like so:

with create_temp_view(to_upsert) as temp_view:
    merge_condition = f"target.some_column = {temp_view}.some_column"
    logger.info(f"Merge condition is: {merge_condition}.")

    merge_query = f"""
        MERGE INTO delta.`{path}` AS target
        USING {temp_view}
        ON {merge_condition}
        WHEN MATCHED THEN
            UPDATE SET *
        WHEN NOT MATCHED THEN
            INSERT *
    """
    logger.info(f"Merge query is: {merge_query}.")
    spark.sql(merge_query)

The create_temp_view context manager automatically creates a temp view with a random name, which minimizes the probability of clashes with other concurrent operations running in the same Spark session on the Spark Connect server. Moreover, because the cleanup happens in a finally block, the temp view is dropped automatically once the merge operation completes, whether it succeeds or fails.
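When the merge key spans more than one column, writing the ON clause by hand gets tedious. The sketch below generates it from a list of key columns. `build_merge_condition` and `build_merge_sql` are hypothetical helpers of mine, and they assume the key columns have the same names in the source view and in the target table.

```python
from typing import Sequence


def _quote(name: str) -> str:
    """Wrap a name in backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


def build_merge_condition(
    key_columns: Sequence[str],
    target_alias: str = "target",
    source_alias: str = "source",
) -> str:
    """Return an equality condition over all key columns, joined with AND."""
    if not key_columns:
        raise ValueError("at least one key column is required")
    return " AND ".join(
        f"{target_alias}.{_quote(c)} = {source_alias}.{_quote(c)}" for c in key_columns
    )


def build_merge_sql(path: str, source_view: str, key_columns: Sequence[str]) -> str:
    """Return an upsert MERGE statement from `source_view` into the table at `path`."""
    return "\n".join([
        f"MERGE INTO delta.{_quote(path)} AS target",
        f"USING {source_view} AS source",
        f"ON {build_merge_condition(key_columns)}",
        "WHEN MATCHED THEN UPDATE SET *",
        "WHEN NOT MATCHED THEN INSERT *",
    ])
```

Inside the `with create_temp_view(to_upsert) as temp_view:` block, this would be used as `spark.sql(build_merge_sql(path, temp_view, ["some_column"]))`. Note that `=` never matches NULL keys; if the key columns can contain NULLs, Spark SQL's null-safe operator `<=>` may be a better fit.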
