Comparing Performance between Apache Spark and PySpark

Sahand Faraz Zarrinkoub
5 min read · Sep 6, 2019


Apache Spark is a computing framework widely used for Analytics, Machine Learning and Data Engineering. It is written in the Scala programming language, which is somewhat harder to learn than languages like Java and Python. For those who do not want to go through the trouble of learning Scala, PySpark, a Python API to Apache Spark, can be used instead.

Although the underlying computing engine is the same regardless of whether you use PySpark or Apache Spark directly, it is interesting to compare the performance of the two to see whether there is any significant overhead difference. In this article, we will take an existing piece of PySpark code and re-implement it in Scala using the native Apache Spark API. We will then compare the performance of the two implementations.

The PySpark code we will start with is a web analytics script written for an e-commerce website. It starts with some import statements:

from pyspark.sql.types import StringType
from urllib.parse import urlsplit
from urllib.parse import unquote
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col, create_map, lit, avg, lead, unix_timestamp, countDistinct, date_trunc, hour, date_format
from itertools import chain
from pyspark.sql.functions import sum as summa  # avoid shadowing Python's built-in sum
from pyspark import SparkContext
from pyspark.sql import SQLContext
import time

Then it defines a few utility functions:

def getDictFromFile(file):
    # read "key=value" lines from a mapping file into a dictionary
    with open(file, "r", encoding='utf8') as text_file:
        lines = text_file.readlines()
    mappings = {}
    for line in lines:
        line = line.replace('\n', '')
        split = line.split('=')
        mappings[split[0]] = split[1]
    return mappings

def mapCity(city, state):
    # map a city to its canonical spelling; if the city is unknown but the state has a mapping, use that
    if city in geoMappings:
        return geoMappings[city]
    elif state in geoMappings:
        return geoMappings[state]
    else:
        return city

def mapState(state):
    # map a state to its canonical spelling
    if state in geoMappings:
        return geoMappings[state]
    else:
        return state

def extractPath(host, url):
    # extract the path of an internal referrer URL; external referrers become '-'
    if host in url:
        return urlsplit(url).path
    else:
        return '-'

These utility functions are used to read the mapping files and to define Spark UDFs:

countryMappings = getDictFromFile("mapping-files/countries.txt") 
mapCountryExpr = create_map([lit(x) for x in chain(*countryMappings.items())])
geoMappings = getDictFromFile("mapping-files/cities.txt")
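# For illustration only (hypothetical contents): each mapping file holds one
# "alternative spelling=canonical spelling" pair per line, for example
#   NYC=New York
#   Calif.=California
# which getDictFromFile() turns into {"NYC": "New York", "Calif.": "California"}.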
mapCityUdf = udf(mapCity, StringType())
mapStateUdf = udf(mapState, StringType())
getPathUdf = udf(extractPath, StringType())
urlDecodeUdf = udf(unquote, StringType())

The mapping files are simply text files that map alternative spellings of countries and cities to a single canonical spelling. Then we define a function that does the bulk of the analytics work:

def processFile(filepath):

    start = time.time()
    print(filepath)
    df = sqlContext.read.csv(
        path=filepath,
        header=True,
        inferSchema=True
    )
    end = time.time()
    readFileTime = end - start

    start = time.time()
    # webtrends fields
    webtrendsDateField = 'datetime_utc'
    sessionIdField = 'session_session_id'
    dataCsRefererField = 'data_cs_referer'
    sequenceNumberField = 'sequence_number'

    # intermediate fields for calculations
    nextTimeField = 'nextTime'
    timeOnPageField = 'timeOnPage'
    bounceField = 'bounce'
    entryField = 'entry'
    sessionPagesCountField = 'sessionPageCount'

    # aggs/metric fields
    sessionsAggField = 'sessions'
    pageViewsAggField = 'page views'
    uniquePageViewsField = 'unique page views'
    bouncesAggField = 'bounces'
    timeOnPageAggField = 'time on page'

    # dimension fields
    hostnameField = 'host name'
    sourceField = 'source'
    mediumField = 'medium'
    countryField = 'country'
    pagePathField = 'page path'
    previousPagePathField = 'previous page path'
    dateField = 'date'
    hourField = 'hour'
    pageTitleField = 'page title'
    cityField = 'city'
    stateField = 'state'

    # do mappings
    dfMapped = (
        df
        .withColumnRenamed('data_cs_host', hostnameField)
        .withColumnRenamed('ext_source_name', sourceField)
        .withColumnRenamed('ext_source_type', mediumField)
        .withColumnRenamed('ext_geo_country', countryField)
        .withColumnRenamed('data_cs_uri_stem', pagePathField)
        .withColumnRenamed('data_wt_ti', pageTitleField)
        .withColumnRenamed('ext_geo_city', cityField)
        .withColumnRenamed('ext_geo_region', stateField)
    )

    # split referrer url
    dfRefUrl = dfMapped.withColumn(previousPagePathField, getPathUdf(hostnameField, dataCsRefererField))

    # map country, city and state names
    dfGeoMapped = dfRefUrl.withColumn(countryField, mapCountryExpr.getItem(col(countryField)))
    dfGeoMapped = dfGeoMapped.withColumn(cityField, mapCityUdf(cityField, stateField))
    dfGeoMapped = dfGeoMapped.withColumn(stateField, mapStateUdf(stateField))

    # calculate delta time between events
    window = Window.partitionBy(sessionIdField).orderBy(webtrendsDateField)
    dfTimeDelta = dfGeoMapped.withColumn(nextTimeField, lead(webtrendsDateField).over(window))
    dfTimeDelta = dfTimeDelta.withColumn(timeOnPageField, unix_timestamp(nextTimeField) - unix_timestamp(webtrendsDateField))

    # count pages per session to get bounces
    sessionPagesCounts = dfTimeDelta.groupBy(sessionIdField).count()
    sessionPagesCounts = sessionPagesCounts.withColumnRenamed('count', sessionPagesCountField)
    dfBounce = dfTimeDelta.join(sessionPagesCounts, [sessionIdField])
    dfBounce = dfBounce.withColumn(bounceField, F.when(dfBounce[sessionPagesCountField] == 1, 1).otherwise(0))

    # use sequence number to get entry pages
    dfEntries = dfBounce.withColumn(entryField, F.when(dfBounce[sequenceNumberField] == 0, 1).otherwise(0))

    # convert to dateHour, truncating
    dfHour = dfEntries.withColumn(hourField, hour(col(webtrendsDateField)))
    dfHour = dfHour.withColumn(dateField, date_trunc('day', webtrendsDateField))  # truncate; date gets type timestamp
    # convert date to string to get desired format
    dfHour = dfHour.withColumn(dateField, date_format(col(dateField), "yyyy-MM-dd"))

    # calculate aggregated metrics per hour
    dfGrouped = (
        dfHour.groupBy(
            dateField,
            hourField,
            hostnameField,
            sourceField,
            mediumField,
            countryField,
            stateField,
            cityField,
            pagePathField,
            previousPagePathField,
            pageTitleField)
        .agg(
            F.count(F.lit(1)).alias(pageViewsAggField),  # page views / events count
            countDistinct(sessionIdField).alias(uniquePageViewsField),  # unique page count
            summa(bounceField).alias(bouncesAggField),  # bounces count
            summa(entryField).alias(sessionsAggField),  # entries/sessions count
            avg(col(timeOnPageField)).alias(timeOnPageAggField),  # time on page average
        )
    )

    # fill empty Page Titles, and apply url decoding
    dfGrouped = dfGrouped.na.fill({pageTitleField: 'Empty Title'})
    dfGrouped = dfGrouped.withColumn(pageTitleField, urlDecodeUdf(pageTitleField))

    # fill in with zeros for null values of time on page
    dfGrouped = dfGrouped.na.fill({timeOnPageAggField: 0})

    dfGrouped.filter("country = \"ACountry\"").filter("state = \"AState\"").show()

    end = time.time()
    processingTime = end - start

    return {"readFile": readFileTime, "processing": processingTime}

The time spent reading the data file (a large CSV file on disk) and the PySpark processing time are measured separately and returned, to be printed at the end.

We start the program by creating the Spark context, calling the `processFile()` function and printing its results:

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
times = processFile("resources/datafile.csv")
print("read file time:", times["readFile"])
print("processing time:", times["processing"])

Now let’s implement the same functionality in Apache Spark, using the native Scala API. The full code can be found at https://github.com/Sahand1993/apacheSparkWebAnalytics/blob/master/main.scala.
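To give a flavour of what that looks like, here is a minimal, illustrative sketch of the read and processing steps in Scala. This is not the repository's code: the column names are taken from the PySpark script above, while the object name and the reduced set of grouping columns and metrics are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object WebAnalytics {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("WebAnalytics").getOrCreate()

    // read the CSV file with header and schema inference, as in the PySpark version
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("resources/datafile.csv")

    // delta time between events within a session, mirroring the lead/unix_timestamp step
    val window = Window.partitionBy("session_session_id").orderBy("datetime_utc")
    val dfTimeDelta = df
      .withColumn("nextTime", lead(col("datetime_utc"), 1).over(window))
      .withColumn("timeOnPage", unix_timestamp(col("nextTime")) - unix_timestamp(col("datetime_utc")))

    // hourly aggregation over a reduced, illustrative set of dimensions
    val dfGrouped = dfTimeDelta
      .withColumn("hour", hour(col("datetime_utc")))
      .withColumn("date", date_format(date_trunc("day", col("datetime_utc")), "yyyy-MM-dd"))
      .groupBy("date", "hour", "data_cs_uri_stem")
      .agg(
        count(lit(1)).alias("page views"),
        countDistinct("session_session_id").alias("unique page views"),
        avg(col("timeOnPage")).alias("time on page")
      )

    dfGrouped.show()
    spark.stop()
  }
}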

Note that we never call `processFile()` in a loop in either the PySpark or the Apache Spark version, since Spark caches data and intermediate computations, which would speed up subsequent runs. In fact, running `processFile()` in a loop over the same input file leads to very similar performance between PySpark and Apache Spark. Instead, we take the average of 10 fresh runs, which yields the following results:

[Table: Execution times for the PySpark implementation]
[Table: Execution times for the Apache Spark implementation]

As the tables show, PySpark is slightly faster than Apache Spark at reading the file. For processing the file data, however, Apache Spark is significantly faster: 8.53 seconds against 11.7, a 27% difference. If performance is of critical importance in your project, it could be worthwhile to use Apache Spark even if you are not very familiar with the Scala programming language.
