PySpark DataFrame 101: Getting started with PySpark DataFrames
PySpark Introduction
PySpark is a Python library that provides a Python API for Apache Spark and enables us to perform data processing and analysis at scale. PySpark DataFrames are a distributed collection of data organised into named columns, similar to a table in a relational database.
PySpark DataFrame
PySpark DataFrames are designed to handle large datasets that require distributed processing across multiple machines. They are optimised for scalability and can process data in parallel across a cluster of machines, making them well-suited for big data processing. PySpark also provides a range of built-in functions and APIs that enable us to perform complex data analysis and machine learning tasks on large datasets. This gives PySpark an advantage over Pandas when we are dealing with large datasets.
Lazy Evaluation in PySpark DataFrame
PySpark DataFrames use lazy evaluation, which means that data is not processed until an action is triggered.
Lazy evaluation is a technique where a program defers work until it absolutely has to do it. In PySpark, this means that transformations on a DataFrame are not executed right away; they are recorded and only run when an action requires the result. This can save time and resources because it allows Spark to optimise the whole chain of operations before executing it.
For example, imagine we have a DataFrame with a million rows of data. If we apply a filter, Spark won't run it immediately. Instead, it waits until an action (such as show() or count()) actually needs the filtered result.
PySpark DataFrames in action
In this section, we will install, set up and learn about some PySpark DataFrame operations. We will be using Google Colab.
PySpark installation and setup
To install PySpark, we need to run the following command —
!pip install pyspark
We need to create a Spark session to interact with Apache Spark. To create one, we need to run the following commands —
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Spark101').getOrCreate()
spark
Reading CSV and understanding data
Reading a CSV file is similar to that in Pandas.
We will use the CSV file from this Kaggle dataset. To read a CSV, we need the following command —
df_pyspark = spark.read.csv('Salary_Dataset_with_Extra_Features.csv', header=True)
df_pyspark.show(5)
Dataset count
print('No. of rows in our dataset:', df_pyspark.count())
Describing dataset
df_pyspark.describe().show()
Dataset Schema
df_pyspark.printSchema()
As we can see, all the columns are of string type, because spark.read.csv does not infer column types by default. Although PySpark implicitly casts columns during comparisons and other operations, it is always better to cast them explicitly so we can be safe and certain about the types.
Changing the column type
from pyspark.sql.types import IntegerType, FloatType
df_pyspark = df_pyspark.withColumn("Rating", df_pyspark["Rating"].cast(FloatType()))
df_pyspark = df_pyspark.withColumn("Salary", df_pyspark["Salary"].cast(IntegerType()))
df_pyspark = df_pyspark.withColumn("Salaries Reported", df_pyspark["Salaries Reported"].cast(IntegerType()))
df_pyspark.printSchema()
Selecting columns
We can select a single column to view —
df_pyspark.select('Salary').show(5)
We can also select multiple columns —
df_pyspark.select(['Rating', 'Company Name', 'Job Title', 'Salary']).show(5)
To get the distinct values of a column —
df_pyspark.select('Employment Status').distinct().show()
Filtering columns
We can filter the DataFrame based on a single column —
df_pyspark.filter(df_pyspark['Salary'] > 1000000).show(5)
Or multiple columns —
df_pyspark.filter((df_pyspark['Salary'] > 1000000) & (df_pyspark['Job Roles'] != 'Android')).show(5)
isin
We can use isin to filter a column against a list of values —
df_pyspark.filter(df_pyspark['Company Name'].isin('Unacademy', 'Amazon')).show(5)
Adding, deleting and renaming columns
We can add a derived column, which is often useful for feature engineering —
df_pyspark = df_pyspark.withColumn('Rating>=4', df_pyspark['Rating']>=4.0)
df_pyspark.show(5)
As a best practice, column names should not contain special characters other than underscores (_). Hence we need to rename our column to remove them —
df_pyspark = df_pyspark.withColumnRenamed('Rating>=4', 'RatingGrtThn4')
df_pyspark.show(5)
Let’s delete the column using the drop function —
df_pyspark = df_pyspark.drop('RatingGrtThn4')
df_pyspark.show(5)
Sorting columns
Sorting based on a single column —
df_pyspark.sort('Salary', ascending = False).show(5)
Sorting based on multiple columns, with different orders —
df_pyspark.sort(df_pyspark['Salary'].desc(),df_pyspark['Rating'].asc()).show(5)
GroupBy and Aggregate functions
Grouping by a single column —
df_pyspark.groupBy('Location').max('Salary').show(5)
Grouping and sorting by a single column —
df_pyspark.groupBy('Location').max('Salary').sort('max(Salary)', ascending = False).show(5)
Grouping by multiple columns —
df_pyspark.groupBy('Location', 'Employment Status').max('Salary').show(5)
Grouping by a single column and taking averages of multiple columns—
df_pyspark.groupBy('Location').avg().select(['Location', 'avg(Rating)', 'avg(Salary)']).show(5)
Grouping by a single column and aggregating on multiple columns —
df_pyspark.groupBy('Location').agg({"Rating": "avg", "Salary": "max"}).show(5)
Combining multiple operations
We can also combine the operations mentioned above as per our requirements. One example is given below, where we group, take the max, filter on location, and finally sort the result —
df_pyspark.groupBy('Location', 'Employment Status').max('Salary').filter(df_pyspark['Location'] == 'New Delhi').sort('max(Salary)', ascending = False).show(5)
Advanced operations
There are also some advanced operations in PySpark —
- Window functions: allow us to perform complex aggregations and transformations over groups of rows.
- Explode: transforms a single column containing arrays or maps into multiple rows.
- Pivot: rotates rows into columns.
- User-Defined Functions (UDFs): let users define their own functions and use them in PySpark.
- ML module: provides machine learning algorithms and tools for building and evaluating models.
- Approximate functions: such as approx_count_distinct, enable approximate calculations over large datasets for faster processing.
As this article is intended for those just starting with PySpark DataFrames, we won’t cover these operations in detail.
Conclusion
PySpark is a Python library that provides a user-friendly interface to Apache Spark for big data processing and analysis. PySpark DataFrames are distributed data collections optimized for scalability and parallel processing. They use lazy evaluation, delaying processing until necessary to optimize performance. PySpark provides various DataFrame operations, such as CSV reading, column selection, filtering, sorting, grouping, aggregation and some advanced operations. Overall, PySpark is a powerful tool for handling large-scale datasets.
If you like it, please leave a 👏.
Feedback/suggestions are always welcome.