Work With Large Datasets Using Pandas on Spark 3.2.0
Scalable pandas made possible by the newly released Spark 3.2.0
1. Introduction
On October 13, 2021, the Apache Spark community released Spark 3.2.0. Among other major updates, the release includes a pandas API on Spark. Pandas is a powerful and well-known package among data scientists. However, pandas has a limitation when handling big data because it processes the data on a single machine. To bridge this gap, Databricks released the 'Koalas' library a few years ago.
The addition of the pandas API in Spark 3.2.0 removes the need for a third-party library. Now, pandas users can keep their familiar pandas syntax while scaling out processing to multi-node Spark clusters.
2. Purpose
This article specifically covers how the pandas API on Spark can be used to:
- Read data as pandas-spark dataframe (df)
- Read data as spark df and convert to pandas-spark df
- Create pandas-spark df
- Use SQL query directly to pandas-spark df
- Use plot function to plot pandas-spark df
- Transition from koalas to pandas API on spark
3. Data
You can get the CSV file and the Jupyter Notebook used in this article from my GitHub page here. The dataset is small, but the approaches illustrated here can readily be applied to large datasets.
4. Installation Required
Before going further, first download Spark 3.2.0 from here and set up PySpark properly. You also need the pyarrow and plotly libraries, which can be installed through the Jupyter Notebook interface as shown below:
- pyarrow (!conda install -c conda-forge --yes pyarrow)
- plotly (!conda install --yes plotly)
Awesome! If your PySpark is up and running, let's jump to the next section.
5. Import Libraries and Start Spark Session
Here, we import PySpark and start a Spark session using the block of code shown below.
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
spark
The spark info shows that the version used is 3.2.0.
You can also check the versions of Python and PySpark as shown below. The Spark version I used is 3.2.0 with Python 3.8.8.
print('Version of python: ')
!python -V
print('Version of pyspark :', pyspark.__version__)
Okay! Let's import the read_csv function from pyspark.pandas to read the CSV data as a pandas-spark df.
If we get a warning as shown in Fig 3, we can set the environment variable PYARROW_IGNORE_TIMEZONE to 1 before running from pyspark.pandas import read_csv.
from pyspark.pandas import read_csv
import os
# To get rid of the warning, set the environment variable as below
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
from pyspark.pandas import read_csv
6.1 Read data as pandas-spark df from csv
We are using the 'example_csv.csv' file to illustrate the various use cases of the pandas API on Spark. The read_csv function returns a pandas-spark df (call it: psdf).
# Define datapath
# Read as pandas on spark df
datapath = '/Users/...../'
psdf = read_csv(datapath+'example_csv.csv')
psdf.head(2)
Great! We have just created a pandas-spark df, and now we can use pandas functions for downstream tasks. For instance, psdf.head(2) and psdf.shape return the top 2 rows and the dimensions of the data, respectively. Here, unlike with a standard Python pandas df, you get the benefit of parallelization.
# get the data type
# get the dimension
# get the data columns' names
print('Data type :', type(psdf))
print('Data shape :', psdf.shape)
print('Data columns : \n ', psdf.columns)
Not only this, if you want to convert a pandas-spark df to a spark df, that is also possible by simply using the to_spark() function. It returns a spark dataframe (call it: sdf), and all the PySpark functions can now be used on this df. For instance, sdf.show(5) and sdf.printSchema() output the top 5 rows and the data schema of the spark df, respectively.
# Convert from pandas on spark df to spark dataframe
# show top 5 rows from spark df
sdf = psdf.to_spark()
sdf.show(5)
# Print schema
sdf.printSchema()
6.2 Read as spark df from csv and convert to pandas-spark df
We can also convert a spark df to a pandas-spark df using the to_pandas_on_spark() function. It takes a spark df as input and outputs a pandas-spark df. Below, we read the data as a spark df (call it: sdf1). To confirm it's a spark df, we can use type(sdf1), which shows that it's a spark df, i.e. 'pyspark.sql.dataframe.DataFrame'.
# Read data using spark
sdf1 = spark.read.csv(datapath+'example_csv.csv', header=True,inferSchema=True)
type(sdf1)
Whereas, after converting to a pandas-spark df (psdf1), the type is 'pyspark.pandas.frame.DataFrame'. We can further confirm it's a pandas-spark df by using a pandas function on it, for example .head().
# Convert to pandas-spark df
psdf1 = sdf1.to_pandas_on_spark()
# Print top
psdf1.head(2)
# Check type of psdf1
type(psdf1)
6.3 Create pandas-spark df
In this section, instead of creating a pandas-spark df from a CSV, we create it directly by importing pyspark.pandas as ps. Below, we create psdf2 as a pandas-spark df using ps.DataFrame(). The psdf2 has 2 features and 3 rows.
import pandas as pd
import pyspark.pandas as ps
# Create pandas on spark df
psdf2 = ps.DataFrame({'id': [1,2,3], 'score': [89, 97, 79]})
psdf2.head()
If we want to convert the pandas-spark df (psdf2) back to a spark df, the readily available to_spark() function, explained earlier, does the job. This syntax provides the flexibility of interchanging dataframe types, which may be helpful depending on which functions (from pandas or from Spark) you want to use in your analysis.
# Again we can convert from pandas-spark df to spark df
sdf2 = psdf2.to_spark()
sdf2.show(2)
7. Query pandas on spark df directly using SQL
Another great feature of the pandas API on Spark is its sql function. Okay, let's use it on the pandas-spark df (psdf2) created earlier to extract information. We just need to call ps.sql() with the pandas-spark df referenced inside the query to run SQL. As shown below, the count(*) function returns a total of 3 observations in the psdf2 data. Similarly, the second query returns the data filtered to scores greater than 80.
# Query data using SQL. Input data is pandas on spark df (psdf)
ps.sql("SELECT count(*) as num FROM {psdf2}")
# Returns pandas on spark df
selected_data = ps.sql("SELECT id, score FROM {psdf2} WHERE score>80")
selected_data.head()
8. Plot on pandas df and pandas on spark df
Great that you have come this far! Now, let's briefly touch on the plotting capability of this new pandas API on Spark. Unlike the default static plots in the standard Python pandas API, the default plot in the pandas API on Spark is interactive because it uses plotly by default. Below, we import the data both as a pandas df and as a pandas-spark df, and plot a histogram of the salary variable on each.
# Read data as pandas dataframe
pddf = pd.read_csv(datapath+'example_csv.csv')
type(pddf)
#pandas.core.frame.DataFrame
pddf.head(2)
Figure below shows the histogram of salary from pandas df.
# Read data as pandas on spark df
pdsdf = read_csv(datapath+'example_csv.csv')
type(pdsdf)
#pyspark.pandas.frame.DataFrame
# plot histogram of pandas df
pddf['salary'].hist(bins=3)
I have shown the histogram of the same variable from pandas-spark df below, which is actually an interactive plot.
Note: The plot below is pasted as an image and is therefore static. You should be able to zoom in and out (interact with it) if you run the syntax below in a Jupyter Notebook.
# plot histogram of pandas on spark df
import plotly
pdsdf['salary'].hist(bins=3)
9. Transition from Koalas to Pandas API
Finally, let's talk about the changes required when transitioning from the Koalas library to the pandas API on Spark. The table below shows some of the syntax changes from Koalas to the new pandas on Spark API.
10. Summary
The article covered how to use the newly added pandas API on Spark 3.2.0 for reading data, creating dataframes, using SQL directly on a pandas-spark dataframe, plotting, and transitioning from the existing Koalas library to the pandas API on Spark.
Thank you for reading!
Follow me on LinkedIn to get more updates on data science skills.
Happy Learning!!!
11. References
https://spark.apache.org/docs/latest/api/python/migration_guide/koalas_to_pyspark.html
https://databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html
https://www.datanami.com/2021/10/26/spark-gets-closer-hooks-to-pandas-sql-with-version-3-2/
https://databricks.com/blog/2020/08/11/interoperability-between-koalas-and-apache-spark.html