Troubleshoot Spark/PySpark performance issues

Anup Moncy
Published in Data Engineering
3 min read · Oct 19, 2023

Steps to help troubleshoot common performance issues in Spark/PySpark jobs, using EMR/Databricks as examples. Of course, all of this applies only after confirming there is no change in the data trend or volume.

TL;DR

1. Check the Cluster health and resource allocation (console/UI)

2. Check the stages of the job to identify bottlenecks (Resource Manager/UI)

3. Review partitions, repartition, coalesce

4. Caching

5. Logging and Debugging

6. Review built-in optimization recommendations

1. Check the Cluster health and resource allocation

Example (EMR):

  • Access the AWS Management Console and navigate to the EMR service.
  • Select your EMR cluster and review the hardware configurations in the “Hardware” tab.
  • Ensure that the instance types and counts meet the requirements, and adjust the cluster configuration accordingly.

Databricks: Access the Databricks UI and navigate to the cluster where your job is running.
Check the “Clusters” tab to ensure that your cluster has sufficient resources allocated. You can modify the cluster configuration if needed.
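
If you prefer to check the same allocation programmatically instead of through the console, a minimal boto3 sketch like the one below lists an EMR cluster's instance groups (this assumes AWS credentials and region are already configured; the cluster ID is a placeholder).

# Sketch: inspect an EMR cluster's resource allocation with boto3
import boto3

emr = boto3.client("emr")
cluster_id = "j-XXXXXXXXXXXXX"  # placeholder: replace with your cluster ID

cluster = emr.describe_cluster(ClusterId=cluster_id)["Cluster"]
print("Cluster state:", cluster["Status"]["State"])

# List instance groups and their sizes to confirm the allocation matches expectations
for group in emr.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"]:
    print(group["InstanceGroupType"], group["InstanceType"], group["RunningInstanceCount"])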

2. Check the stages of the job to identify bottlenecks

Example (Spark on EMR):

  • View the Spark Application UI by accessing the ResourceManager UI (usually at http://<EMR_MASTER_NODE>:8088/cluster).
  • Analyze the stages and tasks to identify bottlenecks.
  • If you are running Spark jobs programmatically, consider adding custom logging statements within your Spark application to log relevant information during execution.

Example (Databricks):
Go to the Spark UI by clicking on the “View Spark UI” link in the cluster details. Analyze the stages and tasks to identify bottlenecks.
If you are running your Spark job from a Databricks notebook, use the notebook profiler. It can give you insights into which cells are consuming the most resources and taking the most time.
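
For programmatic visibility alongside the UI, PySpark's status tracker can report active stages and their task progress from inside the job. A rough sketch, assuming an existing SparkSession named spark:

# Sketch: poll active stages from inside a PySpark job
tracker = spark.sparkContext.statusTracker()

for stage_id in tracker.getActiveStageIds():
    info = tracker.getStageInfo(stage_id)
    if info is not None:
        # Print how far each active stage has progressed
        print(f"Stage {stage_id} ({info.name}): "
              f"{info.numCompletedTasks}/{info.numTasks} tasks complete, "
              f"{info.numFailedTasks} failed")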

3. Review Partition needs in the code

Use SHOW PARTITIONS your_table; to check whether the table's partitions are aligned with the data and query patterns, and also review skew (a quick skew check is sketched at the end of this section).

Repartition where needed: df = df.repartition(num_partitions)

If there are too many small partitions, coalesce them to reduce overhead. df = df.coalesce(new_num_partitions)

Example (Native Spark):

Use the df.rdd.getNumPartitions method to check the current number of partitions in a DataFrame.

// Check number of partitions
val numPartitions = df.rdd.getNumPartitions

If needed, repartition the DataFrame.

// Repartition DataFrame to a target number of partitions
val dfRepartitioned = df.repartition(targetNumPartitions)

Coalesce partitions if there are too many small partitions.

// Coalesce DataFrame to a smaller number of partitions
val dfCoalesced = df.coalesce(newNumPartitions)
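
To review the skew mentioned above, one quick check is counting rows per partition; a heavily skewed partition will stand out with a much larger count than the rest. A PySpark sketch, assuming an existing DataFrame named df:

# Sketch: check for skew by counting rows in each partition
from pyspark.sql.functions import spark_partition_id

print("Number of partitions:", df.rdd.getNumPartitions())

# Rows per partition, largest first
(df.withColumn("partition_id", spark_partition_id())
   .groupBy("partition_id")
   .count()
   .orderBy("count", ascending=False)
   .show())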

4. Data Caching

Using df.cache() reduces the need to recompute a DataFrame that is reused across actions; call df.unpersist() when it is no longer needed to release memory.
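
As a small sketch of the pattern (assuming an existing DataFrame named df; the column name is a placeholder):

# Sketch: cache a DataFrame that is reused by multiple actions
df.cache()                      # mark df for caching; materialized on the first action
total_rows = df.count()         # first action computes df and populates the cache
non_null = df.filter(df["some_column"].isNotNull()).count()  # reuses the cached data

df.unpersist()                  # release the cached memory once df is no longer needed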

5. Logging and Debugging

Use engineering best practices for logging and exception handling to pinpoint the phase of execution and the internal errors that cause retries.

Example: Implement custom logging within your Spark application using logging frameworks like log4j or SLF4J.

import org.apache.log4j.Logger

val logger = Logger.getLogger(getClass.getName)

// Log information
logger.info("Start of my Spark job")
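
In PySpark, the same idea can be sketched with Python's standard logging module on the driver (an existing DataFrame named df is assumed):

# Sketch: driver-side logging and exception handling in PySpark
import logging

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(name)s - %(message)s")
logger = logging.getLogger(__name__)

logger.info("Start of my Spark job")
try:
    row_count = df.count()
    logger.info("Row count: %d", row_count)
except Exception:
    logger.exception("Spark job failed")  # records the full stack trace for debugging
    raise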

6. Optimization Recommendations

Most modern platforms like EMR and Databricks provide automated recommendations. Review them periodically.

  • EMR and Spark provide optimization recommendations through their respective interfaces. In EMR, you can review Auto Scaling and instance fleet recommendations.
  • Regularly check for updates in Spark and EMR documentation for best practices and optimization tips.
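
Alongside platform recommendations, it can also help to confirm that Spark's own built-in optimizations are turned on. A minimal sketch, assuming Spark 3.x and a SparkSession named spark:

# Sketch: verify Adaptive Query Execution and skew-join handling are enabled
print(spark.conf.get("spark.sql.adaptive.enabled"))
print(spark.conf.get("spark.sql.adaptive.skewJoin.enabled"))

# Enable them explicitly if the platform has not already done so
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")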

