Getting Started with PySpark With Examples

shorya sharma
Data Engineering on Cloud
6 min read · Oct 2, 2021


This article is aimed mainly at people who have just started out in the field of big data and Spark with Python, but even if you are a professional, you may want to stick around and brush up on fundamentals such as setting up the environment for application development with PySpark, and then build an end-to-end pipeline to read data from files, process it, and write the results back to files.

This article also covers UDFs (User Defined Functions) in Spark.

What is Spark?

Apache Spark is a unified analytics engine, primarily used for big data, i.e. large-scale data processing. It is convenient for developers because it provides high-level APIs in Java, Scala, Python, and R. That does not mean a SQL developer cannot use Spark; in fact, it also supports higher-level tools, including Spark SQL for SQL and structured data processing.

source: https://blog.cambridgespark.com/

What is PySpark?

PySpark is the Python interface for Apache Spark. It not only lets you write Spark applications using Python APIs, but also provides the PySpark shell for interactively analyzing your data in a distributed environment.

PySpark supports most Spark features, such as Spark SQL, DataFrames, Streaming, MLlib, and Spark Core.
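For a quick feel of what that looks like in practice, here is a minimal sketch (assuming PySpark is already installed, for example via pip install pyspark; the tiny data set and column names below are made up for illustration) that queries the same data through both the DataFrame API and Spark SQL:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("quick-demo").getOrCreate()

# A tiny illustrative data frame
df = spark.createDataFrame([("Games", 4.5), ("Music", 3.0)], ["prime_genre", "user_rating"])

# The same question asked through two APIs
df.filter(df.user_rating > 3.5).show()                          # DataFrame API
df.createOrReplaceTempView("apps")
spark.sql("SELECT * FROM apps WHERE user_rating > 3.5").show()  # Spark SQL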

Installation

Let’s see how we can install PySpark on Ubuntu and Windows.

Ubuntu

  1. Install Python and pip
  2. Install Jupyter Notebook
  3. Install Java using the command sudo apt-get install default-jre
  4. Install Scala using the command sudo apt-get install scala
  5. Install Py4J, which connects Python with Java and Scala, using the command pip3 install py4j
  6. Download Apache Spark from the browser, extract it, and set the environment variables (adjust the folder name to the version you downloaded):

sudo tar -zxvf spark-2.1.0-bin-hadoop2.7.tgz
export SPARK_HOME='/home/ubuntu/spark-2.1.0-bin-hadoop2.7'
export PATH=$SPARK_HOME:$PATH
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
export PYSPARK_DRIVER_PYTHON='jupyter'
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
export PYSPARK_PYTHON=python3

If you want to access PySpark, move into the spark/python directory. But if you want to access PySpark from anywhere, without changing the directory, install findspark:

pip3 install findspark
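Once findspark is installed, a minimal sketch like the following lets you import pyspark from any directory (the Spark path below is illustrative; adjust it to wherever you extracted Spark, or omit it if SPARK_HOME is already exported):

# findspark adds Spark's Python libraries to sys.path so that
# `import pyspark` works from any directory
import findspark
findspark.init('/home/ubuntu/spark-2.1.0-bin-hadoop2.7')  # or findspark.init() if SPARK_HOME is set

import pyspark
print(pyspark.__version__)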

Windows

  1. Download and Install Anaconda

2. Download and Install Java

3. Download Apache Spark

https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

After downloading Apache Spark, extract it and place it on your local C: drive.

4. Download Winutils

Once you have downloaded winutils, paste it inside ‘C:\spark-3.1.2-bin-hadoop3.2\bin’.

5. Set Environment Variables

In the user variables, add SPARK_HOME, HADOOP_HOME, and JAVA_HOME, pointing at the Spark, Hadoop (winutils), and Java install directories (the same paths used in the code later in this article).

In the system variables, add a new entry to the Path variable: C:\spark-3.1.2-bin-hadoop3.2\bin

Install the findspark package: in the Anaconda prompt, run

conda install -c conda-forge findspark

Woohoo!! PySpark is up and running on your laptop.

6. Run random examples

Time to run our random examples, which can be useful for you in the real world.

I am using Windows as my operating system.

Open a Jupyter notebook and follow along; you will be able to get the code, along with the CSV file used, from the GitHub link at the end of the article.

Import Libraries, Set Environment and Find Spark

import os
import sys
import glob

# Point Spark, Java, and Hadoop (winutils) at their install locations
os.environ['SPARK_HOME'] = r'C:\spark-3.1.2-bin-hadoop3.2'
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_201'
os.environ['HADOOP_HOME'] = r'C:\spark-3.1.2-bin-hadoop3.2'

# Add Spark's Python libraries and the bundled Py4J to the Python path
spark_python = os.path.join(os.environ.get('SPARK_HOME', None), 'python')
py4j = glob.glob(os.path.join(spark_python, 'lib', 'py4j-*.zip'))[0]
sys.path[:0] = [spark_python, py4j]
os.environ['PYTHONPATH'] = py4j
import findspark
findspark.init()
findspark.find()

Initialize Spark

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Basic Examples").getOrCreate()

Read the CSV File into the Data Frame

df = spark.read.csv('F:/main course/Artificial_Neural_Networks/AppleStore.csv',inferSchema=True,header=True)
df.printSchema()
df.show(truncate=False)

Let’s select a few columns from the data frame:

df = df.select(['size_bytes','price','prime_genre','user_rating'])

Because this is my local computer and it does not have enough memory, I will reduce my dataframe to a smaller subset to demonstrate the functionality. If you have enough memory or you are on a cluster, feel free to skip the next step.

df = df.limit(20)

Cast Operation

from pyspark.sql.functions import col
from pyspark.sql.types import StringType

df_transformed = df.withColumn("price_new", col("price").cast(StringType()))
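As a quick, optional sanity check, the schema should now show price_new as a string:

# price_new should appear with type string in the schema
df_transformed.printSchema()
df_transformed.select("price", "price_new").show(5)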

Creating a UDF

Let us create a UDF that pads values with zeros on the left.

from pyspark.sql.functions import udf

def leftPad(string):
    string = str(string).rjust(10, '0')
    return string

convertUDF = udf(lambda z: leftPad(z), StringType())

Creating a UDF to change the Values to Upper Case

convertUDF2 = udf(lambda z: str(z).upper(),StringType())

Using the UDFs

df_transformed = df_transformed.withColumn(
    "price_new",
    convertUDF(col("price_new"))
)
df_transformed = df_transformed.withColumn(
    "prime_genre",
    convertUDF2(col("prime_genre"))
)

Using Substring and Trim

from pyspark.sql.functions import substring, trim

# substring('price_new', 6, 4) keeps 4 characters starting at the 6th
# position (1-based) of the zero-padded string; trim removes surrounding whitespace
df_transformed = df_transformed.withColumn("price_new", substring('price_new', 6, 4))
df_transformed = df_transformed.withColumn('prime_genre', trim('prime_genre'))

Let’s see what our transformed dataframe looks like:

df_transformed.show()

Analysis and Operations

Count Distinct

from pyspark.sql.functions import countDistinct

df_distinct_count = df_transformed.select(countDistinct("prime_genre").alias("prime_genre_count"))
df_distinct_count.show()
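If you prefer to cross-check without an aggregate function, an equivalent sketch using distinct() gives the same number (with the small caveat that, unlike countDistinct, it would also count a null genre if one were present):

# Count of distinct genres via distinct() + count()
print(df_transformed.select("prime_genre").distinct().count())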

Analyze Data using analytical or windowing functions

Let us go ahead and understand how to get the frequency, cumulative frequency, and cumulative percentage using the analytical (windowing) functions of PySpark.

from pyspark.sql.functions import count

frequencies = df_transformed. \
    groupBy('prime_genre'). \
    agg(count('prime_genre').alias('frequency')). \
    selectExpr(
        '*',
        '100 * frequency / sum(frequency) over() Percent'
    ). \
    selectExpr(
        '*',
        'sum(frequency) over(order by frequency desc) cumulative_frequency',
        'sum(Percent) over(order by frequency desc) cumulative_Percent'
    )
frequencies.show()

Dealing with Nulls

Create a data frame showing the counts of null and not-null values for a given column.

from pyspark.sql.functions import isnan

null_values = df_transformed.filter(
    df_transformed.price.contains('None') |
    df_transformed.price.contains('NULL') |
    (col("price") == '') |
    isnan(df_transformed.price) |
    df_transformed.price.isNull()
).count()
not_null_values = df_transformed.count() - null_values

data = [(null_values, not_null_values)]
df_new = spark.createDataFrame(data, ["Null_value", "Not_null_values"])
df_new.show()

Create Dataframes for unique values and duplicate values

Create two data frames: one will have all the unique values, and the other will have the duplicate rows that are dropped from the main data frame.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

w2 = Window. \
    partitionBy('prime_genre'). \
    orderBy(col("user_rating").desc())

df_duplicate = df_transformed. \
    withColumn("row", row_number().over(w2)). \
    filter(col("row") > 1). \
    drop("row")

df_unique = df_transformed. \
    withColumn("row", row_number().over(w2)). \
    filter(col("row") == 1). \
    drop("row")
df_duplicate.show()
df_unique.show()
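As a side note, if you do not care which row survives per genre, dropDuplicates is a shorter alternative (a sketch, not part of the original pipeline); unlike the window approach above, it keeps an arbitrary row rather than the highest-rated one:

# Keep one (arbitrary) row per prime_genre
df_unique_simple = df_transformed.dropDuplicates(["prime_genre"])
df_unique_simple.show()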

I bet it was fun to get some hands-on practice with PySpark, and I hope the blog was useful for you.

Link for the code and csv:

Also, if you are interested in Hive, check out this tutorial of mine:
