INTRODUCTION TO PYSPARK

Shilpa Grace
NeST Digital
Aug 6, 2023 · 9 min read

Apache Spark is a fast and efficient framework that offers an API for massively distributed processing over resilient distributed datasets (RDDs). It delivers very fast, in-memory data processing and supports many languages, including Scala, Python, Java, and R.

PySpark is the Python API for Apache Spark, a parallel and distributed engine for big data analytics. In the age of big data, Python users frequently use PySpark to build applications on distributed clusters and to analyze enormous datasets.


FEATURES

· In-memory computation

· Distributed processing using a parallelized architecture

· Can be used with many cluster managers

· Fault-tolerant

· Immutable

· Lazy evaluation

· Cache & persistence

· Built-in optimization when using DataFrames

· Supports ANSI SQL
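Several of these features can be seen directly in a few lines of PySpark. The sketch below is a minimal illustration (not part of the installation steps that follow) of lazy evaluation, immutability, and caching:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("FeaturesDemo").getOrCreate()

df = spark.range(1_000_000)                             # nothing is computed yet (lazy evaluation)
doubled = df.withColumn("double_id", F.col("id") * 2)   # returns a new, immutable DataFrame
doubled.cache()                                         # mark it for in-memory caching
print(doubled.count())                                  # an action finally triggers the computation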

PYSPARK ARCHITECTURE

Apache Spark employs a master-slave architecture, with the master referred to as the “Driver” and the slaves referred to as “Workers.” When you run a Spark application, the Spark driver creates a context that serves as the entry point to your application; all operations (transformations and actions) are executed on worker nodes, and the cluster manager allocates the resources.

[Image: Spark architecture]
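In PySpark code, the driver-side entry point is the SparkSession, which wraps a SparkContext. A quick way to see which cluster manager and application the driver is connected to, assuming a session named spark already exists:

print(spark.sparkContext.master)              # e.g. "local[*]" or "spark://host:7077"
print(spark.sparkContext.appName)             # name of the running application
print(spark.sparkContext.defaultParallelism)  # a hint at the cores available to the workers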

Types of Cluster Manager

Below are the cluster managers that Spark supports:

· Standalone is a straightforward cluster manager that comes with Spark and makes cluster setup simple.

· Apache Mesos is a general-purpose cluster manager that can also run Hadoop MapReduce and PySpark applications.

· Hadoop YARN is the resource manager in Hadoop 2. It is the cluster manager most commonly used with Hadoop-based installations.

· Kubernetes is an open-source platform that automates the deployment, scaling, and administration of containerized applications.
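The cluster manager is selected through the master URL that the application is started with. A minimal PySpark sketch (the host names and ports below are placeholder values, not taken from this article):

from pyspark.sql import SparkSession

# "local[*]" uses all cores on the local machine; swap in the URL of your cluster manager:
#   standalone: "spark://<master-host>:7077"
#   YARN:       "yarn"
#   Kubernetes: "k8s://https://<api-server-host>:<port>"
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("ClusterManagerExample") \
    .getOrCreate()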

Installation of Spark in Ubuntu

Prerequisites

  • An Ubuntu system. (I am using Ubuntu 20.04.6 LTS)
  • Access to a terminal or command line.
  • A user with sudo or root permissions.

First, refresh the package index. In the terminal, run the following command and enter your admin password when prompted; apt-get update downloads the latest package lists so that up-to-date versions of packages and dependencies can be installed:
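$ sudo apt-get update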

Install the required dependencies before downloading and installing Spark. The following packages must be installed as part of this step: the JDK, Scala, and Git.

Run the following command in a terminal window to install all three packages simultaneously:

$ sudo apt install default-jdk scala git -y

Once the process completes, verify the installed dependencies by running these commands:

$ java -version; javac -version; scala -version; git --version
[Image: Java, Scala, and Git version output]

Use the wget command and the direct link to download the Spark archive:

$ wget https://downloads.apache.org/spark/spark-3.2.4/spark-3.2.4-bin-hadoop2.7.tgz

Now, extract the saved archive using tar:

$ tar xvf spark-3.2.4-bin-hadoop2.7.tgz

Finally, move the unpacked directory to /opt/spark. Use the mv command to do so:

$ sudo mv spark-3.2.4-bin-hadoop2.7 /opt/spark

Before starting a master server, you need to configure environment variables. There are a few Spark home paths you need to add to the user profile. Use the echo command to add these three lines to .profile (note that the PATH line uses single quotes so that $PATH and $SPARK_HOME are expanded when the profile is loaded, not when the line is written):

$ echo "export SPARK_HOME=/opt/spark" >> ~/.profile
$ echo "export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin" >> ~/.profile
$ echo "export PYSPARK_PYTHON=/usr/bin/python3" >> ~/.profile

You can also add the export paths by editing the .profile file in the editor of your choice, such as nano or vim. For example, to use nano, enter:

$ nano .profile
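After saving the file, reload the profile so the new variables take effect in the current session (a standard shell step; alternatively, log out and back in):

$ source ~/.profile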

Start the Spark shell by entering:

$ spark-shell
[Image: Spark shell]

To use PySpark, run the command below.

$ pyspark
[Image: PySpark shell]
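Inside the interactive PySpark shell, a SparkSession is already available as spark, so a quick sanity check is possible (a small illustrative snippet):

spark.version          # should report 3.2.4 for the archive downloaded above
spark.range(5).show()  # prints a small DataFrame with ids 0 through 4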

Start Standalone Spark Master Server

Now that you have completed configuring your environment for Spark, you can start a master server. In the terminal, type:

$ start-master.sh

To view the Spark web user interface, open a web browser and go to the localhost address on port 8080: http://127.0.0.1:8080/

Start Spark Slave Server

In this single-server, standalone setup, we will start one slave (worker) server along with the master server. To do so, run a command in the following format, where spark://master:port is the master URL shown at the top of the Spark web UI (the default master port is 7077):

$ start-slave.sh spark://master:port
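For example, if the web UI reports the master URL as spark://ubuntu:7077 (a hypothetical hostname used only for illustration), the worker would be started with:

$ start-slave.sh spark://ubuntu:7077

When you are finished, the daemons can be stopped with the matching stop-slave.sh and stop-master.sh scripts.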

PySpark DataFrame

  1. Creating a Spark Session

The first step is to install and import the PySpark and Pandas libraries that we will need to load the data.

import pyspark
from pyspark.sql import SparkSession
import pandas as pd

If you want to create a DataFrame with PySpark, you must first create a SparkSession, which is the entry point to all of Spark’s functionality. Run the code listed below to initialize a SparkSession:

spark = SparkSession.builder.appName("Sample").getOrCreate()

2. Creating the DataFrame

To load a dataset into the Spark session, we can use the spark.read.csv() method. Supply header=True if the file’s first row contains the column names; the inferSchema=True argument tells Spark to infer the actual data types while reading the data.

df = spark.read.csv("/content/cruise_dataset.csv", inferSchema=True, header=True)

3. Operations on DataFrames

If we print the df object, it prints the column names and their data types.

print(df)

DataFrame[Ship_name: string, Cruise_line: string, Age: int, Tonnage: double, passengers: double, length: double, cabins: double, passenger_density: double, crew: double]

To print the contents of a DataFrame, use PySpark’s show() method; the number of rows to print can be passed as an integer (20 rows are shown by default).

df.show(5)

The data types can be printed using PySpark’s built-in printSchema() method.

df.printSchema()

We can print the column names using the .columns attribute.

df.columns

['Ship_name', 'Cruise_line', 'Age', 'Tonnage', 'passengers', 'length', 'cabins', 'passenger_density', 'crew']

To select a specific column, we can use the select() method.

df.select("Ship_name").show(5)

PySpark also supports the describe() method, which provides the count, mean, standard deviation, min, and max for each numeric column.

df.describe().show()

Use the withColumn() method, which accepts the name of the new column as the first parameter and the computation as the second, to create a new column.

df = df.withColumn("TotalMembers",df["passengers"]+df["crew"])
df.show(5)

To rename a column, we use the withColumnRenamed() method and pass the old column name as the first argument and the new column name as the second.

df.withColumnRenamed("passenger_density","density").show(5)

PySpark’s .filter() method makes it very easy to filter DataFrames.

df.filter("Age>=30").show(5)

We can perform a selection operation to separate out specific columns.

df.filter("Age>=30").select(["Ship_name","Cruise_line","passengers"]).show(5)

The groupBy() functionality in PySpark works similarly to Pandas. We can also apply the .count() method to count the number of observations in each category.

df.groupBy("Cruise_line").count().show()

PYSPARK JOIN

A PySpark join combines two DataFrames, and by chaining joins you can combine additional DataFrames. PySpark supports all of the fundamental join operations found in conventional SQL, including INNER, OUTER, LEFT OUTER, and RIGHT OUTER.

First, let’s create "emp" and "dept" DataFrames. Here, the column "emp_id" is unique in the emp dataset, "dept_id" is unique in the dept dataset, and "emp_dept_id" in "emp" references "dept_id" in "dept".

emp = [(1, "Suma", -1, "2018", "10", "M", 3000),
       (2, "Smith", 1, "2010", "20", "M", 4000),
       (3, "Vibin", 1, "2010", "10", "M", 1000),
       (4, "Jhonny", 2, "2005", "10", "F", 2000),
       (5, "Browny", 2, "2010", "40", "", -1),
       (6, "Browny", 2, "2010", "50", "", -1)]
empColumns = ["emp_id", "name", "superior_emp_id", "year_joined",
              "emp_dept_id", "gender", "salary"]
empdf = spark.createDataFrame(data=emp, schema=empColumns)
empdf.printSchema()
empdf.show(truncate=False)

dept = [("Finance", 10),
        ("Marketing", 20),
        ("Sales", 30),
        ("IT", 40)]
deptColumns = ["dept_name", "dept_id"]
deptdf = spark.createDataFrame(data=dept, schema=deptColumns)
deptdf.printSchema()
deptdf.show(truncate=False)

Inner join is PySpark’s default and most frequently used join. With this, two datasets (emp & dept) are joined based on key columns, and rows where the keys do not match are removed.

empdf.join(deptdf,empdf.emp_dept_id ==  deptdf.dept_id,"inner") \
.show(truncate=False)

It drops the row with "emp_dept_id" 50 from "emp" and the row with "dept_id" 30 from "dept".

The outer, full, and fullouter joins all return every row from both datasets; where the join expression does not match, the columns from the other dataset are filled with null.

empdf.join(deptdf,empdf.emp_dept_id ==  deptdf.dept_id,"outer") \
.show(truncate=False)
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"full") \
.show(truncate=False)
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"fullouter") \
.show(truncate=False)

In the result of the join above, "emp_dept_id" 50 from the "emp" dataset has no matching record in "dept", and "dept_id" 30 from "dept" has no matching record in "emp"; for those two rows, the columns from the other dataset are null.

Left and leftouter joins return all rows from the left dataset regardless of whether a match is found on the right dataset. When the join expression does not match, null is assigned to the right-side columns, and records from the right that have no match are dropped.

empdf.join(deptdf,empdf.emp_dept_id ==  deptdf.dept_id,"left").show(truncate=False)
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"leftouter").show(truncate=False)

From our dataset, "emp_dept_id" 50 has no record in the "dept" dataset, so its "dept" columns are null, and "dept_id" 30 from "dept" is dropped from the results.

A right (outer) join is the reverse of a left join: it returns all rows from the right dataset regardless of whether a match is found on the left dataset. When the join expression does not match, null is assigned to the left-side columns, and records from the left that have no match are dropped.

empdf.join(deptdf,empdf.emp_dept_id ==  deptdf.dept_id,"right") \
.show(truncate=False)
empdf.join(deptdf,empdf.emp_dept_id == deptdf.dept_id,"rightouter") \
.show(truncate=False)

"dept_id" 30 from the right dataset has no match in the left dataset "emp", so that record contains null in the "emp" columns, while "emp_dept_id" 50 is dropped because no match was found for it in "dept".

BROADCAST JOIN

Behind the scenes, when we join two large datasets, large amounts of data are shuffled across partitions within the cluster, and even between partitions on different executors.

When we need to join a smaller dataset with a larger one, we can use a broadcast join. With a broadcast join, Spark sends the smaller dataset to every node in the cluster, so the data to be joined is present on all nodes and Spark can perform the join without any shuffling. This avoids sending massive amounts of data across the network. We can verify whether a DataFrame is broadcast by using the explain() method. The example below illustrates how a broadcast join is done.

From the employee-department example, we can perform a broadcast join:

from pyspark.sql.functions import broadcast
empdf.join(broadcast(deptdf),empdf["emp_dept_id"] == deptdf["dept_id"]).show()

Let’s use the explain() method to analyze the physical plan of the broadcast join.

empdf.join(broadcast(deptdf),empdf["emp_dept_id"] == deptdf["dept_id"]).explain()

PySpark is a great tool to learn because it enables scalable analysis and ML pipelines. If you’re already familiar with Python and Pandas, much of your knowledge can be applied to Spark. I have shown how to install Spark on Ubuntu and how to perform some common operations on DataFrames with PySpark.

