PySpark quickstart for pandas users

Norm Niemer · Published in DataSeries · 2 min read · Oct 19, 2020

The fastest way to get up and running with PySpark!

Motivation

I found the existing quickstarts difficult to follow because they did not start from the beginning. And despite its apparent ubiquity, I did not find the PySpark documentation beginner-friendly.

This guide focuses on a local-mode setup to get up and running quickly for development. If you want a cluster, you will have to set one up separately, e.g. with Databricks.
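
Once PySpark is installed (next section), "local mode" simply means Spark runs inside your Python process using your machine's cores. A minimal sketch of what that looks like when spelled out explicitly (the app name is arbitrary):

from pyspark.sql import SparkSession

# "local[*]" runs Spark in-process on all available cores, no cluster needed
spark = SparkSession.builder.master("local[*]").appName("localDev").getOrCreate()
print(spark.sparkContext.master)  # local[*]
spark.stop()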

Installation

All I had to do was pip install pyspark

In the past, I also had to download Spark separately and use findspark, but that no longer seems to be necessary.
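
A quick way to confirm that the plain pip install is enough (no findspark needed) is to import the package and print its version:

import pyspark
print(pyspark.__version__)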

You might need a JRE: if you execute the code below and get Java not found and JAVA_HOME environment variable is not set, install one.

Additionally, on Windows I had to get winutils (e.g. via gitzip) and set the HADOOP_HOME environment variable.
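
If you do attempt a Windows setup, both environment variables can be set from Python before the first SparkContext is created; a minimal sketch, where the two paths are placeholders for wherever your JDK and winutils actually live:

import os

# placeholder paths, adjust to your own JDK and winutils locations
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-11"
os.environ["HADOOP_HOME"] = r"C:\hadoop"  # expects winutils.exe in C:\hadoop\bin

import pyspark
sc = pyspark.SparkContext(appName="envCheck")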

Additional installation notes are in the official docs: Installation — PySpark 3.2.0 documentation (apache.org).

[I was unable to make this work on Windows; it is best to use Linux.]

Install Check Code

The code below should run without errors.

import pyspark
sc = pyspark.SparkContext(appName="myAppName")
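
If this runs, stop the context before moving on; only one SparkContext can be active per process, and the next section creates its own:

sc.stop()  # only one SparkContext can be active per process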

Pandas Code

Reference code translating common pandas operations to PySpark. I will add more to it over time.

import pyspark
sc = pyspark.SparkContext(appName="myAppName")
# this gets you the spark variable that magically shows up in other guides
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("myAppName").getOrCreate()
# use Arrow to speed up conversions between Spark and pandas dataframes
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

import pyspark.sql.functions as F

import pandas as pd
import numpy as np
np.random.seed(1)
# generate some data
df1 = pd.DataFrame({'a':range(10),'v':np.random.normal(size=(10,)),'date':pd.date_range('2010-01-01',periods=10),'g':1})
df2 = df1.copy()
df2['g']=2
df2['v']=np.random.normal(size=(10,))
df=pd.concat([df1,df2]).reset_index(drop=True)
df['date_str']=df['date'].astype(str)

# create spark df
ddf = spark.createDataFrame(df)
# preview
df.head(2)
ddf.show(2)
ddf[['a','v']].show(2)
ddf.select(['a','v']).show(2)
# view / get pandas df
ddf.show()
ddf.toPandas()

assert ddf.toPandas().equals(df)
# indexing
df[df['a']==1]
ddf.filter("a==1").show()
ddf[ddf['a']==1].show()
ddf[ddf['a'].isin([3,5])].show()
# stats
df['v'].mean()
ddf.select(F.mean('v')).show()
ddf.select(F.avg(F.col('v'))).show()

# str
df['date_str_yr'] = df['date_str'].str[:4]
df['date_chk'] = pd.to_datetime(df['date_str'],format='%Y-%m-%d')

ddf = ddf.withColumn('date_str_yr', ddf['date_str'].substr(1, 4)) # substr is 1-indexed
ddf.show()
# add column
df['v2']=df['v']*2
ddf = ddf.withColumn('v2',ddf['v']*2) # store new column
# groupby
df.groupby(['g']).size()
ddf.groupby('g').count().show()

df.groupby(['g'])['v'].mean()
ddf.groupby('g').agg(F.mean('v')).show()

df.groupby(['g']).head(1)
ddf.groupby(['g']).agg(F.first("v")).show()
# run sql
ddf.createOrReplaceTempView("tablename")
spark.sql("select * from tablename")
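
The SQL result is an ordinary Spark dataframe, so it can be pulled back into pandas like anything else; a minimal sketch (the aggregation query is just an example), plus shutting the session down when you are finished:

# pull a sql result back into pandas, then shut down
res = spark.sql("select g, avg(v) as v_mean from tablename group by g").toPandas()
print(res)
spark.stop()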

Partitions

Write

ddf.write.partitionBy("date_str").mode("overwrite").csv("data/bydate", header=True)

The partition column is not saved inside the data files, only in the directory names (Spark writes Hive-style date_str=<value> folders).
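
You can see this layout directly on disk; a quick check, assuming the write above succeeded:

import glob
# each partition value becomes its own directory, e.g. data/bydate/date_str=2010-01-01
print(sorted(glob.glob("data/bydate/date_str=*")))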

Read

spark.read.option("header",True).csv("data/bydate").toPandas()

This automatically adds the partition column back to the dataframe, inferred from the directory names.
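
A nice side effect of this layout is that filtering on the partition column lets Spark skip the directories it does not need; a minimal sketch, reusing one of the dates generated above:

# read a single partition by filtering on the partition column
spark.read.option("header", True).csv("data/bydate").filter("date_str = '2010-01-05'").show()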
