PySpark quickstart for pandas users
The fastest way to get up and running with PySpark!
Motivation
I found the official quickstarts difficult to understand, and they did not start from the beginning. Despite PySpark's apparent ubiquity, its documentation is not great for a beginner.
This guide focuses on a local-mode setup to get up and running quickly for development. If you want a cluster, you will have to set one up elsewhere, e.g. on Databricks.
Installation
All I had to do was run pip install pyspark.
In the past, I also had to download Spark and use findspark, but that no longer seems to be necessary.
You might also need a JRE: if you run the code below and get Java not found and JAVA_HOME environment variable is not set, install one and set JAVA_HOME.
On Windows, I additionally had to get winutils (e.g. via gitzip) and set the HADOOP_HOME environment variable.
Additional installation notes are in the official guide: Installation — PySpark 3.2.0 documentation (apache.org).
[I was unable to make this work on Windows; best to use Linux.]
Install Check Code
The code below should run without errors.
import pyspark
sc = pyspark.SparkContext(appName="myAppName")
Pandas Code
Reference code translating common pandas operations to PySpark. I will add more over time.
import pyspark
sc = pyspark.SparkContext(appName="myAppName")
from pyspark.sql import SparkSession
# this gets you the spark variable that magically shows up in other guides
spark = SparkSession.builder.appName("myAppName").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
import pyspark.sql.functions as F
import pandas as pd
import numpy as np
np.random.seed(1)

# generate some data
df1 = pd.DataFrame({'a':range(10),'v':np.random.normal(size=(10,)),'date':pd.date_range('2010-01-01',periods=10),'g':1})
df2 = df1.copy()
df2['g']=2
df2['v']=np.random.normal(size=(10,))
df=pd.concat([df1,df2]).reset_index(drop=True)
df['date_str']=df['date'].astype(str)
# create spark df
ddf = spark.createDataFrame(df)
# preview
df.head(2)
ddf.show(2)
ddf[['a','v']].show(2)
ddf.select(['a','v']).show(2)

# view / get pandas df
ddf.show()
ddf.toPandas()
assert ddf.toPandas().equals(df)

# indexing
df[df['a']==1]
ddf.filter("a==1").show()
ddf[ddf['a']==1].show()
ddf[ddf['a'].isin([3,5])].show()

# stats
df['v'].mean()
ddf.select(F.mean('v')).show()
ddf.select(F.avg(F.col('v'))).show()
# str
df['date_str_yr'] = df['date_str'].str[:4]
df['date_chk'] = pd.to_datetime(df['date_str'],format='%Y-%m-%d')
ddf = ddf.withColumn('date_str_yr', ddf['date_str'].substr(1, 4))  # substr is 1-based
ddf.show()
# add column
df['v2'] = df['v']*2
ddf = ddf.withColumn('v2', ddf['v']*2)  # store the new column

# groupby
df.groupby(['g']).size()
ddf.groupby('g').count().show()
df.groupby(['g'])['v'].mean()
ddf.groupby('g').agg(F.mean('v')).show()
df.groupby(['g']).head(1)
ddf.groupby(['g']).agg(F.first("v")).show()

# run sql
ddf.createOrReplaceTempView("tablename")
spark.sql("select * from tablename").show()
Partitions
Write
ddf.write.partitionBy("date_str").mode("overwrite").csv("data/bydate", header=True)
The partition column is not saved inside the files, only in the directory names.
Read
spark.read.option("header",True).csv("data/bydate").toPandas()
This automatically adds the partition column to the dataframe.