PySpark Performance Improvement: Part 1

Basil Skaria
3 min read · Oct 9, 2023


Working with huge datasets can be daunting without a framework like PySpark in a Python environment. With distributed computing in place, the performance expectations for Spark applications are also higher.


Every developer has written at least one SQL query while learning programming across various paradigms. Looking closely, a SQL query specifies how the final result should look, not how to fetch and process the data.

Similarly, a developer can write a Spark application with the execution details abstracted away from the business logic.

We all want quick and easy ways to get faster. The following methods are two of many.

  1. Use of Delta Lake tables

from delta.tables import DeltaTable

# forPath returns a DeltaTable handle; toDF() exposes the underlying DataFrame
delta_table = DeltaTable.forPath(spark, path_to_deltaTable)
delta_table.toDF().show()

Delta Lake is an optimized storage format that offers higher compression and faster reads and writes. It provides database-like features such as update and delete, and it maintains a transaction log that keeps every version of the data, which can later be vacuumed so that only the most recent versions are retained.
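A minimal sketch of these features, assuming a Delta-enabled Spark session, an existing table at the placeholder path path_to_deltaTable, and a hypothetical status column:

from delta.tables import DeltaTable

tbl = DeltaTable.forPath(spark, path_to_deltaTable)

# Database-like, in-place modifications; conditions and new values are SQL expressions
tbl.update(condition="status = 'stale'", set={"status": "'archived'"})
tbl.delete(condition="status = 'obsolete'")

# Time travel: read an earlier version kept in the Delta transaction log
old_df = spark.read.format('delta').option('versionAsOf', 0).load(path_to_deltaTable)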

While a CSV file is plain text that a human can read directly, Delta Lake tables store their data as Parquet files, a compressed columnar binary format that is not human-readable.

A typical use case is when historical data must be processed together with the latest data. The source files may arrive as CSV; once the CSV is read, the data can be written into a Delta table and retrieved later as historical data.
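A minimal sketch of that flow, again using placeholder paths and assuming a Delta-enabled Spark session:

# Land the CSV source and persist it as a Delta table for later runs
raw_df = spark.read.format('csv').option('header', True).load(path_to_source_file)
raw_df.write.format('delta').mode('overwrite').save(path_to_deltaTable)

# Subsequent jobs read the historical data straight from the Delta path
history_df = spark.read.format('delta').load(path_to_deltaTable)
history_df.show()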

2. Optimal use of coalesce and repartition

Repartition: repartition(numPartitions, *cols)

repartition() distributes or redistributes the records of a DataFrame into the requested number of partitions. Optionally, one or more column names can be passed so the data is partitioned on those columns.

df = spark.read.format('csv').option('header',True).load(path_to_source_file)
df.repartition(1).write.mode('overwrite').format('csv').option('header',True).save(path_to_destination)

repartition() always ends with a full shuffle. Passing column names makes Spark hash-partition the data on those columns; without columns, rows are distributed evenly (round-robin) across the requested number of partitions.

repartition() can scale the number of partitions up or down to any positive number.
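For example, a sketch of key-based redistribution; the column name 'country' and the partition count 400 are hypothetical choices:

df = spark.read.format('csv').option('header', True).load(path_to_source_file)
print(df.rdd.getNumPartitions())           # partitions after the initial read

# Hash-partition on a key column and scale well past the default 200 shuffle partitions
df_by_key = df.repartition(400, 'country')
print(df_by_key.rdd.getNumPartitions())    # 400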

Coalesce: coalesce(numPartitions)
df = spark.read.format('csv').option('header',True).load(path_to_source_file)
df.coalesce(1).write.mode('overwrite').format('csv').option('header',True).save(path_to_destination)

coalesce() combines existing partitions to reduce the partition count of a DataFrame; it cannot split them to create more. Because it merges partitions rather than redistributing every row, it avoids a full shuffle.

What is optimum between the two?

  1. Coalesce when a single partition is needed. When writing out a single CSV file, no shuffle is required, so coalesce is suitable.
  2. Coalesce when the partition count needs to be reduced and a shuffle is not required.
  3. Repartition when parallel processing across all the executors is needed.
  4. Repartition when the number of partitions must be increased. Coalesce can never produce more partitions than the DataFrame already has; 200 is only Spark's default for spark.sql.shuffle.partitions, not a coalesce limit (see the short sketch after this list).
  5. Repartition when the data needs to be redistributed on key fields of the dataset.
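A quick sketch of point 4, using a throwaway DataFrame built with spark.range:

df = spark.range(1_000_000).repartition(8)

print(df.coalesce(2).rdd.getNumPartitions())      # 2: partitions merged without a full shuffle
print(df.coalesce(50).rdd.getNumPartitions())     # still 8: coalesce cannot increase the count
print(df.repartition(50).rdd.getNumPartitions())  # 50: repartition shuffles to any positive number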

References:

  1. How to Create Delta Lake tables | Delta Lake
  2. pyspark.pandas.DataFrame.spark.repartition — PySpark 3.5.0 documentation (apache.org)
  3. pyspark.pandas.DataFrame.spark.coalesce — PySpark 3.5.0 documentation (apache.org)

Stay tuned for the next part for more optimizations.

Thank you for the read. Happy coding.
