5 Steps to Converting Python Jobs to PySpark

Moving from Pandas to PySpark using Apache Arrow or Koalas

Mohini Kalamkar
Hashmap, an NTT DATA Company
6 min read · Oct 16, 2020


Spark developers and data scientists often come across the task of converting Python scripts to PySpark jobs. While working with various clients, I have seen developers mistakenly use a Python function as-is, without making Spark-relevant changes. This leads to job latency and does not use the power of distributed computing.

In this blog post, I am going to list out the steps I followed while converting a Python script to a PySpark job. I have put together best practices and recommendations to improve Spark job performance. The steps outlined in this blog post will assist with a smoother and more organized transition from pandas to PySpark using Apache Arrow or Koalas.

Let’s Get Started

1. Convert a Pandas DataFrame to a Spark DataFrame (Apache Arrow).

Pandas DataFrames are executed on the driver/single machine, while Spark DataFrames are distributed across the nodes of the Spark cluster.

The easiest way to convert Pandas DataFrames to PySpark is through Apache Arrow. Apache Arrow is a language-independent, in-memory columnar format that can be used to optimize the conversion between Spark and Pandas DataFrames when using toPandas() or createDataFrame(). Arrow usage is not automatic and requires some minor changes to the code/configuration.

To use Arrow, set the Spark configuration spark.sql.execution.arrow.pyspark.enabled to true:

import numpy as np
import pandas as pd
# Enable Arrow-based data transfers (assumes an existing SparkSession named `spark`)
# On Spark 2.x the key is spark.sql.execution.arrow.enabled
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Generate a pandas DataFrame
data = [1, 2, 3, 4, 5]
pdf = pd.DataFrame(data)
# Create a Spark DataFrame from a pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)
# Convert the Spark DataFrame back to a pandas DataFrame using Arrow
final_pdf = df.select("*").toPandas()

Instead of Apache Arrow, one can easily facilitate the transition from a single machine to a distributed framework using Koalas (the pandas API on Apache Spark).

Koalas is a project that augments PySpark’s DataFrame API to make it more compatible with pandas.
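
As a rough sketch of the Koalas route (this assumes the databricks-koalas package is installed; from Spark 3.2 onward the same API ships as pyspark.pandas):

import pandas as pd
import databricks.koalas as ks  # on Spark 3.2+: import pyspark.pandas as ps
# Convert a pandas DataFrame to a Koalas DataFrame; operations on it run on the Spark cluster
pdf = pd.DataFrame({"numbers": [1, 2, 3, 4, 5]})
kdf = ks.from_pandas(pdf)
print(kdf["numbers"].mean())
# Drop down to a native Spark DataFrame when needed
sdf = kdf.to_spark()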

2. Write a PySpark User Defined Function (UDF) for a Python function.

A UDF takes one or more columns and applies the logic row-wise to produce a new column.

A UDF can be defined in two ways:

  • udfname = udf(LAMBDA_EXPRESSION, RETURN_TYPE)
  • udfname = udf(CUSTOM_FUNCTION, RETURN_TYPE)
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
def squared(s):
    return s * s
square_udf = udf(lambda x: squared(x), IntegerType())
df = spark.createDataFrame([(4,), (8,)], ["numbers"])
df.withColumn("num_square", square_udf(col("numbers"))).show()

3. Load a dataset as Spark RDD or DataFrame.

To use the distributed power of Spark, load the dataset into an RDD (Resilient Distributed Dataset) or a DataFrame. Using Pandas to read the input dataset will not use the distributed power of Spark.

# Load the dataset as an RDD
path = "Your file path with file name"
text_rdd = sc.textFile(path)
# Load the dataset as a DataFrame
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(path)

4. Avoid for loops

Use the map() transformation wherever possible. map() is a transformation that applies a function to each element of an RDD and returns a new RDD.

In one of my use cases, a for loop had been used to calculate a KPI (key performance indicator) for a list of tags, which was delaying the entire process. Because of the for loop, the KPI was calculated sequentially for the tag list.

While rewriting this PySpark job, I used a map transformation on an RDD of tags to calculate the KPI. Because of the map transformation, the KPI was calculated in parallel.
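
As an illustrative sketch of that change (the tag list and compute_kpi logic below are hypothetical stand-ins, not the client code):

# Hypothetical per-tag KPI calculation
tags = ["tag_a", "tag_b", "tag_c"]
def compute_kpi(tag):
    return (tag, len(tag) * 10)  # placeholder for the real KPI logic
# Sequential version (what the original for loop did):
# kpis = [compute_kpi(t) for t in tags]
# Distributed version: the tags are spread across partitions and computed in parallel
tags_rdd = sc.parallelize(tags)
kpi_rdd = tags_rdd.map(compute_kpi)
print(kpi_rdd.collect())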

5. DataFrame interdependency

Sometimes a DataFrame's new column value depends on other DataFrames. In this case, join the DataFrames and then call the UDF on the joined dataset to compute the new column.
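
A minimal sketch of this pattern, with made-up orders/rates DataFrames and a hypothetical conversion UDF:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType
orders = spark.createDataFrame([(1, "EUR", 10.0), (2, "GBP", 20.0)], ["order_id", "currency", "amount"])
rates = spark.createDataFrame([("EUR", 1.1), ("GBP", 1.3)], ["currency", "usd_rate"])
to_usd = udf(lambda amount, rate: amount * rate, DoubleType())
# Join first, then apply the UDF to columns coming from both DataFrames
joined = orders.join(rates, on="currency", how="left")
joined.withColumn("amount_usd", to_usd(col("amount"), col("usd_rate"))).show()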

Recommendations for performance tuning of Spark jobs

1. Level of parallelism — If there is too little parallelism, Spark might leave resources idle. If there is too much parallelism, the overhead associated with each partition adds up and becomes significant. To tune the level of parallelism:

  • Specify the number of partitions when you call operations that shuffle data, e.g. reduceByKey(func, numPartitions).
  • Redistribute data by using repartition() or coalesce(). To decrease the number of partitions, use coalesce().
  • For a DataFrame, use df.repartition() (see the sketch below).
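
A small sketch of these calls (the partition counts are placeholders; pick them based on your cluster and data size):

# Shuffle operation with an explicit partition count
pairs_rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
counts = pairs_rdd.reduceByKey(lambda x, y: x + y, 8)
# Increase partitions (full shuffle) or decrease them without a full shuffle
more_parts = counts.repartition(16)
fewer_parts = counts.coalesce(4)
# DataFrame equivalent
df = spark.range(1000000).repartition(16)
print(df.rdd.getNumPartitions())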

2. Number of executors and cores — Based on your data size, specify the number of executors and cores. Increasing executors/cores does not always improve performance.

Each task is processed by a single core in the cluster. When you specify 3 cores per executor, one executor can process 3 tasks in parallel.

A maximum of 5 cores per executor is recommended to achieve good performance.

You can also enable dynamic allocation (spark.dynamicAllocation.enabled), which scales the number of executors registered with the application up and down based on the workload.
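
A minimal sketch of these settings at session creation (the values are illustrative, not recommendations):

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("converted-pyspark-job")
    .config("spark.executor.instances", "4")  # fixed number of executors
    .config("spark.executor.cores", "5")
    .config("spark.executor.memory", "8g")
    # Alternatively, let Spark scale executors with the workload (Spark 3.0+ shuffle tracking)
    # .config("spark.dynamicAllocation.enabled", "true")
    # .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .getOrCreate()
)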

3. Broadcast variable — Broadcast variables are like the distributed cache in Hadoop.

You should consider using broadcast variables under the following conditions:

  • You have read-only reference data that does not change throughout the life of your Spark application.
  • The data is used across multiple stages of application execution and would benefit from being locally cached on the worker nodes.
  • The data is small enough to fit in memory on your worker nodes.

One common use case for broadcast variables is joining a large dataset with a small dataset. Broadcasting the small dataset helps improve performance.

Broadcasting a dataset means the data is made available on all executors, so there is less shuffling of data.

You can increase the parameter spark.sql.autoBroadcastJoinThreshold to the expected size if you want to broadcast a bigger dataset.
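
A minimal sketch of a broadcast join (the DataFrames are made up for illustration):

from pyspark.sql.functions import broadcast
large_df = spark.range(10000000).withColumnRenamed("id", "country_id")
small_df = spark.createDataFrame([(0, "US"), (1, "IN")], ["country_id", "country"])
# Hint Spark to ship the small DataFrame to every executor instead of shuffling the large one
joined = large_df.join(broadcast(small_df), on="country_id", how="left")
# Raise the auto-broadcast threshold (in bytes; default is 10 MB) for larger lookup tables
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))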

4. Data Serialization — Spark uses Java serialization by default, which is very flexible and works with most classes, but it is also very slow. Kryo serialization, which uses the Kryo library, is more compact and faster than Java serialization. However, it does not support all Serializable types.

You can switch to Kryo by initializing your job with a SparkConf object:

from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

You can also set the serializer in the spark-submit command:

--conf spark.serializer=org.apache.spark.serializer.KryoSerializer

5. Cache data — If an RDD/DataFrame is used more than once in a Spark job, it is better to cache/persist it. This avoids recomputation of the RDD/DataFrame and results in faster performance.
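
For example (a minimal sketch; the input path is a placeholder and MEMORY_AND_DISK is just one common storage level):

from pyspark import StorageLevel
df = spark.read.parquet("/path/to/input")  # hypothetical input
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()            # first action materializes and caches the data
df.describe().show()  # later actions reuse the cached data instead of re-reading the source
df.unpersist()        # release the cache when done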

6. File format — The file format plays an important role in achieving good performance in Spark. Prefer the Avro and Parquet file formats over text, CSV, and JSON.
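
For instance (the paths are placeholders):

# Read the CSV source once, then persist it as Parquet for downstream jobs
csv_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/data/raw/events.csv")
csv_df.write.mode("overwrite").parquet("/data/curated/events")
# Later jobs read the columnar Parquet data, which is smaller and faster to scan
events = spark.read.parquet("/data/curated/events")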

7. Use spark.sql.shuffle.partitions — This configures the number of partitions to use when shuffling data for joins or aggregations. This parameter should be set based on your data size.
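
For example (200 is Spark's default; the value below is illustrative):

# Tune the number of shuffle partitions to your data volume and cluster size
spark.conf.set("spark.sql.shuffle.partitions", "64")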

Best Practices to tune Spark jobs

  1. Avoid collecting large RDDs/DataFrames.
  2. If you have many idle tasks, use coalesce().
  3. If you are not using all the cores in the cluster, use repartition().
  4. Avoid groupByKey; prefer reduceByKey (see the sketch after this list).
  5. Avoid shuffling a large amount of data.
  6. Filter data before processing.
  7. Use broadcasting wherever applicable.
  8. Use cache/persist wherever applicable.
  9. Monitor Spark UI to tune your application.
  10. Use dynamic allocation.
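
As a quick illustration of point 4 (the word counts are just an example):

words = sc.parallelize(["spark", "pandas", "spark", "arrow", "spark"])
pairs = words.map(lambda w: (w, 1))
# groupByKey shuffles every value across the network before counting:
# counts = pairs.groupByKey().mapValues(len)
# reduceByKey combines values within each partition first, shuffling far less data:
counts = pairs.reduceByKey(lambda a, b: a + b)
print(counts.collect())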

Final Thoughts

Using Python code as-is when converting Python jobs to PySpark is a common mistake. The steps outlined in this blog post can make for a smoother and more organized transition from Pandas to PySpark using Apache Arrow or Koalas. After you move to PySpark, apply the best practices above to tune the Spark job and achieve good performance.

Hopefully, what I’ve shared through my experience gives you some insights into best practices and potential loopholes to watch out for!

Ready to accelerate your digital transformation?

At Hashmap, we work with our clients to build better, together.

If you’d like additional assistance in this area, Hashmap offers a range of enablement workshops and consulting service packages as part of our consulting service offerings, and would be glad to work through your specifics in this area.

Feel free to share on other channels, and be sure to keep up with all new content from Hashmap here. To listen in on a casual conversation about all things data engineering and the cloud, check out Hashmap's podcast Hashmap on Tap on Spotify, Apple, Google, and other popular streaming apps.


Mohini Kalamkar is a Cloud and Data Engineering Consultant with Hashmap providing Data, Cloud, IoT, and AI/ML solutions and consulting expertise across industries with a group of innovative technologists and domain experts accelerating high-value business outcomes for our customers.
