Pyspark Fundamentals — Practical

ksshravan
11 min read · May 21, 2024

This article contains notes on Pyspark based on Manish Kumar's YouTube tutorial. The following topics are covered:

  • Reading files in Pyspark
  • Handling corrupted records
  • Writing Files in Pyspark
  • Partitioning and Bucketing
  • Creating a dataframe
  • Transformations (filtering rows, selecting, adding, removing, renaming, casting columns)
  • Union and union all
  • Writing conditional logic
  • Unique rows and sorting rows
  • Aggregating data (count, sum, min, max, avg)
  • Handling Dates
  • Groupby and Window function
  • Joins

Reading Files

  • General structure for reading file into dataframe
spark.read.format().option("key", "value").schema().load()
  • Format refers to the file format or data source, for example csv, json, parquet or jdbc
  • Option is for configuration. Commonly used keys are
    - inferSchema : set to true if you want Pyspark to infer the schema, false otherwise
    - mode : set to failfast, dropmalformed or permissive (default)
    - header : set to true if the data has a header row, false otherwise (_c0, _c1, _c2 will be the column names if set to false, unless we supply our own names using the schema function)
    These are just some of the most important keys; there are many more
  • There are 3 modes in read :
    - failfast : fail execution as soon as a corrupted record is found in the dataset
    - dropmalformed : drops corrupted records
    - permissive (default) : keeps corrupted records, setting the fields that could not be parsed to null (the raw record can be captured in a _corrupt_record column, see the next section)
  • To see schema use
df.printSchema()
  • Schema can be defined using 2 ways
    1. Using struct type and structfield
    2. DDL
# Approach 1 
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("Id", IntegerType(), True), # nullable True means field can have null values
StructField("Name", StringType(), True),
StructField("Salary", IntegerType(), True)
])

# Approach 2
schema = "id integer, name string, salary integer"

Some of the most commonly used types are IntegerType, StringType, DateType.

  • Note that while all of the previous functions (format, option, schema) are optional when reading, the load function is mandatory. It takes the file path

Handling corrupted records

  • Examples of corrupted data include a json file with a missing bracket or a csv file with an additional comma.
In the example, lines 3 and 4 are corrupted: the additional comma in line 3 shifts the values, so address becomes Bangalore and nominee becomes India, and the same happens in line 4
  • To print bad records, we have to add an additional column called _corrupt_record to our schema (this is the default name; it can be changed with the columnNameOfCorruptRecord option)
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
StructField("Id", IntegerType(), True), # nullable True means field can have null values
StructField("Name", StringType(), True),
StructField("Salary", IntegerType(), True),
StructField("_corrupt_record", StringType(), True) # holds the raw text of a malformed row
])

df = spark.read.format("csv")\
.option("header", "true")\
.option("mode", "PERMISSIVE")\
.option("inferSchema", "false")\
.schema(schema)\
.load(filepath)

# View the corrupted records (rows where _corrupt_record is populated)
df.filter(col("_corrupt_record").isNotNull()).show()
  • To store the bad records, add the following line while reading the file (remember that mode must be permissive). Note that badRecordsPath is a Databricks feature and is not available in open-source Spark
# can be any path, files will be stored within folder in json format
.option("badRecordsPath", "/FileStore/tables/bad_records")

Writing Files

  • General structure for writing dataframe
df.write.format().option().mode().partitionBy().bucketBy().save()
  • Option is for configuration. Keys can be
    - path : location to save the output
    - header : set to true to write the column names as the first line (csv)
  • The save mode is set with the mode() method, for example .mode("overwrite"), not with option()
  • There are 4 modes in write
    - append : append the new data to the existing data
    - overwrite : overwrite the existing data with the new data
    - errorIfExists (default) : if data already exists at the target path, an error is thrown; otherwise the data is written
    - ignore : if data already exists at the target path, nothing is written; otherwise the data is written

Partitioning and Bucketing

  • Partitioning here means splitting the written data into multiple folders based on column values (not to be confused with how Spark divides data into in-memory partitions for processing)
  • If there is a column with only a few categories, we can partition on that column. If there is a numerical column, we can bucket it (there are more nuances to when to use partitioning vs bucketing, skipping them for now)
df.write.format("csv")\
.option("header", "true")\
.mode("overwrite")\
.option("path", "/Filestore/tables/partition_by_country_gender")\
.partitionBy("country", "gender")\
.save()
# Order of columns matters in nested partitioning: each country folder contains gender folders

# List the partition folders (Databricks)
# dbutils.fs.ls("dbfs:/Filestore/tables/partition_by_country_gender")
  • Partitioning can make queries faster. For example, for select * from table where country = 'India', Spark reads only the India partition.
  • If the cardinality of a column is high, such as an id column, we do not partition on it, as that creates too many small partitions. As a rule of thumb, the choice between partitionBy and bucketBy comes down to the cardinality of the column:
    - Low cardinality -> partition
    - High cardinality -> bucket
  • bucketBy cannot be used with save(); we have to use saveAsTable(). The table is created in the Hive metastore

Dataframe Transformations

  • Dataframe transformations include
    - Filtering rows
    - Selecting columns
    - Adding/removing/renaming/casting column
  • All transformations can be done in 3 ways
    1. Using DataFrame API
    2. Using Spark SQL
    3. Using RDDs
    RDDs are the lowest-level API: they bypass Spark's query optimizer, are usually slower in Pyspark and are less intuitive, hence they are not used unless absolutely necessary. An advantage of the DataFrame API over Spark SQL is that in a SQL string query you won't know about a syntax error until the query is parsed at runtime (which could be costly), whereas many mistakes in DataFrame code can be caught earlier (at compile time in Scala, or by an editor or linter in Python). An advantage of Spark SQL over the DataFrame API is code reusability, especially when using another tool like dbt.
  • To run a Spark SQL query against a dataframe, we must create a temporary view first
# create temp view
df.createOrReplaceTempView("employee_tbl")

# run sql against the view
spark.sql("""
select name, age, address as emp_address
from employee_tbl
"""
)

Note that the view is lazily evaluated and that you can then use it like a Hive table in Spark SQL. It is not persisted to memory unless you cache the dataset that underpins the view. The lifetime of this temporary view is tied to the SparkSession that was used to create the DataFrame.

For the transformations below, both the DataFrame API (Approach 1) and Spark SQL (Approach 2) will be shown

  • Creating a Dataframe from scratch
my_data = [("Rahul", 25), ("Rohan", 27)]
my_schema = ["name", "age"]

df = spark.createDataFrame(data=my_data, schema=my_schema)
  • Filtering rows using the where or filter function (the two are aliases)
# Approach 1
from pyspark.sql.functions import col

df.where((col("age") > 18) & (col("age") < 25))
df.filter((col("age") > 18) & (col("age") < 25))

# Approach 2
spark.sql("""
select * from employee_tbl
where age > 18 and age < 25
"""
)
  • Selecting columns can be done in multiple ways using select function
# Choosing multiple columns
# Using col is useful when we want to add additional transformations
from pyspark.sql.functions import col, expr

df.select("name","age").show()
df.select(col("name"), col("age")).show()

# 5 different ways of referring to a column
df.select("id", col("name"), df["age"], df.salary, expr("address as emp_address"))

As can be seen above, expr can be used to execute SQL-like expressions

  • Adding/removing/renaming columns
### Approach 1
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StringType

#Adding column
#If the column already exists, withColumn overwrites it
df.withColumn("Surname", lit("Singh"))
df.select("*", lit("Singh").alias("Surname"))

#Removing column
df.drop(col("Surname"))

#Renaming column
df.withColumnRenamed("Surname", "Singh_Surname")

#Casting column
df.withColumn("Surname", col("Surname").cast(StringType()))

### Approach 2
# Adding column
spark.sql("""
select *, "Singh" as Surname
from employee_tbl
"""
)

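# Renaming column (this adds Singh_Surname next to Surname; to drop the old name, list the columns explicitly instead of *)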
spark.sql("""
select *, Surname as Singh_Surname
from employee_tbl
"""
)

# Casting column
spark.sql("""
select *, cast(Surname as String)
from employee_tbl
"""
)

# Use printSchema function to see data type
  • Make sure to use cast whenever required, such as while performing joins or creating a new column. Casting to integer is one of the most common cases: operations like division often produce a floating point result, which we then have to convert to int (see the date section below)

Union and union all

  • In SQL, union removes duplicate records whereas union all keeps them. In the Pyspark DataFrame API, however, union and unionAll behave the same: both keep duplicates. If we want to drop duplicates like SQL's union, we can chain the drop_duplicates function
## Union and keep all duplicates
# Approach 1
employee_df1.union(employee_df2)
employee_df1.unionAll(employee_df2)

# Approach 2
spark.sql("""
select * from employee_tbl1
union all
select * from employee_tbl2
"""
)

## Union and remove all duplicates
# Approach 1
employee_df1.union(employee_df2).drop_duplicates()

# Approach 2
spark.sql("""
select * from employee_tbl1
union
select * from employee_tbl2
"""
)
  • In the real world, the order of columns need not be the same in both dataframes. Spark's union does not check this and combines the dataframes based on column position rather than column name. If we want Spark to match columns by name, we must use unionByName
## Example: union by position mixes up values when the order of columns differs
employees1 = [("Rohan", 24), ("Rohit", 34)]
schema1 = ["name", "age"]
df1 = spark.createDataFrame(data=employees1, schema=schema1)

employees2 = [(43, "Sahil"), (37, "Virat")]
schema2 = ["age", "name"]
df2 = spark.createDataFrame(data=employees2, schema=schema2)

df1.union(df2).show()

df1.unionByName(df2).show()
  • The number of columns must be the same in both dataframes, else we get an error
  • TO CHECK: whether the column types must match, how to achieve unionByName in SQL, whether any union variant allows a different number of columns, and what happens if no column names match in unionByName

Conditional Logic

  • To write conditional logic we use CASE WHEN in SQL and WHEN OTHERWISE in DataFrame API
### Categorize based on age, replace null ages with 25

# Approach 1
from pyspark.sql.functions import col, lit, when

df.withColumn("age", when(col("age").isNull(), lit(25)).otherwise(col("age"))) \
.withColumn("age_category", when((col("age") > 0) & (col("age") < 18), "Minor")
.when(col("age") > 60, "Senior")
.otherwise("Major")
)


# Approach 2
spark.sql("""
select *,
case when age < 18 then "Minor"
when age > 60 then "Senior"
else "Major" end
as age_category
from employee_tbl

"""
)
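  • When using multiple conditions within where or when, wrap each condition in parentheses: & in Python has a higher precedence than comparison operators such as == and <, so the expression has to be parenthesized.
  • When replacing/imputing null values, do not forget the otherwise function; without it, rows that do not match the condition become null
  • Using col, we can check whether a column is null with isNull() or isNotNull(). The same check can also be written as a SQL expression string, for example df.filter("age is null")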

Unique rows and Sorting rows

  • To find unique rows, we use the distinct() function. Two rows are considered the same only if their values match across all columns. If we want distinct values for a subset of columns, we must use select before distinct (the result then contains only the selected columns)
  • To drop duplicates, use the drop_duplicates function (an alias of dropDuplicates). To drop duplicates based on a subset of columns, pass the column names as a list; all columns are kept in the result
# View distinct values of the name column
df.select("name").distinct()

# Keep one row per name (all columns are kept)
df.drop_duplicates(["name"]).show()

Note that dataframes are immutable, so we need to assign the output of transformation to a new variable

  • Sorting can be done as follows
df.sort("sal")

df.sort(col("sal").desc())

Aggregating data

  • Aggregation means collecting items together and summarizing them using a single value
  • Pyspark supports multiple aggregation functions like count, sum, min, max, avg as shown below
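# Note: importing max and min from pyspark shadows the Python built-ins of the same name
from pyspark.sql.functions import col, max, min
from pyspark.sql.types import IntegerType

emp_df.select(max(col("salary")).cast(IntegerType()).alias("maximum_salary"),
min(col("salary")).cast(IntegerType()).alias("minimum_salary")
)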
  • Count acts both as an action as well as a transformation, based on how it is used
    - count() as action : df.count()
    - count() as transformation : df.select(count("column_name").alias("count_of_column"))
  • If you perform count on a single column, it skips null values in that column; count("*") counts every row

Handling Dates

  • Some of the functionalities Pyspark provides for dates include:
    - Parsing date from string column (to_date, unix_timestamp)
    - Getting current date (current_date, or current_timestamp for date and time)
    - Performing date calculations like add/subtract days, find difference b/w 2 dates (date_add, date_sub, datediff, months_between)
    - Change date format as per our requirements (date_format)
  • The usage of the above functionalities is shown in example below
# Consider the Employee dataset with a string column "Hire Date" that holds dates
# in 2 formats, MM-dd-yyyy and MM/dd/yyyy.
# The goal is to find the Experience of the employee as of today in years

from pyspark.sql.functions import *
from pyspark.sql.types import *

# Try each format in turn: to_date gives null when the format does not match
# (with ANSI mode off), and coalesce keeps the first non-null result
df = df.withColumn("HireDate", coalesce(
to_date(col("Hire Date"),"MM-dd-yyyy"),
to_date(col("Hire Date"),"MM/dd/yyyy")
))

df.select(date_format(current_date(), "yyyy-MM-dd").alias("Today")).show()

df = df.withColumn("Experience", (months_between(
current_date(),
col("HireDate")
) / 12).cast('int'))


  • Use datediff to find difference in days. If the goal is to find difference in months or years, months_between is a better choice as it is easier to calculate number of years from months rather than days
  • To get year or month, we can use corresponding functions
from pyspark.sql.functions import year, month
df.groupBy(month("eventdate")).count()

Group by and Window Function

  • Group by vs Window: Suppose we have an employee table with id, name, salary and department, and want to answer the following questions
    1. What is the total salary paid per department?
    2. What percentage of the department's total salary goes to each employee?
    In case 1, we use a group by. In case 2, we cannot use group by as we still need the information in each row, so we simulate group by using a window function
  • General syntax for group by
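# Approach 1
from pyspark.sql.functions import sum # shadows the Python built-in sum

df.groupBy("department", "country").agg(sum("salary")).show()
  • partitionBy is similar to groupBy, except that the rows are not collapsed: every input row is kept and receives the value computed for its group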
  • Window: Subset of data using which the aggregation is computed
When we partitionBy dept, each window is nothing but collection of rows belonging to the same dept. For example all rows belonging to IT form one window, all rows belonging to marketing form one window and so on
  • General syntax for windowing is
from pyspark.sql.functions import col, sum
from pyspark.sql.window import Window

window = Window.partitionBy("dept")

df = df.withColumn("total_dept_salary", sum(col("salary")).over(window))

# Do not add orderBy for a department total: with orderBy the default frame runs from
# the start of the partition to the current row, so sum becomes a running total.
# orderBy is needed for functions like rank, dense_rank and row_number
  • orderBy, when used with a window, sorts records only within that window before computing the value

Joins

  • Joins are wide dependency transformations (refer theory notes), hence expensive operations, as data has to be shuffled between executors
  • Syntax of join statement

sales_df.join(customer_df, sales_df["customer_id"] == customer_df["customer_id"], "inner")
  • Join needs 3 arguments
    1. Dataset/table we want to join to
    2. Join condition
    3. Type of join
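  • Pyspark supports the following join types
    1. "inner" (default)
    2. "left" or "leftouter"
    3. "right" or "rightouter"
    4. "outer" or "full" or "fullouter"
    5. "leftanti" : rows of the left table that have no match in the right table, with only columns of the left table selected (useful for finding missing records, for example sales whose customer does not exist in the customer table)
    6. "leftsemi" : rows of the left table that have at least one match in the right table, with only columns of the left table selected
    7. cross join (since a cross join has no join condition, the syntax is different: df1.crossJoin(df2))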

  • While performing a join on string columns, make sure the values use the same case on both sides (both lowercase or both uppercase), since string comparison is case-sensitive. For any other data type, ensure that both columns have the same data type; if not sure, use the cast function to make the types match

Interview Questions

  1. How to create schema in Pyspark? How many ways to create it?
  2. What is structfield and structtype in schema?
  3. Suppose the data has headers, but we do not want to use those headers and define a schema of our own, how do we deal with this scenario?
  4. What are the modes available in the dataframe writer?
  5. Have you worked with corrupted record?
  6. When do you say data is corrupted, with an example?
  7. What happens to corrupted records in each of the different read modes? (this question can be asked as a case study where a dataset is given and the count of records is asked for each of the read modes)
  8. How can we view the bad records?
  9. Where do we store bad records, and how can we access them later?
  10. What is partitionBy and bucketBy?
  11. How to write data into multiple partitions?
  12. What is partitioning in spark?
  13. What is bucketing in spark?
  14. Why do we need partitioning and bucketing?
  15. When to use partitioning and bucketing?
  16. How to select columns?
  17. What is the difference b/w union and union all?
  18. What will happen if the number of columns in the 2 datasets to be unioned is different?
  19. What will happen if the column names across 2 datasets are different?
  20. How to ensure columns match when column names are different?
  21. How do we write conditional logic in Pyspark?
  22. How to deal with null values?
  23. How to write multiple and, or conditions?
  24. How to find unique rows?
  25. How to drop duplicate rows?
  26. How to sort data in ascending or descending order?
  27. What are window functions?
  28. What is the difference between row number, rank and dense rank?
  29. Write SQL code to get top 2 earners for each department.

References

  1. https://stackoverflow.com/questions/47036486/what-is-prefer-bucket-or-repartition
  2. https://stackoverflow.com/questions/39917075/pyspark-structfield-false-always-returns-nullable-true-instead-of
  3. https://semanticinsight.wordpress.com/2022/04/10/quirky-things-in-spark-databricks-_corrupt_records/
  4. https://stackoverflow.com/questions/44011846/how-does-createorreplacetempview-work-in-spark
  5. https://stackoverflow.com/questions/45430816/writing-sql-vs-using-dataframe-apis-in-spark-sql
  6. https://stackoverflow.com/questions/32284620/how-to-change-a-dataframe-column-from-string-type-to-double-type-in-pyspark
  7. https://stackoverflow.com/questions/18528468/what-is-the-difference-between-ifnull-and-coalesce-in-mysql
  8. https://stackoverflow.com/questions/2309943/unioning-two-tables-with-different-number-of-columns
  9. https://stackoverflow.com/questions/37707305/pyspark-multiple-conditions-in-when-clause
  10. https://stackoverflow.com/questions/62148704/date-difference-in-years-in-pyspark-dataframe
  11. https://stackoverflow.com/questions/46594750/cast-column-containing-multiple-string-date-formats-to-datetime-in-spark
