This basic introduction compares common data wrangling methods in Pyspark and pandas DataFrames with a concrete example. Here, I used the US counties’ COVID-19 dataset to show the data wrangling differences between these two types of DataFrames. The dataset can be downloaded from Kaggle. This should allow you to get started with data manipulation and analysis under both pandas and Spark. Specific objectives are to show you how to:
1. Load data from local files
2. Display the schema of the DataFrame
3. Change data types of the DataFrame
4. Show the head of the DataFrame
5. Select columns from the DataFrame
6. Show the statistics of the DataFrame
7. Drop duplicates
8. Missing Values (check NA, drop NA, replace NA)
9. Datetime manipulations
10. Filter data based on conditions
11. Aggregation functions
12. Sort Data
13. Rename columns
14. Create new columns
15. Join tables
16. User Defined functions
17. Window function
18. Operate SQL queries within DataFrame
19. Convert one type of DataFrame to another
20. Export the data
1. Load Data from Local Files
Before we can manipulate the data, we need to import the following packages:
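A minimal set of imports that covers the snippets below might look like this (treat it as an assumed setup rather than the article’s exact import cell):
# Pandas
import pandas as pd
import numpy as np

# Pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, when, isnan, col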
Unlike pandas, we first need to create a Spark session or a Spark context before we can import data. The Spark context is used as a channel to access all Spark functionality. The Spark driver program uses the Spark context to connect to the cluster through a resource manager (YARN or Mesos). SparkConf is required to create the Spark context object; it stores configuration parameters such as appName (to identify your Spark driver), the number of cores, and the memory size of the executors running on the worker nodes.
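A minimal sketch of creating the session and loading the CSV, assuming the Kaggle download is saved locally as us-counties.csv (the file name and appName are assumptions):
# Pandas
df = pd.read_csv('us-counties.csv')

# Pyspark
spark = SparkSession.builder.appName('covid_wrangling').getOrCreate()  # appName is illustrative
sc = spark.sparkContext          # kept around for sc.stop() at the end
df_s = spark.read.csv('us-counties.csv', header=True)  # all columns are read as strings by default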
2. Display the Schema of the DataFrame
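A quick sketch of the usual calls (variable names follow the loading step above):
# Pandas
df.dtypes          # data type of each column
df.info()          # dtypes plus non-null counts and memory usage

# Pyspark
df_s.printSchema() # column names, types and nullability
df_s.dtypes        # list of (column, type) tuples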
3. Change Data Types of the DataFrame
When we check the data types above, we find that cases and deaths need to be converted to numerical values instead of string format in Pyspark. Sometimes Pyspark will change the original data types, so it’s better to define the schema when importing the data. Here is how we can change the data types when importing the data in pandas and Pyspark.
Pandas usually doesn’t change the data types when we import the data, but we can change them to save memory. Here, I changed the data type of cases and deaths to np.uint32. We could use ‘integer’ as the data type for these two columns, but np.uint32 helps us save memory. If your data is too large, you can also change object columns to the category data type to save memory.
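A sketch of both approaches (the column list assumes the six fields of the Kaggle file: date, county, state, fips, cases, deaths; adjust it if your copy differs):
# Pandas
# assumes cases and deaths contain no missing values; otherwise convert after filling NAs
df = pd.read_csv('us-counties.csv', dtype={'cases': np.uint32, 'deaths': np.uint32})

# Pyspark
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField('date', StringType(), True),    # converted to a date type in section 9
    StructField('county', StringType(), True),
    StructField('state', StringType(), True),
    StructField('fips', StringType(), True),
    StructField('cases', IntegerType(), True),
    StructField('deaths', IntegerType(), True),
])
df_s = spark.read.csv('us-counties.csv', header=True, schema=schema)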
4. Show the Head of the DataFrame
In pandas, we use head() to show the top 5 rows in the DataFrame, while we use show() to display the head of the DataFrame in Pyspark. In Pyspark, take() and show() are both actions, but they are different: show() prints the results, while take() returns a list of Row objects and can be used to create a new DataFrame.
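A quick sketch:
# Pandas
df.head()      # first 5 rows by default

# Pyspark
df_s.show(5)   # prints the first 5 rows
df_s.take(5)   # returns a list of Row objects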
5. Select Columns from the DataFrame
# Pandas
df[['state','cases']]

# Pyspark
df_s.select('state','cases')
6. Show the Statistics of the DataFrame
Both pandas and Pyspark use describe() to show the statistics of the DataFrame, but Pyspark requires show() to display the results. describe() in pandas only shows the stats for numerical columns by default, while in Pyspark it shows the stats for all columns but may contain some missing values for non-numeric ones.
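A quick sketch:
# Pandas
df.describe()

# Pyspark
df_s.describe().show()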
7. Drop Duplicates
# Pandas
df.state.drop_duplicates()

# Pyspark
df_s.select('state').dropDuplicates()
8. Missing Values (Check NA, Drop NA, Replace NA)
8.1 Check NA
# Pandas
# check the number of missing values for each column
df.isnull().sum()
# if we want to check the non-missing values, we can use notnull() instead

# Pyspark
# check the number of missing values for each column
from pyspark.sql.functions import count, when, isnan, col
df_s.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_s.columns]).show()
# if we want to check the non-missing values, we can use isNotNull()
8.2 Drop NA
Both pandas and Pyspark use dropna() to drop missing values. We can set the how parameter to ‘any’ or ‘all’ to control which rows are dropped.
# pandas
df_valid = df.dropna(subset=['fips','state'], how='any')

# Pyspark
df_s_valid = df_s.dropna(how='any', subset =['fips', 'state'])
8.3 Replace NA
# pandas
# Replace Missing values with 0
print(df.fillna(0).isnull().sum())
# Replace Missing values based on specific columns
values = {'fips': -1, 'cases': 0, 'deaths': 0}
df.fillna(value=values, inplace=True)

# Pyspark
# Replace Missing values with 0
df_s.na.fill(0).select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_s.columns]).show()
# Replace Missing values based on specific columns
df_s.fillna({'fips':'0','cases': 0, 'deaths': 0 })
9. DateTime Manipulations
For Pyspark, you can use functions in pyspark.sql.functions such as year and month; see https://spark.apache.org/docs/latest/api/python/pyspark.sql.html for the full list.
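A sketch of extracting date parts in both frameworks (the new column names year and month are illustrative):
# Pandas
df['date'] = pd.to_datetime(df['date'])
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month

# Pyspark
from pyspark.sql.functions import to_date, year, month
df_s = df_s.withColumn('date', to_date('date'))
df_s = df_s.withColumn('year', year('date')).withColumn('month', month('date'))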
10. Filter Data Based on Conditions
# Pandas
# Filter data based on the state is CA
df[df.state == 'California']
# Only show specific columns based on the filtering conditions
df[df.state == 'California'][['cases','deaths']]

# Pyspark
# Filter data based on the state is CA
df_s.where(df_s.state == 'California')
# Alternatively, we can write it in this way
df_s[df_s.state.isin("California")]
# only show specific columns based on the filtering conditions
df_s.select('deaths', 'cases').filter(df_s.state == 'California')
11. Aggregation Functions
Common aggregation functions for both pandas and Pyspark include: sum(), count(), mean(), min(), max().
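A sketch of a grouped sum by state (df_agg and df_s_agg are the names used in the rename step below):
# Pandas
df_agg = df.groupby('state', as_index=False)[['cases', 'deaths']].sum()

# Pyspark
df_s_agg = df_s.groupBy('state').sum('cases', 'deaths')   # columns come back as sum(cases), sum(deaths)
df_s_agg.show()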
It’s hard to compare the aggregation results directly, since the pandas DataFrame and the Pyspark DataFrame are in different orders. The following shows how we can sort the DataFrame based on specific columns.
12. Sort Data
In pandas, we use sort_values(), while we use sort() in pyspark to sort the data frame based on specific columns. The default sorting order is ascending.
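A sketch, sorting by the number of cases in descending order:
# Pandas
df.sort_values('cases', ascending=False)

# Pyspark
df_s.sort('cases', ascending=False).show()
# equivalently: df_s.orderBy(df_s.cases.desc()).show()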
13. Rename Columns
After the aggregation functions, the names of some columns are confusing. We need to rename these columns to avoid confusion. The following shows how we can rename columns in pandas and Pyspark DataFrames.
# Pandas
df_agg.rename(columns={"deaths": "total_death", "cases": "total_cases"}, inplace=True)

# Pyspark
df_s_agg = df_s_agg.withColumnRenamed("sum(cases)","total_cases").withColumnRenamed("sum(deaths)", "total_death")
14. Create New Columns
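In pandas we can assign a new column directly, while Pyspark uses withColumn(); a minimal sketch with an illustrative death_rate column:
# Pandas
df['death_rate'] = df['deaths'] / df['cases']

# Pyspark
df_s = df_s.withColumn('death_rate', df_s.deaths / df_s.cases)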
15. Join Tables
In pandas, we can use either merge() or join() to join two DataFrames; my example shows how to use merge() to join two tables. For Pyspark, we use join() to join two DataFrames. The default join for both is an inner join; we can change it to a left, right, or outer join by changing the how parameter (‘left’, ‘right’, ‘outer’, ‘inner’). For more details, see https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.merge.html and http://www.learnbymarketing.com/1100/pyspark-joins-by-example/
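A sketch of joining the renamed aggregation results back onto the original data by state (using df_agg and df_s_agg from above):
# Pandas
df_joined = df.merge(df_agg, on='state', how='left')

# Pyspark
df_s_joined = df_s.join(df_s_agg, on='state', how='left')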
16. User Defined Functions
User Defined Functions (UDFs) are a feature of Spark SQL that define new column-based functions, extending the vocabulary of Spark SQL’s DSL for transforming datasets.
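A sketch of a simple UDF that labels each row by case count (the severity logic and column name are purely illustrative):
# Pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def severity(cases):
    # plain Python function applied to each value
    return 'high' if cases is not None and cases > 100 else 'low'

severity_udf = udf(severity, StringType())
df_s = df_s.withColumn('severity', severity_udf(df_s.cases))

# Pandas has no UDF concept; apply() plays a similar role
df['severity'] = df['cases'].apply(lambda c: 'high' if c > 100 else 'low')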
17. Window Function
A window function can be any of the standard aggregate functions (sum, count, max, min, avg), as well as a number of functions that can only be used as analytic functions. Window functions can help us write a query with less complexity. A window specification is composed of four main parts:
a. The Over() clause: This tells the database to expect a window function, rather than a standard aggregate function.
b. The partitionBy() clause: This clause tells the database how to break up the data. In other words, it is similar to a GROUP BY in that it tells the database that rows with the same values should be treated as a single group or partition.
c. orderBy() clause: It tells the database how to sort the data within each partition.
d. The rangeBetween() clause: It defines the regions over which the function is calculated.
- unboundedPreceding: from the start of the partition
- unboundedFollowing: to the end of the partition
- 0: current row
The following example shows how we can calculate the cumulative cases for each state. Since we want the value for each state, the data should be partitioned by state. We want a cumulative number, so we first need to sort the data within each state by date in ascending order. Then, we need to define the window frame from the start of the partition to the current row. Finally, we apply the aggregation function sum over this window.
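A sketch of that query (the result column name cum_cases is illustrative):
# Pyspark
from pyspark.sql import Window
from pyspark.sql.functions import sum as sum_

win = (Window.partitionBy('state')
             .orderBy('date')
             .rangeBetween(Window.unboundedPreceding, 0))   # start of partition to current row
df_s = df_s.withColumn('cum_cases', sum_('cases').over(win))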
As for pandas, it only provides limited window functions, such as rolling mean and rolling sum. For more details, please check https://pandas.pydata.org/pandas-docs/stable/reference/window.html.
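For instance, a 7-day rolling sum within each state might look like this (the column name is illustrative, and the frame is assumed to be sorted by date within each state):
# Pandas
df = df.sort_values(['state', 'date'])
df['rolling_cases'] = df.groupby('state')['cases'].rolling(7).sum().reset_index(level=0, drop=True)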
18. Operate SQL queries within DataFrame
Spark comes with a SQL library that lets us query a DataFrame using SQL syntax. First, we have to create a temporary view of the DataFrame using createOrReplaceTempView(), which registers it as a temporary SQL table. This enables us to use the same SQL syntax as if we were using a database like MySQL.
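A sketch, registering the DataFrame under an illustrative view name covid:
# Pyspark
df_s.createOrReplaceTempView('covid')
spark.sql("SELECT state, SUM(cases) AS total_cases FROM covid GROUP BY state").show()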
Pandas does not let us write SQL queries directly against a DataFrame, but we can still use query() to write some SQL-like syntax to manipulate the data. In pandas, query() queries the columns of a frame with a boolean expression. For instance:
# Pandas
df.query("state=='California'").count()
19. Convert One Type of DataFrame to Another
19.1 Convert Pandas to Pyspark DataFrame
We can use createDataFrame() to convert a Pandas DataFrame to a Pyspark DataFrame.
df_s_agg_2 = spark.createDataFrame(df_agg)
19.2 Convert Pyspark to Pandas DataFrame
It is also possible to use Pandas DataFrames when using Spark, by calling toPandas() on a Spark DataFrame, which returns a pandas object. However, this function should generally be avoided except when working with small DataFrames, because it pulls the entire object into memory on a single node.
df_s_agg.toPandas()
20. Export the Data
When we are done with our data manipulation, we need to run sc.stop() to end the active SparkContext; you must stop() the active SparkContext before creating a new one.
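A sketch of writing the results out in each framework (output paths are illustrative), followed by stopping the context:
# Pandas
df_agg.to_csv('covid_agg.csv', index=False)

# Pyspark
df_s_agg.write.csv('covid_agg_spark', header=True, mode='overwrite')  # writes a directory of part files

# end the active SparkContext
sc.stop()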