Univariate Linear Regression in PySpark
Building machine learning models for big data has gained a lot of pace in recent years. Along with my other posts in R and Python, I would like to demonstrate univariate linear regression in PySpark.
Introduction
Linear regression is a technique for identifying the linear relationship between independent variables and a dependent variable. In linear regression the dependent variable is always continuous.
Linear regression with one independent variable is called univariate linear regression.
The equation for univariate linear regression is
y = β₀ + β₁x + ε
where β₀ is the intercept, β₁ is the coefficient (slope) of the independent variable x, and ε is the error term.
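As a quick aside, the ordinary least-squares estimates for the intercept and slope of a single predictor can be computed in plain Python from the closed-form formulas. This is a minimal sketch, independent of Spark, just to make the math concrete:

```python
# Minimal sketch: closed-form ordinary least-squares fit for one predictor.
def fit_univariate_ols(x, y):
    n = len(x)
    mean_x = sum(x) / n
    mean_y = sum(y) / n
    # slope = covariance(x, y) / variance(x)
    slope = sum((xi - mean_x) * (yi - mean_y) for xi, yi in zip(x, y)) \
            / sum((xi - mean_x) ** 2 for xi in x)
    intercept = mean_y - slope * mean_x
    return intercept, slope

# Toy data lying exactly on y = 1 + 2x
b0, b1 = fit_univariate_ols([1, 2, 3, 4], [3, 5, 7, 9])
print(b0, b1)  # 1.0 2.0
```

Spark MLlib does the equivalent estimation for us at scale, which is what the rest of this post walks through.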
Loading Libraries
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
import matplotlib.pyplot as plt
Create Spark Session
spark = SparkSession.builder.appName('Univariate LR').getOrCreate()
spark
Loading Data
file_path = 'input/ex1data1.txt'
data = spark.read.csv(file_path, inferSchema=True, header=False).toDF('population','profit')
data.printSchema()

root
 |-- population: double (nullable = true)
 |-- profit: double (nullable = true)

data.show(5)

+----------+------+
|population|profit|
+----------+------+
| 6.1101|17.592|
| 5.5277|9.1302|
| 8.5186|13.662|
| 7.0032|11.854|
| 5.8598|6.8233|
+----------+------+
only showing top 5 rows

data.describe().show()

+-------+-----------------+-----------------+
|summary| population| profit|
+-------+-----------------+-----------------+
| count| 97| 97|
| mean|8.159800000000002| 5.83913505154639|
| stddev|3.869883527882331|5.510262255231546|
| min| 5.0269| -2.6807|
| max| 22.203| 24.147|
+-------+-----------------+-----------------+
Feature Transformation
In Spark MLlib, input variables (features) are represented as a single vector column, which by default is named 'features'. The code below uses VectorAssembler to assemble the input variables into a vector (in this case, the single predictor 'population').
assembler = VectorAssembler(inputCols=['population'], outputCol='features')
df = assembler.transform(data)
df.printSchema()

root
 |-- population: double (nullable = true)
 |-- profit: double (nullable = true)
 |-- features: vector (nullable = true)

df = df.select(['features', 'profit'])
Model
lr = LinearRegression(maxIter=10, labelCol='profit')
lrModel = lr.fit(df)
Intercept and Coefficients
print(f'Intercept: {lrModel.intercept}\nCoefficient: {lrModel.coefficients.values}')

Intercept: -3.895780878311882
Coefficient: [1.19303364]
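As a quick sanity check (plain Python, using the intercept and coefficient printed above), a manual prediction for the first row of the dataset reproduces the model's output:

```python
# Reproduce the model's prediction for the first row by hand:
# prediction = intercept + coefficient * population
intercept = -3.895780878311882
coefficient = 1.19303364
population = 6.1101  # first row of the dataset

prediction = intercept + coefficient * population
print(prediction)  # ≈ 3.3938, matching the predictions table shown later
```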
Model Evaluation Metrics
Spark MLlib provides all the standard evaluation metrics for each model type; they can be accessed as attributes of the model summary, as shown below.
modelsummary = lrModel.summary

print(f'Explained Variance: {modelsummary.explainedVariance}\nR Squared: {modelsummary.r2}')
print(f'Std. Error: {modelsummary.coefficientStandardErrors}\nRoot Mean Squared Err: {modelsummary.rootMeanSquaredError}')
print(f'Mean Absolute Err: {modelsummary.meanAbsoluteError}\nMean Squared Err: {modelsummary.meanSquaredError}')
print(f'P-value: {modelsummary.pValues}')
modelsummary.residuals.show(5)
print(f'Num Iterations: {modelsummary.totalIterations}\nObjective History: {modelsummary.objectiveHistory}')
Explained Variance: 21.0960268527634
R Squared: 0.70203155378414
Std. Error: [0.07974394383258744, 0.7194828241406706]
Root Mean Squared Err: 2.9923139460876023
Mean Absolute Err: 2.1942453988270034
Mean Squared Err: 8.953942751950358
P-value: [0.0, 4.607886650020987e-07]
+------------------+
| residuals|
+------------------+
|14.198226008949028|
| 6.43124880332505|
| 7.394804476918388|
|7.3947276613233015|
|3.7281423300896854|
+------------------+
only showing top 5 rows
Num Iterations: 1
Objective History: [0.0]
Objective history records the value of the objective (loss) function at each iteration, starting from the initial point. Here the model converged in a single iteration: with only one feature and no regularization, Spark's default 'auto' solver falls back to the closed-form normal-equations approach rather than iterative optimization.
Predictions
The model summary exposes a DataFrame called predictions, which contains the features (model inputs), the label (target), and the predictions.
modelsummary.predictions.show(5)

+--------+------+------------------+
|features|profit| prediction|
+--------+------+------------------+
|[6.1101]|17.592|3.3937739910509706|
|[5.5277]|9.1302| 2.69895119667495|
|[8.5186]|13.662| 6.267195523081613|
|[7.0032]|11.854| 4.459272338676698|
|[5.8598]|6.8233|3.0951576699103143|
+--------+------+------------------+
only showing top 5 rows
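The matplotlib import at the top of the post can be put to use here to visualize the fit. This is a minimal sketch using the intercept and coefficient obtained above, with the five sample rows shown in the tables hard-coded for illustration:

```python
import matplotlib
matplotlib.use('Agg')  # non-interactive backend so the script runs headless
import matplotlib.pyplot as plt

# Five sample rows from the dataset (population, profit) and the fitted line
population = [6.1101, 5.5277, 8.5186, 7.0032, 5.8598]
profit = [17.592, 9.1302, 13.662, 11.854, 6.8233]
intercept, coefficient = -3.895780878311882, 1.19303364

fitted = [intercept + coefficient * x for x in population]

plt.scatter(population, profit, label='data')
# The line is monotonic in x, so sorting both keeps points paired correctly
plt.plot(sorted(population), sorted(fitted), color='red', label='fitted line')
plt.xlabel('population')
plt.ylabel('profit')
plt.legend()
plt.savefig('univariate_lr_fit.png')
```

In practice, you would collect the full predictions DataFrame with `modelsummary.predictions.toPandas()` and plot that instead of a handful of hard-coded rows.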
Conclusion
The intercept and coefficient we obtained with PySpark match those from my other posts in R and Python. As a data scientist, it is valuable to know how to achieve the same results in multiple languages, and with the recent increase in demand for Spark in big data machine learning, I encourage you to try this out on your own variant of Spark.