The Ultimate Guide to PySpark Optimisation Series — Part I

Dheeraj Thasma Ravindranath
Published in SquareShift · 3 min read · May 16, 2022

Apache Spark is one of the most popular open-source distributed cluster computing frameworks, used for quickly processing, querying and analysing Big Data. In this series, we are going to learn some optimisation techniques to improve the efficiency of Spark jobs.

Have you ever wondered whether using too many withColumn calls affects the performance of your job?

withColumn???

Sadly, the answer is Yes!!!

It is common to use the withColumn function when transforming a DataFrame, for example to add or modify columns.

DataFrames are immutable in nature, so every operation on a DataFrame results in a new Spark DataFrame.
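To see what that means in practice, here is a minimal sketch (the SparkSession setup and the sample data are just for illustration): withColumn does not modify the DataFrame it is called on, it returns a new one.

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

# withColumn returns a brand-new DataFrame; the original df is untouched
df_with_flag = df.withColumn("flag", lit("None"))

print(df.columns)            # ['id', 'value']
print(df_with_flag.columns)  # ['id', 'value', 'flag']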

Let’s see how withColumn impacts performance. First, we load the data; in our case, from S3 using an AWS Glue DynamicFrame. Once the data is loaded, we run a loop that adds ten columns using withColumn.

from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
import sys

# Standard Glue job setup
sparkContext = SparkContext.getOrCreate()
glueContext = GlueContext(sparkContext)
sparkSession = glueContext.spark_session
logger = glueContext.get_logger()
job = Job(glueContext)
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job.init(args['JOB_NAME'], args)

# Load the CSV data from S3 as a DynamicFrame
ds = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://FileLocation"]},
    format="csv",
    format_options={"withHeader": True}
)

# Convert to a Spark DataFrame and add ten columns, one withColumn call at a time
df = ds.toDF()
for i in range(10):
    df = df.withColumn("test_{0}".format(i), lit("None"))

df.show()
job.commit()

Running this job, we can clearly see a drastic increase in driver memory. Now, how do we overcome this?

To overcome this, use a single select instead of repeated withColumn calls.

from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
import sys

# Standard Glue job setup
sparkContext = SparkContext.getOrCreate()
glueContext = GlueContext(sparkContext)
sparkSession = glueContext.spark_session
logger = glueContext.get_logger()
job = Job(glueContext)
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job.init(args['JOB_NAME'], args)

# Load the CSV data from S3 as a DynamicFrame
ds = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://FileLocation"]},
    format="csv",
    format_options={"withHeader": True}
)

# Convert to a Spark DataFrame and add all ten columns in a single select call
df = ds.toDF()
df = df.select(
    "*",
    lit("None").alias("test_0"),
    lit("None").alias("test_1"),
    lit("None").alias("test_2"),
    lit("None").alias("test_3"),
    lit("None").alias("test_4"),
    lit("None").alias("test_5"),
    lit("None").alias("test_6"),
    lit("None").alias("test_7"),
    lit("None").alias("test_8"),
    lit("None").alias("test_9")
)

df.show()
job.commit()
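As a side note, if the number of columns is dynamic, the same single-select pattern can be written with a list comprehension. This is just a sketch of that idea, reusing the column names from the example above; it keeps the convenience of a loop while still producing only one projection:

new_columns = [lit("None").alias("test_{0}".format(i)) for i in range(10)]
# One select call adds all the columns at once, so only a single projection is created
df = df.select("*", *new_columns)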

If you compare the analyzed logical plans, the first case (using withColumn) contains more projections than the second case (using select), because each withColumn call introduces a projection internally. When we work with large datasets, the memory footprint of these additional projections can become problematic and time consuming. The Spark documentation itself notes that calling withColumn multiple times generates internal projections and can lead to performance issues, recommending a single select with multiple columns instead: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/dataframe.html#DataFrame.withColumn
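You can verify this yourself by printing the plans with explain. A small sketch, assuming df is the DataFrame from either example above:

# Prints the parsed, analyzed and optimized logical plans plus the physical plan;
# in the withColumn version the analyzed plan shows one extra Project per added column.
df.explain(extended=True)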

Let’s catch up in the next part of the Optimisation Series. Thank you for your support, and don’t forget to 👏👏
