Work With Large Datasets Using Pandas on Spark 3.2.0

Sudip Shrestha, PhD
7 min read · Oct 29, 2021


Scalable pandas made possible by the newly released Spark 3.2.0

1. Introduction

On October 13, 2021, the Apache Spark community released Spark 3.2.0. Among other major updates, the release includes a pandas API on Spark. Pandas is a powerful and well-known package among data scientists. However, pandas has its own limitations in handling big data because it processes the data on a single machine. To bridge this gap, Databricks released the 'Koalas' library a few years ago.

The addition of the pandas API in Spark 3.2.0 removes the need for a third-party library. Pandas users can now keep their familiar pandas syntax while scaling the processing out to multi-node Spark clusters.

2. Purpose

This article covers how the pandas API on Spark can be used to:

  • Read data as pandas-spark dataframe (df)
  • Read data as spark df and convert to pandas-spark df
  • Create pandas-spark df
  • Use SQL query directly to pandas-spark df
  • Use plot function to plot pandas-spark df
  • Transition from Koalas to the pandas API on Spark

3. Data

You can get the CSV file and the Jupyter Notebook used in this article from my GitHub page here. This is a small dataset; however, the approaches illustrated here can readily be applied to large datasets.

4. Installation Required

Before going further, first download Spark 3.2.0 from here and set up PySpark properly. You also need the pyarrow and plotly libraries, which can be installed through the Jupyter Notebook interface as shown below:

  • pyarrow (!conda install -c conda-forge --yes pyarrow)
  • plotly (!conda install --yes plotly)

Awesome! If your PySpark is up and running, let's jump to the next section.

5. Import Libraries and Start Spark Session

Here, we import PySpark and start a Spark session using the block of code shown below.

import pyspark
from pyspark.sql import SparkSession
# Start a Spark session (or get the existing one)
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
spark

The Spark info shows that the version used is 3.2.0.

Fig1: Spark Information

You can also check the versions of Python and PySpark as below. I used Spark 3.2.0 with Python 3.8.8.

print('Version of python: ') 
!python -V
print('Version of pyspark :', pyspark.__version__)
Fig2: Version used

Okay! Let's import the read_csv function from pyspark.pandas to read CSV data as a pandas-spark df.

If we get a warning as shown in Fig3, we can set the environment variable PYARROW_IGNORE_TIMEZONE to 1 before running from pyspark.pandas import read_csv.


from pyspark.pandas import read_csv
Fig3: Warning while importing pyspark.pandas
import os
# To get rid of the warning, set the environment variable as below
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
from pyspark.pandas import read_csv

6.1 Read data as pandas-spark df from CSV

We use the 'example_csv.csv' file to illustrate the various use cases of the pandas API on Spark. The read_csv function returns a pandas-spark df (call it: psdf).

# Define datapath
# Read as pandas on spark df
datapath = '/Users/...../'
psdf = read_csv(datapath+'example_csv.csv')
psdf.head(2)
Fig4: Pandas on Spark df

Great! We have just created a pandas-spark df, and now we can use pandas functions for downstream tasks. For instance, psdf.head(2) and psdf.shape return the top 2 rows and the dimensions of the data, respectively. Here, unlike with a standard Python pandas df, you get the benefit of parallelization.

# get the data type
# get the dimension
# get the data columns' name
print('Data type :', type(psdf))
print('Data shape :', psdf.shape)
print('Data columns : \n ', psdf.columns)
Fig5: Use pandas function on pandas on spark df
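Beyond head() and shape, most of the familiar pandas operations work the same way on a pandas-spark df, with the computation distributed over the Spark cluster. A couple of quick examples (these lines are my own additions, not from the original notebook):

# Summary statistics and column dtypes, computed on the Spark cluster
print(psdf.describe())
print(psdf.dtypes)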

Not only that, you can also convert a pandas-spark df to a spark df simply by using the to_spark() function. This returns a spark dataframe (call it: sdf), and all the PySpark functions can then be used on this df. For instance, sdf.show(5) and sdf.printSchema() output the top 5 rows and the data schema of the spark df, respectively.

# Convert from pandas on spark to spark dataframe
# Show top 5 rows from spark df
sdf = psdf.to_spark()
sdf.show(5)
Fig6: Showing top 5 records from spark dataframe
# Print schema
sdf.printSchema()
Fig7: Print Schema of spark dataframe

6.2 Read as spark df from CSV and convert to pandas-spark df

We can also convert a spark df to a pandas-spark df using the to_pandas_on_spark() method. It takes a spark df as input and outputs a pandas-spark df. Below, we read the data as a spark df (call it: sdf1). To confirm it's a spark df, we can use type(sdf1), which shows that it is indeed a spark df, i.e. 'pyspark.sql.dataframe.DataFrame'.

# Read data using spark 
sdf1 = spark.read.csv(datapath+'example_csv.csv', header=True,inferSchema=True)
type(sdf1)
Fig8. Type showing spark dataframe

Whereas, after converting to a pandas-spark df (psdf1), the type is a pandas-spark df, i.e. 'pyspark.pandas.frame.DataFrame'. We can further confirm it's a pandas-spark df by being able to use pandas functions on it, for example .head().

# Convert to pandas-spark df
psdf1 = sdf1.to_pandas_on_spark()
# Print top
psdf1.head(2)
Fig9. Showing top 2 rows of pandas on spark df
# Check type of psdf1
type(psdf1)
Fig10. Check type of pandas-spark df

6.3 Create pandas-spark df

In this section, instead of creating a pandas-spark df from a CSV, we create one directly by importing pyspark.pandas as ps. Below, we create psdf2 as a pandas-spark df using ps.DataFrame(). The psdf2 dataframe has 2 features and 3 rows.

import pandas as pd
import pyspark.pandas as ps
# Create pandas on spark df
psdf2 = ps.DataFrame({'id': [1,2,3], 'score': [89, 97, 79]})
psdf2.head()
Fig11. Created pandas -spark df

If we want to convert the pandas-spark df (psdf2) back to a spark df, the readily available to_spark() function, explained earlier, does the job. This syntax provides the flexibility of interchanging the dataframe types, which may be helpful depending on the functions (either from pandas or from Spark) you want to use in your analysis.

# Again we can convert from pandas-spark df to spark df
sdf2 = psdf2.to_spark()
sdf2.show(2)
Fig12. Spark df converted from pandas-spark df
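One small detail worth noting when hopping between the two dataframe types: by default, the pandas-style index is dropped when you call to_spark(). If you want to keep it, both conversion functions accept an index_col argument. A minimal sketch is below (the column name 'index' is my own choice):

# Keep the pandas-style index as an explicit column in the spark df
sdf2_idx = psdf2.to_spark(index_col='index')
sdf2_idx.show(3)
# Use the same column as the index when converting back
psdf2_back = sdf2_idx.to_pandas_on_spark(index_col='index')
psdf2_back.head()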

7. Query pandas on spark df directly using SQL

Another great feature of the pandas-spark API is its sql function. Let's use it on the pandas-spark df (psdf2) created earlier to extract information from it. We just need to call ps.sql() with a query that references the pandas-spark df. As shown below, the count(*) function returns a total of 3 observations in the psdf2 data. Similarly, the second query returns the data filtered to scores greater than 80.

# Query data using SQL. Input data is pandas on spark df (psdf)
ps.sql("SELECT count(*) as num FROM {psdf2}")
Fig13. Showing results of sql: total 3 records
# Returns pandas on spark df
selected_data = ps.sql("SELECT id, score FROM {psdf2} WHERE score>80")
selected_data.head()
Fig14. Showing results of sql: score>80
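Note that the object returned by ps.sql() is itself a pandas-spark df, so the query result can flow straight back into pandas-style operations. A quick sketch (the avg_score alias is my own):

# The query result is again a pandas on spark df
result = ps.sql("SELECT avg(score) AS avg_score FROM {psdf2}")
print(type(result))  # pyspark.pandas.frame.DataFrame
result.head()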

8. Plot on pandas df and pandas on spark df

Great that you have come this far! Now, let's briefly touch on the plotting capability of the new pandas-spark API. Unlike the static plots that standard Python pandas produces by default, the default plots in the pandas-spark API are interactive because it uses plotly as its plotting backend. Below, we read the data both as a pandas df and as a pandas-spark df and plot a histogram of the salary variable for each.

# Read data as pandas dataframe 
pddf = pd.read_csv(datapath+'example_csv.csv')
type(pddf)
#pandas.core.frame.DataFrame
pddf.head(2)
Fig15. Read data as pandas df

The figure below shows the histogram of salary from the pandas df.

# Read data as pandas on spark df
pdsdf = read_csv(datapath+'example_csv.csv')
type(pdsdf)
#pyspark.pandas.frame.DataFrame
# plot histogram of pandas df
pddf['salary'].hist(bins=3)
Fig16. Histogram from standard python pandas df

The histogram of the same variable from the pandas-spark df is shown below; it is actually an interactive plot.

Note: The plot below is pasted as an image, therefore it is static. You should be able to zoom in and out (i.e. interact with it) if you run the syntax below in a Jupyter Notebook.

# plot histogram of pandas on spark df
import plotly
pdsdf['salary'].hist(bins=3)
Fig17. Snapshot of interactive plot from pandas on pyspark df
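As a side note, the plotting backend of the pandas API on Spark is configurable. If you prefer the static matplotlib style, the option can be switched as in the minimal sketch below (not part of the original notebook):

# The default plotting backend is plotly; switch to matplotlib for static plots
ps.set_option('plotting.backend', 'matplotlib')
pdsdf['salary'].hist(bins=3)
# Restore the interactive default
ps.reset_option('plotting.backend')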

9. Transition from Koalas to Pandas API

Finally, let's talk about the changes required when transitioning from the Koalas library to the pandas-spark API. The table below shows some of the syntax changes from Koalas to the new pandas API on Spark.

Table1. Transition from Koalas to pandas-spark API
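Since Table1 appears as an image in the original post, here is a minimal code sketch of the typical changes, based on the official Koalas-to-PySpark migration guide listed in the references:

# Koalas: import databricks.koalas as ks  ->  pandas API on Spark:
import pyspark.pandas as ps

# ks.DataFrame(...) becomes ps.DataFrame(...)
psdf = ps.DataFrame({'id': [1, 2, 3], 'score': [89, 97, 79]})

# ks.read_csv(...) becomes ps.read_csv(...)
# psdf = ps.read_csv(datapath + 'example_csv.csv')

# spark_df.to_koalas() becomes spark_df.to_pandas_on_spark()
sdf = psdf.to_spark()
psdf_back = sdf.to_pandas_on_spark()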

10. Summary

This article covered how the newly added pandas API on Spark 3.2.0 can be used for reading data, creating dataframes, using SQL directly on a pandas-spark dataframe, and transitioning from the existing Koalas library to the pandas-spark API.

Thank you for reading!

Follow me on LinkedIn to get more updates on data science skills.

Happy Learning!!!

11. References

https://spark.apache.org/docs/latest/api/python/migration_guide/koalas_to_pyspark.html

https://databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html

https://www.datanami.com/2021/10/26/spark-gets-closer-hooks-to-pandas-sql-with-version-3-2/

https://www.datamechanics.co/blog-post/apache-spark-3-2-release-main-features-whats-new-for-spark-on-kubernetes

https://databricks.com/blog/2020/08/11/interoperability-between-koalas-and-apache-spark.html
