Data Preprocessing Using PySpark (Part 1)
Apache Spark is a framework that allows for quick data processing on large amounts of data.
Data preprocessing is a necessary step in machine learning, as the quality of the data affects the results and performance of the model we apply to it. Therefore, it is extremely important to preprocess our data before feeding it into a model.
Data preprocessing can be carried out with pandas as well as PySpark; here we are going to explore PySpark.
Before we dive deep into the preprocessing part, let’s first get a brief overview of data exploration.
Data exploration
Data exploration, also known as exploratory data analysis (EDA), is the process of exploring and visualizing data to find patterns and uncover insights early on. It helps in identifying problems in the DataFrame, as well as in deciding which model or algorithm to use in subsequent steps.
Let’s quickly discuss some data exploration techniques.
- Statistical summary of data
PySpark’s describe() is used to view basic statistical details like count, mean, and stddev of a DataFrame’s numeric columns.
data = [[1,None,"vignan",95.0],
        [None,"ojaswi","vvit",55.0],
        [3,None,"vvit",65.0],
        [None,"sridevi","vignan",None],
        [None,"Alrich","nonvignan",56.0],
        [1,None,None,55.0],
        [5,"gnanesh","iit",1.0],
        [1,None,"vignan",95.0],
        [6,None,"vignan",22.0]]
columns = ['student_ID','student_name', 'college','marks']
dataframe = spark.createDataFrame(data,columns)
dataframe.describe().show()

O/P:
+-------+------------------+------------+-------+------------------+
|summary|        student_ID|student_name|college|             marks|
+-------+------------------+------------+-------+------------------+
|  count|                 6|           4|      8|                 8|
|   mean|2.8333333333333335|        null|   null|              55.5|
| stddev| 2.228601953392904|        null|   null|32.302144997330615|
|    min|                 1|      Alrich|    iit|               1.0|
|    max|                 6|     sridevi|   vvit|              95.0|
+-------+------------------+------------+-------+------------------+
- Count total number of null values
Let’s find and visualize the total number of null values present in each of the DataFrame’s columns.
import matplotlib.pyplot as plt

null_value_list = list()
for col_ in dataframe.columns:
    null_value_list.append(dataframe.filter(dataframe[col_].isNull()).count())

plt.rcParams["figure.figsize"] = (40,9)
columns = [col_ for col_ in dataframe.columns]
myexplode = [0.2, 0, 0, 0]
plt.pie(null_value_list, labels = columns, explode = myexplode, shadow = True, autopct='%1.0f%%')
plt.title('Total number of null values per column')
plt.show()

O/P: (pie chart showing the share of null values in each column)
As we can see, the “student_ID” column has 3 null values, “student_name” has 5, and so on.
- Count duplicate rows
Let’s find how many duplicate rows are present in the DataFrame.
import pyspark.sql.functions as funcs

dataframe.groupBy(dataframe.columns).count().where(funcs.col('count') > 1).select(funcs.sum('count')).show()

O/P:
+----------+
|sum(count)|
+----------+
| 2|
+----------+
As we can see, there are two duplicate rows in the DataFrame.
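The same count can be sanity-checked in plain Python, with no Spark session, by hashing each row as a tuple; this is just a sketch of the idea, using the same rows as the DataFrame in this post:

```python
from collections import Counter

# Same rows as the PySpark DataFrame built earlier in this post
data = [[1, None, "vignan", 95.0],
        [None, "ojaswi", "vvit", 55.0],
        [3, None, "vvit", 65.0],
        [None, "sridevi", "vignan", None],
        [None, "Alrich", "nonvignan", 56.0],
        [1, None, None, 55.0],
        [5, "gnanesh", "iit", 1.0],
        [1, None, "vignan", 95.0],
        [6, None, "vignan", 22.0]]

# Count identical rows; any row tuple with count > 1 is duplicated
counts = Counter(tuple(row) for row in data)
duplicate_total = sum(c for c in counts.values() if c > 1)
print(duplicate_total)  # 2, matching sum(count) above
```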
- Find numeric and categorical column
Let’s find the numeric (dtype != string) and categorical (dtype == string) columns in the DataFrame.
numeric_columns = list()
categorical_column = list()

for col_ in dataframe.columns:
    if dataframe.select(col_).dtypes[0][1] != "string":
        numeric_columns.append(col_)
    else:
        categorical_column.append(col_)

print("Numeric columns",numeric_columns)
print("categorical columns",categorical_column)

O/P:
Numeric columns ['student_ID', 'marks']
categorical columns ['student_name', 'college']
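An alternative sketch (not from the original code): DataFrame.dtypes already returns a list of (name, type) pairs, so the split can be done in one pass without a select() per column. The hard-coded dtypes list below stands in for what dataframe.dtypes would return for this DataFrame:

```python
# Stand-in for dataframe.dtypes on the DataFrame above (hard-coded here)
dtypes = [('student_ID', 'bigint'), ('student_name', 'string'),
          ('college', 'string'), ('marks', 'double')]

numeric_columns = [name for name, dtype in dtypes if dtype != 'string']
categorical_columns = [name for name, dtype in dtypes if dtype == 'string']

print(numeric_columns)      # ['student_ID', 'marks']
print(categorical_columns)  # ['student_name', 'college']
```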
Now that you have better clarity about data exploration, we are going to explore data preprocessing.
So here we go🚩,
The data preprocessing techniques we are going to discuss:
- Drop null value
- Handle missing value with imputation
- Outlier detection, removal and imputation
- Drop feature/column
- Drop duplicates
- Convert categorical feature/column into numerical
- Feature scaling
- Train test split data
Here we’ll discuss the first three techniques in depth, with implementation. The rest we’ll discuss in the second part of this blog.
Drop null value
Let’s first create a PySpark DataFrame. After creating the DataFrame, we’ll perform this operation.
# Create pyspark dataframe
# Create pyspark dataframe
data = [[1,None,"vignan",95.0],
        [None,"ojaswi","vvit",55.0],
        [3,None,"vvit",65.0],
        [None,"sridevi","vignan",None],
        [None,"Alrich","nonvignan",56.0],
        [1,None,None,55.0],
        [5,"gnanesh","iit",1.0],
        [1,None,"vignan",95.0],
        [6,None,"vignan",22.0]]

# specify column names
columns = ['student_ID','student_name', 'college','marks']

# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data,columns)

# show dataframe
dataframe.show()

O/P:
+----------+------------+---------+-----+
|student_ID|student_name| college|marks|
+----------+------------+---------+-----+
| 1| null| vignan| 95.0|
| null| ojaswi| vvit| 55.0|
| 3| null| vvit| 65.0|
| null| sridevi| vignan| null|
| null| Alrich|nonvignan| 56.0|
| 1| null| null| 55.0|
| 5| gnanesh| iit| 1.0|
| 1| null| vignan| 95.0|
| 6| null| vignan| 22.0|
+----------+------------+---------+-----+
Boom!.. Our DataFrame is created. Now let’s first check whether there are any missing values present in this DataFrame.

from pyspark.sql.functions import *

dataframe.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dataframe.columns]).show()

O/P:
+----------+------------+-------+-----+
|student_ID|student_name|college|marks|
+----------+------------+-------+-----+
| 3| 5| 1| 1|
+----------+------------+-------+-----+
As we can see, “student_ID”, “student_name”, “college”, and “marks” have 3, 5, 1, and 1 missing values respectively.
Now, we’ll drop those missing values. Spark provides a dropna() function, which is used to drop rows with null values in one or multiple (any/all) columns of a DataFrame. While reading data from files, Spark APIs like DataFrame and Dataset assign NULL values to empty values in columns.
Sometimes, depending on your needs, you may have to remove these rows with null values as part of data cleaning.
dataframe = dataframe.dropna()
dataframe.show()

O/P:
+----------+------------+-------+-----+
|student_ID|student_name|college|marks|
+----------+------------+-------+-----+
| 5| gnanesh| iit| 1.0|
+----------+------------+-------+-----+
The dropna() function takes some parameters; you can explore them in the PySpark documentation.
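To see how dropna()’s how, thresh, and subset parameters interact, here is a plain-Python sketch of the semantics on a list of dicts (an illustration of the behavior, not Spark’s actual implementation):

```python
def dropna_rows(rows, how="any", thresh=None, subset=None):
    """Mimics DataFrame.dropna() semantics on a list of dicts."""
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        non_null = sum(row[c] is not None for c in cols)
        if thresh is not None:      # thresh overrides how: keep rows with >= thresh non-nulls
            keep = non_null >= thresh
        elif how == "any":          # drop a row if ANY considered column is null
            keep = non_null == len(cols)
        else:                       # how == "all": drop only if ALL considered columns are null
            keep = non_null > 0
        if keep:
            kept.append(row)
    return kept

rows = [{"id": 1, "marks": 95.0},
        {"id": None, "marks": 55.0},
        {"id": None, "marks": None}]
print(len(dropna_rows(rows)))                    # 1 (default how="any")
print(len(dropna_rows(rows, how="all")))         # 2
print(len(dropna_rows(rows, thresh=1)))          # 2
print(len(dropna_rows(rows, subset=["marks"])))  # 2
```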
Handle missing value with imputation
1. For numeric columns
- Mean imputation
- Median imputation
- Mode imputation
2. For categorical columns
- Frequent category imputation
Mean imputation
Mean imputation is carried out on numeric features/columns. In this imputation, missing values are replaced by the mean of that particular feature/column.
dataframe.printSchema()

O/P:
root
 |-- student_ID: long (nullable = true)
 |-- student_name: string (nullable = true)
 |-- college: string (nullable = true)
 |-- marks: double (nullable = true)
As we can see, “student_ID” and “marks” are both numeric columns, so we’ll perform mean imputation on these columns.
from pyspark.ml.feature import Imputer

column_subset = [col_ for col_ in dataframe.columns if dataframe.select(col_).dtypes[0][1] != "string"]

imputer = Imputer(inputCols=column_subset,
                  outputCols=[col_ for col_ in column_subset]
                  ).setStrategy("mean")

dataframe = imputer.fit(dataframe).transform(dataframe)
dataframe.show()

O/P:
+----------+------------+---------+-----+
|student_ID|student_name| college|marks|
+----------+------------+---------+-----+
| 1| null| vignan| 95.0|
| 2| ojaswi| vvit| 55.0|
| 3| null| vvit| 65.0|
| 2| sridevi| vignan| 55.5|
| 2| Alrich|nonvignan| 56.0|
| 1| null| null| 55.0|
| 5| gnanesh| iit| 1.0|
| 1| null| vignan| 95.0|
| 6| null| vignan| 22.0|
+----------+------------+---------+-----+
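To see where imputed values like the 55.5 in the marks column come from, here is a quick hand check in plain Python, mirroring the Imputer’s mean strategy (the list is the marks column from the DataFrame above):

```python
# marks column from the DataFrame above, including its one null
marks = [95.0, 55.0, 65.0, None, 56.0, 55.0, 1.0, 95.0, 22.0]

# Mean is computed over non-null values only
non_null = [m for m in marks if m is not None]
mean_val = sum(non_null) / len(non_null)
print(mean_val)  # 55.5

# Replace each null with the column mean
imputed = [m if m is not None else mean_val for m in marks]
```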
Median imputation
Median imputation is carried out on numeric features/columns. In this imputation, missing values are replaced by the median of that particular feature/column.
from pyspark.ml.feature import Imputer

column_subset = [col_ for col_ in dataframe.columns if dataframe.select(col_).dtypes[0][1] != "string"]

imputer = Imputer(inputCols=column_subset,
                  outputCols=[col_ for col_ in column_subset]
                  ).setStrategy("median")

dataframe = imputer.fit(dataframe).transform(dataframe)
dataframe.show()

O/P:
+----------+------------+---------+-----+
|student_ID|student_name| college|marks|
+----------+------------+---------+-----+
| 1| null| vignan| 95.0|
| 2| ojaswi| vvit| 55.0|
| 3| null| vvit| 65.0|
| 2| sridevi| vignan| 55.5|
| 2| Alrich|nonvignan| 56.0|
| 1| null| null| 55.0|
| 5| gnanesh| iit| 1.0|
| 1| null| vignan| 95.0|
| 6| null| vignan| 22.0|
+----------+------------+---------+-----+
Mode imputation
Mode imputation is carried out on numeric features/columns. In this imputation, missing values are replaced by the mode of that particular feature/column.

column_subset = [col_ for col_ in dataframe.columns if dataframe.select(col_).dtypes[0][1] != "string"]

for col_ in column_subset:
    temp_col = dataframe.groupBy(col_).count()
    temp_col = temp_col.dropna(subset=col_)
    mode = temp_col.orderBy(temp_col['count'].desc()).collect()[0][0]
    dataframe = dataframe.fillna(mode, subset=col_)

dataframe.show()

O/P:
+----------+------------+---------+-----+
|student_ID|student_name| college|marks|
+----------+------------+---------+-----+
| 1| null| vignan| 95.0|
| 1| ojaswi| vvit| 55.0|
| 3| null| vvit| 65.0|
| 1| sridevi| vignan| 95.0|
| 1| Alrich|nonvignan| 56.0|
| 1| null| null| 55.0|
| 5| gnanesh| iit| 1.0|
| 1| null| vignan| 95.0|
| 6| null| vignan| 22.0|
+----------+------------+---------+-----+
Frequent category imputation
Now we have seen all the imputation methods for numeric columns. What if a DataFrame column is categorical?
For that we’ll use the frequent category imputation technique. In this technique, we impute missing values with the most frequently occurring category.

column_subset = [col_ for col_ in dataframe.columns if dataframe.select(col_).dtypes[0][1] == "string"]

for col_ in column_subset:
    temp_col = dataframe.groupBy(col_).count()
    temp_col = temp_col.dropna(subset=col_)
    frequent_category = temp_col.orderBy(
        temp_col['count'].desc()).collect()[0][0]
    dataframe = dataframe.fillna(frequent_category, subset=col_)

dataframe.show()

O/P:
+----------+------------+---------+-----+
|student_ID|student_name| college|marks|
+----------+------------+---------+-----+
| 1| ojaswi| vignan| 95.0|
| null| ojaswi| vvit| 55.0|
| 3| ojaswi| vvit| 65.0|
| null| sridevi| vignan| null|
| null| Alrich|nonvignan| 56.0|
| 1| ojaswi| vignan| 55.0|
| 5| gnanesh| iit| 1.0|
| 1| ojaswi| vignan| 95.0|
| 6| ojaswi| vignan| 22.0|
+----------+------------+---------+-----+
Let’s break down the above code for a better understanding.

column_subset = [col_ for col_ in dataframe.columns if dataframe.select(col_).dtypes[0][1] == "string"]

for col_ in column_subset:
    temp_col = dataframe.groupBy(col_).count()
    temp_col = temp_col.dropna(subset=col_)
    temp_col.orderBy(temp_col['count'].desc()).show()

O/P:
+------------+-----+
|student_name|count|
+------------+-----+
| ojaswi| 6|
| Alrich| 1|
| sridevi| 1|
| gnanesh| 1|
+------------+-----+

+---------+-----+
| college|count|
+---------+-----+
| vignan| 5|
| iit| 1|
| vvit| 2|
|nonvignan| 1|
+---------+-----+
As we can see, the above code gives us the number of times each category occurs. So now we’ll call collect() on the ordered result to pick out the most frequent item, as follows.
frequent_category = temp_col.orderBy(
temp_col['count'].desc()).collect()[0][0]
print(frequent_category)

O/P:
ojaswi
vignan
And boom!! Here we have the most frequently occurring items. Now we just impute them in place of the missing values.
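The same “most frequent category” logic can be sketched in plain Python with collections.Counter; the college list below approximates the column from this post’s DataFrame:

```python
from collections import Counter

# college column with one null, approximating the DataFrame above
college = ["vignan", "vvit", "vvit", "vignan", "nonvignan",
           None, "iit", "vignan", "vignan"]

# Ignore nulls when counting, then take the top category
non_null = [v for v in college if v is not None]
frequent = Counter(non_null).most_common(1)[0][0]
print(frequent)  # 'vignan'

# Impute nulls with the most frequent category
imputed = [v if v is not None else frequent for v in college]
```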
Outlier detection, removal and imputation
What is outlier?
In layman’s terms, outliers are data points that differ significantly from the other observations. Or we can say an outlier is “the one of these things that is not like the others”.😉
But wait!✋🤚
Are outliers really dangerous?🤔
Outliers are not always bad for our problem statement. In fact, outliers can sometimes be helpful indicators.
For example, consider credit/debit card fraud detection: the card industry has used the concept of outliers to detect fraud in the past and still does today.
Okay, now that you have the idea about outliers, your next question is how to detect them, right? Let’s dive deep into it.
Outlier detection
There are many methods for detecting outlier but here we’ll mainly focus on following two methods.
- Using Z-score
- Using IQR
Using Z-score
Z-score is a parametric outlier detection method for a one- or low-dimensional feature space.
This technique assumes a normal (Gaussian) distribution of the data. The outliers are the data points in the tails of the distribution, and therefore far from the mean.
For a normal (Gaussian) distribution, it is estimated that:
68% of the data points lie within +/- 1 standard deviation of the mean.
95% of the data points lie within +/- 2 standard deviations.
99.7% of the data points lie within +/- 3 standard deviations.
We can find the Z-score using the formula below:
Z-score = (x - mean) / standard deviation
If the magnitude of a data point’s z-score exceeds 3, it indicates that the data point is quite different from the other data points; such a data point can be an outlier.
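Before the PySpark version, this rule can be checked in a few lines of plain Python on the heights used in the next example (statistics.stdev is the sample standard deviation, like Spark’s stddev):

```python
from statistics import mean, stdev

# Heights from the DataFrame built in the next example
heights = [5.9, 5.2, 5.1, 5.5, 4.9, 5.4, 6.2, 6.5, 7.1,
           14.5, 6.1, 5.6, 1.2, 5.5]

mu, sigma = mean(heights), stdev(heights)
z_scores = [(x - mu) / sigma for x in heights]

# Flag points whose z-score magnitude exceeds 3
outliers = [x for x, z in zip(heights, z_scores) if abs(z) > 3]
print(outliers)  # [14.5]
```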
Let’s find an outlier.
To do that, let’s first create a PySpark DataFrame that contains an outlier.
data = [["Patty O’Furniture",5.9],
["Paddy O’Furniture",5.2],
["Olive Yew",5.1],
["Aida Bugg",5.5],
["Maureen Biologist",4.9],
["Teri Dacty",5.4],
["Peg Legge",6.2],
["Allie Grate",6.5],
["Liz Erd",7.1],
["A. Mused",14.5],
["Constance Noring",6.1],
["Lois Di Nominator",5.6],
["Minnie Van Ryder",1.2],
["Lynn O’Leeum",5.5]]

columns = ['student_name','height']
dataframe = spark.createDataFrame(data,columns)
dataframe.show()

O/P:
+-----------------+------+
| student_name|height|
+-----------------+------+
|Patty O’Furniture| 5.9|
|Paddy O’Furniture| 5.2|
| Olive Yew| 5.1|
| Aida Bugg| 5.5|
|Maureen Biologist| 4.9|
| Teri Dacty| 5.4|
| Peg Legge| 6.2|
| Allie Grate| 6.5|
| Liz Erd| 7.1|
| A. Mused| 14.5|
| Constance Noring| 6.1|
|Lois Di Nominator| 5.6|
| Minnie Van Ryder| 1.2|
| Lynn O’Leeum| 5.5|
+-----------------+------+
Now let’s detect the outlier in the above DataFrame using the Z-score method.
from pyspark.sql.functions import *

column_subset = dataframe.columns
for col_ in column_subset:
    if dataframe.select(col_).dtypes[0][1] == "string":
        pass
    else:
        col_mean = dataframe.select(mean(col_)).collect()[0][0]
        col_stddev = dataframe.select(stddev(col_)).collect()[0][0]
        upper_limit = col_mean + (3*col_stddev)
        lower_limit = col_mean - (3*col_stddev)
        dataframe = dataframe.filter((dataframe[col_]<lower_limit) | (dataframe[col_]>upper_limit))

dataframe.show()

O/P:
+------------+------+
|student_name|height|
+------------+------+
| A. Mused| 14.5|
+------------+------+

Note: Outliers can be found in numerical columns only.
As we can see, student_name = A. Mused with height 14.5 is an outlier.
And boom!⚡ You now have a clear idea of how to find an outlier. Next, let’s dive deep into the outlier removal and outlier imputation techniques.
Z-score Outlier removal
from pyspark.sql.functions import *

column_subset = dataframe.columns
for col_ in column_subset:
    if dataframe.select(col_).dtypes[0][1] == "string":
        pass
    else:
        col_mean = dataframe.select(mean(col_)).collect()[0][0]
        col_stddev = dataframe.select(stddev(col_)).collect()[0][0]
        upper_limit = col_mean + (3*col_stddev)
        lower_limit = col_mean - (3*col_stddev)
        dataframe = dataframe.filter((dataframe[col_]>lower_limit) & (dataframe[col_]<upper_limit))

dataframe.show()

O/P:
+-----------------+------+
| student_name|height|
+-----------------+------+
|Patty O’Furniture| 5.9|
|Paddy O’Furniture| 5.2|
| Olive Yew| 5.1|
| Aida Bugg| 5.5|
|Maureen Biologist| 4.9|
| Teri Dacty| 5.4|
| Peg Legge| 6.2|
| Allie Grate| 6.5|
| Liz Erd| 7.1|
| Constance Noring| 6.1|
|Lois Di Nominator| 5.6|
| Minnie Van Ryder| 1.2|
| Lynn O’Leeum| 5.5|
+-----------------+------+
As we can see, student_name = A. Mused with height 14.5 (the outlier) has been removed.
Z-score Outlier imputation
We can impute an outlier with the mean or median of the particular feature/column.
Here we are going to see mean imputation.
from pyspark.sql import Window
from pyspark.sql.functions import *

column_subset = dataframe.columns
for col_ in column_subset:
    if dataframe.select(col_).dtypes[0][1] == "string":
        pass
    else:
        col_mean = dataframe.select(mean(col_)).collect()[0][0]
        col_stddev = dataframe.select(stddev(col_)).collect()[0][0]
        upper_limit = col_mean + (3*col_stddev)
        lower_limit = col_mean - (3*col_stddev)
        dataframe = dataframe.withColumn(col_, when((dataframe[col_]<lower_limit) | (dataframe[col_]>upper_limit),
                        round(mean(col_).over(Window.orderBy(lit(1)))).cast('int')).otherwise(dataframe[col_]))

dataframe.show()

O/P:
+-----------------+------+
| student_name|height|
+-----------------+------+
|Patty O’Furniture| 5.9|
|Paddy O’Furniture| 5.2|
| Olive Yew| 5.1|
| Aida Bugg| 5.5|
|Maureen Biologist| 4.9|
| Teri Dacty| 5.4|
| Peg Legge| 6.2|
| Allie Grate| 6.5|
| Liz Erd| 7.1|
| A. Mused| 6.0|
| Constance Noring| 6.1|
|Lois Di Nominator| 5.6|
| Minnie Van Ryder| 1.2|
| Lynn O’Leeum| 5.5|
+-----------------+------+
Look, we successfully imputed the outlier with the mean value of that feature/column: “| A. Mused| 6.0|”.
In the same way you can impute an outlier with the median.
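For the median variant mentioned above, here is one plain-Python sketch on the same heights (a design choice in this sketch: the median is computed from the values inside the 3-sigma band only):

```python
from statistics import mean, stdev, median

heights = [5.9, 5.2, 5.1, 5.5, 4.9, 5.4, 6.2, 6.5, 7.1,
           14.5, 6.1, 5.6, 1.2, 5.5]

mu, sigma = mean(heights), stdev(heights)
lo, hi = mu - 3 * sigma, mu + 3 * sigma

# Median of the values inside the 3-sigma band
med = median([x for x in heights if lo < x < hi])

# Replace out-of-band values with that median
imputed = [x if lo < x < hi else med for x in heights]
print(imputed[9])  # the 14.5 outlier has been replaced by 5.5
```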
That is all about the Z-score outlier detection method. Now let’s look at the IQR method.
Using IQR
IQR (interquartile range) is a measure of statistical dispersion, which is the spread of the data. The IQR may also be called the midspread, middle 50%, or H‑spread. It is defined as the difference between the 75th and 25th percentiles of the data.
or
The IQR (interquartile range) defines the difference between the third and the first quartile. Quartiles are the partitioned values that divide the whole series into 4 equal parts. So, there are 3 quartiles. First Quartile is denoted by Q1 known as the lower quartile, the second Quartile is denoted by Q2 and the third Quartile is denoted by Q3 known as the upper quartile. Therefore, the interquartile range is equal to the upper quartile minus lower quartile.
So the formula for IQR is:
IQR = Q3 - Q1
To detect outliers using this method, we define a new range, let’s call it the decision range; any data point lying outside this range is considered an outlier and is dealt with accordingly. The range is given below:
Lower Bound: (Q1 - 1.5 * IQR)
Upper Bound: (Q3 + 1.5 * IQR)
Any data point less than the lower bound or more than the upper bound is considered an outlier.
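These bounds can be checked quickly in plain Python on the heights from the DataFrame above. Note that statistics.quantiles with method='inclusive' may compute slightly different quartiles than Spark’s approxQuantile, but the flagged points come out the same here:

```python
from statistics import quantiles

heights = [5.9, 5.2, 5.1, 5.5, 4.9, 5.4, 6.2, 6.5, 7.1,
           14.5, 6.1, 5.6, 1.2, 5.5]

# quantiles(n=4) returns the three quartile cut points [Q1, Q2, Q3]
q1, _, q3 = quantiles(heights, n=4, method='inclusive')
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

outliers = [x for x in heights if x < lower_bound or x > upper_bound]
print(outliers)  # [14.5, 1.2]
```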
Let’s find the outliers in the above DataFrame.
for col_ in dataframe.columns:
    if dataframe.select(col_).dtypes[0][1] == "string":
        pass
    else:
        q1, q3 = dataframe.approxQuantile(col_, [0.25, 0.75], 0)
        IQR = q3 - q1
        lower_bound = q1 - (1.5*IQR)
        upper_bound = q3 + (1.5*IQR)
        dataframe = dataframe.filter((dataframe[col_]<lower_bound) | (dataframe[col_]>upper_bound))

dataframe.show()

O/P:
+----------------+------+
| student_name|height|
+----------------+------+
| A. Mused| 14.5|
|Minnie Van Ryder| 1.2|
+----------------+------+

Note: Outliers can be found in numerical columns only.
As we can see, A. Mused and Minnie Van Ryder are both outliers.
Now let’s dive deep into the IQR outlier removal and imputation techniques.
IQR Outlier removal
for col_ in dataframe.columns:
    if dataframe.select(col_).dtypes[0][1] == "string":
        pass
    else:
        q1, q3 = dataframe.approxQuantile(col_, [0.25, 0.75], 0)
        IQR = q3 - q1
        lower_bound = q1 - (1.5*IQR)
        upper_bound = q3 + (1.5*IQR)
        dataframe = dataframe.filter((dataframe[col_]>lower_bound) & (dataframe[col_]<upper_bound))

dataframe.show()

O/P:
+-----------------+------+
| student_name|height|
+-----------------+------+
|Patty O’Furniture| 5.9|
|Paddy O’Furniture| 5.2|
| Olive Yew| 5.1|
| Aida Bugg| 5.5|
|Maureen Biologist| 4.9|
| Teri Dacty| 5.4|
| Peg Legge| 6.2|
| Allie Grate| 6.5|
| Liz Erd| 7.1|
| Constance Noring| 6.1|
|Lois Di Nominator| 5.6|
| Lynn O’Leeum| 5.5|
+-----------------+------+
As we can see, student_name = A. Mused and Minnie Van Ryder (the outliers) have been removed.
IQR Outlier imputation
We can impute an outlier with the mean or median of the particular feature/column.
Here we are going to see mean imputation.
from pyspark.sql import Window
from pyspark.sql.functions import *

for col_ in dataframe.columns:
    if dataframe.select(col_).dtypes[0][1] == "string":
        pass
    else:
        q1, q3 = dataframe.approxQuantile(col_, [0.25, 0.75], 0)
        IQR = q3 - q1
        lower_bound = q1 - (1.5*IQR)
        upper_bound = q3 + (1.5*IQR)
        dataframe = dataframe.withColumn(col_, when((dataframe[col_]<lower_bound) | (dataframe[col_]>upper_bound),
                        round(mean(col_).over(Window.orderBy(lit(1))))).otherwise(dataframe[col_]))

dataframe.show()

O/P:
+-----------------+------+
| student_name|height|
+-----------------+------+
|Patty O’Furniture| 5.9|
|Paddy O’Furniture| 5.2|
| Olive Yew| 5.1|
| Aida Bugg| 5.5|
|Maureen Biologist| 4.9|
| Teri Dacty| 5.4|
| Peg Legge| 6.2|
| Allie Grate| 6.5|
| Liz Erd| 7.1|
| A. Mused| 6.0|
| Constance Noring| 6.1|
|Lois Di Nominator| 5.6|
| Minnie Van Ryder| 6.0|
| Lynn O’Leeum| 5.5|
+-----------------+------+
Look, we successfully imputed the outliers with the mean value of that feature/column: “| A. Mused| 6.0|” and “| Minnie Van Ryder| 6.0|”.
Conclusion
So, to summarize, we have gone over an overview of PySpark, dropping null values, handling missing values with imputation, and outlier detection, removal, and imputation. We have seen these techniques with implementations in PySpark and Python.
In the second part we’ll discuss dropping features/columns, dropping duplicates, converting categorical features/columns into numerical ones, feature scaling, and train-test splitting of data, so stay tuned😊 and stay healthy(●'◡'●).
Thank you for reading!
Do you have any questions? If yes, please reach out to me on LinkedIn — Vishal Barad. Happy to chat!
Any feedback would be much appreciated, and if you liked what you’ve read, please hit the clap button!