Data Preprocessing Using Pyspark (Part 1)

Vishal Barad · Published in Vedity · Dec 23, 2021 · 13 min read

Apache Spark is a framework that allows for quick data processing on large amounts of data.

Spark⚡

Data preprocessing is a necessary step in machine learning, as the quality of the data affects the results and performance of the machine learning model we apply to it. Therefore, it is extremely important that we preprocess our data before feeding it into our model.

Data preprocessing can be carried out with pandas as well as with Pyspark; here we are going to explore Pyspark.

Before we dive deep into the preprocessing part, let’s first get a brief overview of data exploration.

Data exploration

Data exploration, also known as exploratory data analysis (EDA), is the process of exploring and visualizing data to find patterns or uncover insights from the start. It helps in identifying problems in the DataFrame as well as deciding which model or algorithm to use in subsequent steps.

Let’s quickly discuss some of the data exploration techniques.

  • Statistical summary of data

Pyspark’s describe() is used to view basic statistical details such as count, mean, stddev, min, and max of a DataFrame’s columns.

from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession
spark = SparkSession.builder.appName("data-preprocessing").getOrCreate()

data = [[1, None, "vignan", 95.0],
        [None, "ojaswi", "vvit", 55.0],
        [3, None, "vvit", 65.0],
        [None, "sridevi", "vignan", None],
        [None, "Alrich", "nonvignan", 56.0],
        [1, None, None, 55.0],
        [5, "gnanesh", "iit", 1.0],
        [1, None, "vignan", 95.0],
        [6, None, "vignan", 22.0]]

columns = ['student_ID', 'student_name', 'college', 'marks']
dataframe = spark.createDataFrame(data, columns)
dataframe.describe().show()
O/P:
+-------+------------------+------------+-------+------------------+
|summary|        student_ID|student_name|college|             marks|
+-------+------------------+------------+-------+------------------+
|  count|                 6|           4|      8|                 8|
|   mean|2.8333333333333335|        null|   null|              55.5|
| stddev| 2.228601953392904|        null|   null|32.302144997330615|
|    min|                 1|      Alrich|    iit|               1.0|
|    max|                 6|     sridevi|   vvit|              95.0|
+-------+------------------+------------+-------+------------------+
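
By the way, if you also want approximate quartiles alongside these statistics, DataFrame.summary() (available in Spark 2.3+) accepts the statistics to compute; a minimal sketch:

# Hedged sketch: summary() also reports approximate percentiles
dataframe.summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max").show()
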
  • Count total number of null values

Let’s find and visualize the total number of null values present in each of the DataFrame’s columns.

import matplotlib.pyplot as plt

# Count nulls per column
null_value_list = list()
for col_ in dataframe.columns:
    null_value_list.append(dataframe.filter(dataframe[col_].isNull()).count())

plt.rcParams["figure.figsize"] = (40, 9)
columns = [col_ for col_ in dataframe.columns]
myexplode = [0.2, 0, 0, 0]
plt.pie(null_value_list, labels=columns, explode=myexplode, shadow=True, autopct='%1.0f%%')
plt.title('Total number of null values per column')
plt.show()

O/P:

As we can see, the “student_ID” column has 3 null values, “student_name” has 5, and so on.

  • Count duplicate row

Let’s find how many duplicate rows are present in the DataFrame.

import pyspark.sql.functions as funcs

dataframe.groupBy(dataframe.columns).count() \
    .where(funcs.col('count') > 1) \
    .select(funcs.sum('count')).show()

O/P:
+----------+
|sum(count)|
+----------+
| 2|
+----------+

As we can see, there are two duplicate rows (the same record appearing twice) in the DataFrame, as shown below.

Duplicate row
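
If you want to list the duplicated records themselves rather than just their count, one possible sketch (reusing the same groupBy idea) is:

import pyspark.sql.functions as funcs

# Hedged sketch: show the rows that occur more than once, together with their counts
dataframe.groupBy(dataframe.columns).count() \
    .where(funcs.col('count') > 1) \
    .show()
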
  • Find numeric and categorical column

Let’s find the numeric (dtype != string) and categorical (dtype == string) columns in the DataFrame.

numeric_columns = list()
categorical_column = list()
for col_ in dataframe.columns:
    if dataframe.select(col_).dtypes[0][1] != "string":
        numeric_columns.append(col_)
    else:
        categorical_column.append(col_)

print("Numeric columns", numeric_columns)
print("categorical columns", categorical_column)
O/P:
Numeric columns ['student_ID', 'marks']
categorical columns ['student_name', 'college']
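
By the way, the same split can be read directly from dataframe.dtypes, which returns (column, type) pairs, so the per-column select() calls are not strictly needed; a minimal sketch:

# Hedged sketch: derive the same lists directly from dataframe.dtypes
numeric_columns = [name for name, dtype in dataframe.dtypes if dtype != "string"]
categorical_column = [name for name, dtype in dataframe.dtypes if dtype == "string"]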

Now that you have better clarity about data exploration, we are going to explore data preprocessing.

So here we go🚩,

The data preprocessing techniques we are going to discuss:

  1. Drop null value
  2. Handle missing value with imputation
  3. Outlier detection, removal and imputation
  4. Drop feature/column
  5. Drop duplicates
  6. Convert categorical feature/column into numerical
  7. Feature scaling
  8. Train test split data

Here we’ll discuss the first three techniques in depth with implementation. The rest will be covered in the second part of this blog.

Drop null value

Let’s first create a Pyspark DataFrame. After creating the DataFrame, we’ll perform this operation.

# Create pyspark dataframe
data = [[1, None, "vignan", 95.0],
        [None, "ojaswi", "vvit", 55.0],
        [3, None, "vvit", 65.0],
        [None, "sridevi", "vignan", None],
        [None, "Alrich", "nonvignan", 56.0],
        [1, None, None, 55.0],
        [5, "gnanesh", "iit", 1.0],
        [1, None, "vignan", 95.0],
        [6, None, "vignan", 22.0]]
# specify column names
columns = ['student_ID', 'student_name', 'college', 'marks']
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data, columns)
# show dataframe
dataframe.show()
O/P:
+----------+------------+---------+-----+
|student_ID|student_name| college|marks|
+----------+------------+---------+-----+
| 1| null| vignan| 95.0|
| null| ojaswi| vvit| 55.0|
| 3| null| vvit| 65.0|
| null| sridevi| vignan| null|
| null| Alrich|nonvignan| 56.0|
| 1| null| null| 55.0|
| 5| gnanesh| iit| 1.0|
| 1| null| vignan| 95.0|
| 6| null| vignan| 22.0|
+----------+------------+---------+-----+

Boom! Our DataFrame is created. Now let’s first check whether any missing values are present in this DataFrame.

from pyspark.sql.functions import *

dataframe.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c)
                  for c in dataframe.columns]).show()

O/P:
+----------+------------+-------+-----+
|student_ID|student_name|college|marks|
+----------+------------+-------+-----+
| 3| 5| 1| 1|
+----------+------------+-------+-----+

As we can see, “student_ID”, “student_name”, “college”, and “marks” have 3, 5, 1, and 1 missing values respectively.

Now, we’ll drop those missing values. Spark provides the dropna() function, which is used to drop rows with null values in one or multiple (any/all) columns of a DataFrame. While reading data from files, Spark APIs like DataFrame and Dataset assign NULL values to empty values in columns.

Based on your needs, you may have to remove these rows that have null values as part of data cleaning.

dataframe = dataframe.dropna()
dataframe.show()
O/P:
+----------+------------+-------+-----+
|student_ID|student_name|college|marks|
+----------+------------+-------+-----+
| 5| gnanesh| iit| 1.0|
+----------+------------+-------+-----+

The dropna() function also accepts parameters such as how, thresh, and subset; you can explore them in the PySpark documentation.
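
For instance, here is a hedged sketch of those parameters on the DataFrame above (the threshold values are only illustrative):

# how: drop a row if 'any' (default) or 'all' of the considered columns are null
dataframe.dropna(how="all").show()
# thresh: keep only rows with at least this many non-null values
dataframe.dropna(thresh=3).show()
# subset: only consider these columns when deciding which rows to drop
dataframe.dropna(how="any", subset=["marks"]).show()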

Handle missing value with imputation

  1. For numeric column
  • Mean imputation
  • Median imputation
  • Mode imputation

2. For categorical column

  • Frequent category imputation

Mean imputation

Mean imputation is carried out on numeric features/columns. In this imputation, missing values are replaced by the mean of that particular feature/column.

dataframe.printSchema()

O/P:
root
 |-- student_ID: long (nullable = true)
 |-- student_name: string (nullable = true)
 |-- college: string (nullable = true)
 |-- marks: double (nullable = true)

As we can see, “student_ID” and “marks” are both numeric columns, so we’ll perform mean imputation on these columns. (Note that we re-create the original DataFrame from above first, since dropna() left only a single row.)

from pyspark.ml.feature import Imputer

column_subset = [col_ for col_ in dataframe.columns
                 if dataframe.select(col_).dtypes[0][1] != "string"]
imputer = Imputer(inputCols=column_subset,
                  outputCols=column_subset).setStrategy("mean")
dataframe = imputer.fit(dataframe).transform(dataframe)
dataframe.show()

O/P:
+----------+------------+---------+-----+
|student_ID|student_name| college|marks|
+----------+------------+---------+-----+
| 1| null| vignan| 95.0|
| 2| ojaswi| vvit| 55.0|
| 3| null| vvit| 65.0|
| 2| sridevi| vignan| 55.5|
| 2| Alrich|nonvignan| 56.0|
| 1| null| null| 55.0|
| 5| gnanesh| iit| 1.0|
| 1| null| vignan| 95.0|
| 6| null| vignan| 22.0|
+----------+------------+---------+-----+
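
If you prefer to keep the original columns untouched, the Imputer can also write to separate output columns; a hedged sketch, reusing column_subset from the cell above and a hypothetical "_imputed" suffix:

from pyspark.ml.feature import Imputer

# Hedged sketch: keep the original columns and add imputed copies instead of overwriting
imputer = Imputer(inputCols=column_subset,
                  outputCols=[c + "_imputed" for c in column_subset]).setStrategy("mean")
dataframe_imputed = imputer.fit(dataframe).transform(dataframe)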

Median imputation

Median imputation is carried out on numeric features/columns. In this imputation, missing values are replaced by the median of that particular feature/column.

from pyspark.ml.feature import Imputer

column_subset = [col_ for col_ in dataframe.columns
                 if dataframe.select(col_).dtypes[0][1] != "string"]
imputer = Imputer(inputCols=column_subset,
                  outputCols=column_subset).setStrategy("median")
dataframe = imputer.fit(dataframe).transform(dataframe)
dataframe.show()

O/P:
+----------+------------+---------+-----+
|student_ID|student_name| college|marks|
+----------+------------+---------+-----+
| 1| null| vignan| 95.0|
| 2| ojaswi| vvit| 55.0|
| 3| null| vvit| 65.0|
| 2| sridevi| vignan| 55.5|
| 2| Alrich|nonvignan| 56.0|
| 1| null| null| 55.0|
| 5| gnanesh| iit| 1.0|
| 1| null| vignan| 95.0|
| 6| null| vignan| 22.0|
+----------+------------+---------+-----+

Mode imputation

Mode imputation is carried out on numeric features/columns. In this imputation, missing values are replaced by the mode (most frequent value) of that particular feature/column.

column_subset = [col_ for col_ in dataframe.columns
                 if dataframe.select(col_).dtypes[0][1] != "string"]
for col_ in column_subset:
    # Most frequent value (mode) of the column, ignoring nulls
    temp_col = dataframe.groupBy(col_).count()
    temp_col = temp_col.dropna(subset=col_)
    mode = temp_col.orderBy(temp_col['count'].desc()).collect()[0][0]
    dataframe = dataframe.fillna(mode, subset=col_)
dataframe.show()
O/P:
+----------+------------+---------+-----+
|student_ID|student_name| college|marks|
+----------+------------+---------+-----+
| 1| null| vignan| 95.0|
| 1| ojaswi| vvit| 55.0|
| 3| null| vvit| 65.0|
| 1| sridevi| vignan| 95.0|
| 1| Alrich|nonvignan| 56.0|
| 1| null| null| 55.0|
| 5| gnanesh| iit| 1.0|
| 1| null| vignan| 95.0|
| 6| null| vignan| 22.0|
+----------+------------+---------+-----+
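
As a side note, if you are on a recent Spark (3.1 or later), the Imputer used in the mean/median examples also accepts a "mode" strategy, so the same idea can be written as the following hedged sketch (reusing column_subset, the numeric columns, from above):

from pyspark.ml.feature import Imputer

# Hedged sketch: Imputer with the "mode" strategy (assumes Spark 3.1+)
imputer = Imputer(inputCols=column_subset,
                  outputCols=column_subset).setStrategy("mode")
dataframe = imputer.fit(dataframe).transform(dataframe)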

Frequent category imputation

Now we have seen all the imputation methods for numeric columns. What if the DataFrame’s column is categorical?

For that we’ll use the frequent category imputation technique. In this technique, we impute missing values with the most frequently occurring category.

# Note: this time we keep the categorical (string) columns
column_subset = [col_ for col_ in dataframe.columns
                 if dataframe.select(col_).dtypes[0][1] == "string"]
for col_ in column_subset:
    temp_col = dataframe.groupBy(col_).count()
    temp_col = temp_col.dropna(subset=col_)
    frequent_category = temp_col.orderBy(temp_col['count'].desc()).collect()[0][0]
    dataframe = dataframe.fillna(frequent_category, subset=col_)
dataframe.show()
O/P:
+----------+------------+---------+-----+
|student_ID|student_name| college|marks|
+----------+------------+---------+-----+
| 1| ojaswi| vignan| 95.0|
| null| ojaswi| vvit| 55.0|
| 3| ojaswi| vvit| 65.0|
| null| sridevi| vignan| null|
| null| Alrich|nonvignan| 56.0|
| 1| ojaswi| vignan| 55.0|
| 5| gnanesh| iit| 1.0|
| 1| ojaswi| vignan| 95.0|
| 6| ojaswi| vignan| 22.0|
+----------+------------+---------+-----+

Let’s break down the above code for better understanding.

column_subset = [col_ for col_ in dataframe.columns
                 if dataframe.select(col_).dtypes[0][1] == "string"]
for col_ in column_subset:
    temp_col = dataframe.groupBy(col_).count()
    temp_col = temp_col.dropna(subset=col_)
    temp_col.orderBy(temp_col['count'].desc()).show()
O/P:
+------------+-----+
|student_name|count|
+------------+-----+
| ojaswi| 6|
| Alrich| 1|
| sridevi| 1|
| gnanesh| 1|
+------------+-----+
+---------+-----+
| college|count|
+---------+-----+
| vignan| 5|
| iit| 1|
| vvit| 2|
|nonvignan| 1|
+---------+-----+

As we can see, the above code gives us the number of times each category occurred. Now we’ll use collect() on the ordered result to grab the first row, i.e. the most frequent item, as follows.

# Inside the loop, for each categorical column:
frequent_category = temp_col.orderBy(temp_col['count'].desc()).collect()[0][0]
print(frequent_category)
O/P:
ojaswi
vignan

And boom!! Here we got the most frequently occurring items. Now we just impute them in place of the missing values with fillna(), as in the full code above.

Outlier detection, removal and imputation

What is outlier?

In layman’s terms, outliers are data points that differ significantly from the rest of the observations. Or we can say that an outlier is “the one of these things that is not like the others”.😉

Outlier

But wait!✋🤚

Are outliers really dangerous?🤔

Outliers are not always bad for our problem statement. In fact, outliers can sometimes be helpful indicators.

For example, consider credit/debit card fraud detection: the card industry has used the concept of outliers to detect fraud in the past and continues to do so now.

Okay, now you have the idea about outliers, so your question is how to detect them, right? Let’s dive deep into it.

Outlier detection

There are many methods for detecting outliers, but here we’ll mainly focus on the following two.

  1. Using Z-score
  2. Using IQR

Using Z-score

The Z-score is a parametric outlier detection method for one-dimensional or low-dimensional feature spaces.

This technique assumes a normal distribution/Gaussian distribution of the data. The outliers are the data points that are in the tails of the distribution and therefore far from the mean.

For a normal/Gaussian distribution (shown above), it is estimated that:
68% of the data points lie within +/- 1 standard deviation of the mean.
95% of the data points lie within +/- 2 standard deviations of the mean.
99.7% of the data points lie within +/- 3 standard deviations of the mean.

We can find the Z-score using the formula below,

Z-score = (x - mean) / standard deviation

If the z-score of a data point is more than +3 or less than -3, it indicates that the data point is quite different from the other data points. Such a data point can be an outlier.

Let’s find the outliers.

In order to find outliers, let’s first create a Pyspark DataFrame that contains an outlier.

data = [["Patty O’Furniture",5.9],
["Paddy O’Furniture",5.2],
["Olive Yew",5.1],
["Aida Bugg",5.5],
["Maureen Biologist",4.9],
["Teri Dacty",5.4],
["Peg Legge",6.2],
["Allie Grate",6.5],
["Liz Erd",7.1],
["A. Mused",14.5],
["Constance Noring",6.1],
["Lois Di Nominator",5.6],
["Minnie Van Ryder",1.2],
["Lynn O’Leeum",5.5]]
columns = ['student_name','height']dataframe = spark.createDataFrame(data,columns)
dataframe.show()
O/P:
+-----------------+------+
| student_name|height|
+-----------------+------+
|Patty O’Furniture| 5.9|
|Paddy O’Furniture| 5.2|
| Olive Yew| 5.1|
| Aida Bugg| 5.5|
|Maureen Biologist| 4.9|
| Teri Dacty| 5.4|
| Peg Legge| 6.2|
| Allie Grate| 6.5|
| Liz Erd| 7.1|
| A. Mused| 14.5|
| Constance Noring| 6.1|
|Lois Di Nominator| 5.6|
| Minnie Van Ryder| 1.2|
| Lynn O’Leeum| 5.5|
+-----------------+------+

Now let’s detect the outlier in the above DataFrame using the Z-score method.

from pyspark.sql.functions import *

column_subset = dataframe.columns
for col_ in column_subset:
    if dataframe.select(col_).dtypes[0][1] == "string":
        pass
    else:
        # Avoid shadowing the mean()/stddev() functions imported above
        col_mean = dataframe.select(mean(col_)).collect()[0][0]
        col_stddev = dataframe.select(stddev(col_)).collect()[0][0]
        upper_limit = col_mean + (3 * col_stddev)
        lower_limit = col_mean - (3 * col_stddev)
        # Keep only the rows OUTSIDE the +/- 3 stddev limits (the outliers)
        dataframe = dataframe.filter((dataframe[col_] < lower_limit) | (dataframe[col_] > upper_limit))
dataframe.show()

O/P:
+------------+------+
|student_name|height|
+------------+------+
| A. Mused| 14.5|
+------------+------+
Note: Outliers can be detected in numerical columns only.

As we can see, student_name = A. Mused, with a height of 14.5, is an outlier.
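
If you would rather look at the z-scores themselves instead of only filtering on the limits, here is a minimal sketch (assuming the original 14-row DataFrame, before any filtering; the zscore and is_outlier column names are just for illustration):

from pyspark.sql.functions import mean as _mean, stddev as _stddev, abs as _abs

# Hedged sketch: compute an explicit z-score column and flag |z| > 3
stats = dataframe.select(_mean("height").alias("mu"), _stddev("height").alias("sigma")).collect()[0]
scored = dataframe.withColumn("zscore", (dataframe["height"] - stats["mu"]) / stats["sigma"])
scored.withColumn("is_outlier", _abs(scored["zscore"]) > 3).show()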

And boom!⚡ Now you have a clear idea of how to find outliers. Next, let’s dive deep into the outlier removal and outlier imputation techniques.

Z-score Outlier removal

from pyspark.sql.functions import *

column_subset = dataframe.columns
for col_ in column_subset:
    if dataframe.select(col_).dtypes[0][1] == "string":
        pass
    else:
        col_mean = dataframe.select(mean(col_)).collect()[0][0]
        col_stddev = dataframe.select(stddev(col_)).collect()[0][0]
        upper_limit = col_mean + (3 * col_stddev)
        lower_limit = col_mean - (3 * col_stddev)
        # Keep only the rows INSIDE the +/- 3 stddev limits (i.e. drop the outliers)
        dataframe = dataframe.filter((dataframe[col_] > lower_limit) & (dataframe[col_] < upper_limit))
dataframe.show()

O/P:
+-----------------+------+
| student_name|height|
+-----------------+------+
|Patty O’Furniture| 5.9|
|Paddy O’Furniture| 5.2|
| Olive Yew| 5.1|
| Aida Bugg| 5.5|
|Maureen Biologist| 4.9|
| Teri Dacty| 5.4|
| Peg Legge| 6.2|
| Allie Grate| 6.5|
| Liz Erd| 7.1|
| Constance Noring| 6.1|
|Lois Di Nominator| 5.6|
| Minnie Van Ryder| 1.2|
| Lynn O’Leeum| 5.5|
+-----------------+------+

As we can see, student_name = A. Mused with a height of 14.5 (which is an outlier) has been removed.

Z-score Outlier imputation

We can impute outliers with the mean or median of the particular feature/column.

Here we are gonna see mean imputation.

from pyspark.sql.functions import *

column_subset = dataframe.columns
for col_ in column_subset:
    if dataframe.select(col_).dtypes[0][1] == "string":
        pass
    else:
        col_mean = dataframe.select(mean(col_)).collect()[0][0]
        col_stddev = dataframe.select(stddev(col_)).collect()[0][0]
        upper_limit = col_mean + (3 * col_stddev)
        lower_limit = col_mean - (3 * col_stddev)
        # Replace values outside the +/- 3 stddev limits with the (rounded) column mean
        dataframe = dataframe.withColumn(
            col_,
            when((dataframe[col_] < lower_limit) | (dataframe[col_] > upper_limit),
                 round(lit(col_mean))).otherwise(dataframe[col_]))
dataframe.show()

O/P:
+-----------------+------+
| student_name|height|
+-----------------+------+
|Patty O’Furniture| 5.9|
|Paddy O’Furniture| 5.2|
| Olive Yew| 5.1|
| Aida Bugg| 5.5|
|Maureen Biologist| 4.9|
| Teri Dacty| 5.4|
| Peg Legge| 6.2|
| Allie Grate| 6.5|
| Liz Erd| 7.1|
| A. Mused| 6.0|
| Constance Noring| 6.1|
|Lois Di Nominator| 5.6|
| Minnie Van Ryder| 1.2|
| Lynn O’Leeum| 5.5|
+-----------------+------+

Look, we successfully imputed the outlier with the mean value of that feature/column: “| A. Mused| 6.0|”.

In the same way, you can impute outliers with the median.
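
A hedged sketch of that variant, swapping the mean for an approximate median obtained with approxQuantile (everything else as in the loop above):

from pyspark.sql.functions import mean as _mean, stddev as _stddev, when, lit

# Hedged sketch: replace values outside the z-score limits with the column's approximate median
for col_, dtype in dataframe.dtypes:
    if dtype == "string":
        continue
    col_mean = dataframe.select(_mean(col_)).collect()[0][0]
    col_stddev = dataframe.select(_stddev(col_)).collect()[0][0]
    lower_limit, upper_limit = col_mean - 3 * col_stddev, col_mean + 3 * col_stddev
    col_median = dataframe.approxQuantile(col_, [0.5], 0.0)[0]   # 50th percentile
    dataframe = dataframe.withColumn(
        col_,
        when((dataframe[col_] < lower_limit) | (dataframe[col_] > upper_limit),
             lit(col_median)).otherwise(dataframe[col_]))
dataframe.show()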

That is all about the Z-score outlier detection method. Now let’s look at the IQR outlier detection method.

Using IQR

IQR (interquartile range) is a measure of statistical dispersion, which is the spread of the data. The IQR may also be called the midspread, middle 50%, or H‑spread. It is defined as the difference between the 75th and 25th percentiles of the data.

or

The IQR (interquartile range) defines the difference between the third and the first quartile. Quartiles are the partitioned values that divide the whole series into 4 equal parts. So, there are 3 quartiles. First Quartile is denoted by Q1 known as the lower quartile, the second Quartile is denoted by Q2 and the third Quartile is denoted by Q3 known as the upper quartile. Therefore, the interquartile range is equal to the upper quartile minus lower quartile.

IQR

So formula of IQR is,

IQR = Q3 - Q1

To detect outliers using this method, we define a new range, let’s call it the decision range, and any data point lying outside this range is considered an outlier and dealt with accordingly. The range is given below:

Lower Bound: (Q1 - 1.5 * IQR)
Upper Bound: (Q3 + 1.5 * IQR)

Any data point less than the Lower Bound or more than the Upper Bound is considered as an outlier.

Let’s find the outliers in the above DataFrame.

for col_ in dataframe.columns:
    if dataframe.select(col_).dtypes[0][1] == "string":
        pass
    else:
        # Q1 and Q3 via approxQuantile (relativeError=0 gives exact quantiles)
        q1, q3 = dataframe.approxQuantile(col_, [0.25, 0.75], 0)
        IQR = q3 - q1
        lower_bound = q1 - (1.5 * IQR)
        upper_bound = q3 + (1.5 * IQR)
        # Keep only the rows OUTSIDE the bounds (the outliers)
        dataframe = dataframe.filter((dataframe[col_] < lower_bound) | (dataframe[col_] > upper_bound))
dataframe.show()

O/P:
+----------------+------+
| student_name|height|
+----------------+------+
| A. Mused| 14.5|
|Minnie Van Ryder| 1.2|
+----------------+------+
Note: Outliers can be detected in numerical columns only.

As we can see, A. Mused and Minnie Van Ryder are both outliers.
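
Instead of filtering (which overwrites dataframe), you could also just flag the outliers in a new column so the original rows are preserved; a hedged sketch, assuming the original 14-row DataFrame and a hypothetical is_outlier column:

from pyspark.sql.functions import when, lit

# Hedged sketch: mark IQR outliers in a boolean column instead of dropping rows
q1, q3 = dataframe.approxQuantile("height", [0.25, 0.75], 0)
iqr = q3 - q1
lower_bound, upper_bound = q1 - 1.5 * iqr, q3 + 1.5 * iqr
flagged = dataframe.withColumn(
    "is_outlier",
    when((dataframe["height"] < lower_bound) | (dataframe["height"] > upper_bound), lit(True)).otherwise(lit(False)))
flagged.show()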

Now let’s dive deep into the IQR outlier removal and outlier imputation techniques.

IQR Outlier removal

for col_ in dataframe.columns:
    if dataframe.select(col_).dtypes[0][1] == "string":
        pass
    else:
        q1, q3 = dataframe.approxQuantile(col_, [0.25, 0.75], 0)
        IQR = q3 - q1
        lower_bound = q1 - (1.5 * IQR)
        upper_bound = q3 + (1.5 * IQR)
        # Keep only the rows INSIDE the bounds (i.e. drop the outliers)
        dataframe = dataframe.filter((dataframe[col_] > lower_bound) & (dataframe[col_] < upper_bound))
dataframe.show()

O/P:
+-----------------+------+
| student_name|height|
+-----------------+------+
|Patty O’Furniture| 5.9|
|Paddy O’Furniture| 5.2|
| Olive Yew| 5.1|
| Aida Bugg| 5.5|
|Maureen Biologist| 4.9|
| Teri Dacty| 5.4|
| Peg Legge| 6.2|
| Allie Grate| 6.5|
| Liz Erd| 7.1|
| Constance Noring| 6.1|
|Lois Di Nominator| 5.6|
| Lynn O’Leeum| 5.5|
+-----------------+------+

As we can see, student_name = A. Mused and Minnie Van Ryder (which are the outliers) have been removed.

IQR Outlier imputation

We can impute outliers with the mean or median of the particular feature/column.

Here we are gonna see mean imputation.

from pyspark.sql.functions import mean, when, lit, round

for col_ in dataframe.columns:
    if dataframe.select(col_).dtypes[0][1] == "string":
        pass
    else:
        q1, q3 = dataframe.approxQuantile(col_, [0.25, 0.75], 0)
        IQR = q3 - q1
        lower_bound = q1 - (1.5 * IQR)
        upper_bound = q3 + (1.5 * IQR)
        col_mean = dataframe.select(mean(col_)).collect()[0][0]
        # Replace values outside the IQR bounds with the (rounded) column mean
        dataframe = dataframe.withColumn(
            col_,
            when((dataframe[col_] < lower_bound) | (dataframe[col_] > upper_bound),
                 round(lit(col_mean))).otherwise(dataframe[col_]))
dataframe.show()

O/P:
+-----------------+------+
| student_name|height|
+-----------------+------+
|Patty O’Furniture| 5.9|
|Paddy O’Furniture| 5.2|
| Olive Yew| 5.1|
| Aida Bugg| 5.5|
|Maureen Biologist| 4.9|
| Teri Dacty| 5.4|
| Peg Legge| 6.2|
| Allie Grate| 6.5|
| Liz Erd| 7.1|
| A. Mused| 6.0|
| Constance Noring| 6.1|
|Lois Di Nominator| 5.6|
| Minnie Van Ryder| 6.0|
| Lynn O’Leeum| 5.5|
+-----------------+------+

Look, we successfully imputed the outliers with the mean value of that feature/column: “| A. Mused| 6.0|” and “| Minnie Van Ryder| 6.0|”.

Conclusion

So, to summarize, we have gone through an overview of Pyspark, dropping null values, handling missing values with imputation, and outlier detection, removal, and imputation. We have seen these techniques with implementations in Pyspark and Python.

In the second part we’ll discuss dropping features/columns, dropping duplicates, converting categorical features/columns into numerical ones, feature scaling, and train-test splitting of data, so stay tuned😊 and stay healthy(●'◡'●).

Thank you for reading!

Do you have any questions? If yes, please reach out to me on LinkedIn (Vishal Barad). Happy to chat!

Any feedback would be much appreciated and if you liked what you’ve read please hold the clap button!
