Cleaning data with PySpark

Samueldavidwinter
3 min read · Dec 30, 2021


Contents

Defining the problem

The impurities we commonly need to fix

The format we commonly need to produce

Dealing with the mess

Replacing values

Reshaping

Testing

Common impurities that oft need refining

  • Incorrect data types
  • Invalid rows
  • Incomplete rows (see the null-count sketch after this list)
  • Poorly chosen placeholders
  • Poorly named columns
  • Impermissible shape
  • Grouping and aggregation
  • Joining datasets
  • Ordering results
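A quick way to surface incomplete rows and poorly chosen placeholders is to count the nulls in every column up front. A minimal sketch, assuming df is the raw DataFrame just read in:

import pyspark.sql.functions as F

# Count the null values in each column in a single pass over the data
null_counts = df.select(
    [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]
)
null_counts.show()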

Permissible data conventions

Before we clean, it is important to know what our clean end-product should look like:

  • Whether, for example, 95 % completeness is permissible as 'clean'

Implicit standards in the company

  • Regional datetimes vs UTC
  • Column naming conventions (both applied in the sketch below)

Low-level system details

  • Representation of unknown or incomplete data
  • Ranges for numerical values
  • Field meanings
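Putting a couple of these conventions into practice early saves rework later. A small sketch, assuming a hypothetical events_df with a local timestamp column event_ts recorded in Europe/Brussels time and a poorly named column EvtVal:

from pyspark.sql import functions as F

events_df = (
    events_df
    .withColumn('event_ts_utc', F.to_utc_timestamp('event_ts', 'Europe/Brussels'))  # regional datetime -> UTC
    .withColumnRenamed('EvtVal', 'event_value')                                     # snake_case naming convention
)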

Dealing with messy data

Defining datatypes whilst reading

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField('name', StringType(), False),
    StructField('age', StringType(), False)
])
df = spark.read.format('csv').schema(schema).load('datafile')

Setting an Index Column

from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn('id', monotonically_increasing_id())

Changing datatypes After reading

from pyspark.sql.types import IntegerType

voter_df.withColumn('year', voter_df['_c4'].cast(IntegerType()))

Replacing Values

from datetime import date
from pyspark.sql.functions import col, when

one_year_from_now = date.today().replace(year=date.today().year + 1)

better_frame = employees.withColumn('end_date', when(col('end_date') > one_year_from_now, None).otherwise(col('end_date')))

Null Data

df_dropped = df.na.drop()

df_dropped = df.na.drop(subset=['colName'])

df = df.withColumn('CleanColName', when(df.ColName.isNull(), 'unknown').otherwise(df.ColName))

Filtering Rows

prices_in_belgium = prices.filter(col('countrycode') == 'BE').orderBy(col('date'))

# Detecting when a column has a class with very few values

from pyspark.sql.functions import asc

df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()

Out:

+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
+--------------------+---------------------+

The feature native_country has only one row coming from Holand-Netherlands. To exclude it:

df_remove = df.filter(df.native_country != 'Holand-Netherlands')

Duplicate Values

Detecting and flagging Duplicate Values

import pyspark.sql.functions as f

df_basket1.join(
    df_basket1.groupBy(df_basket1.columns)
              .agg((f.count("*") > 1).cast("int").alias("Duplicate_indicator")),
    on=df_basket1.columns,
    how="inner"
).show()

Removing Duplicate Values with groupBy and filter

df1 = df_basket1.groupBy("Item_group", "Item_name", "price").count().filter("count > 1")

df1.drop('count').show()

Removing Duplicate Values with distinct

prices.select(col('store'), col('brand').alias('brandname')).distinct()

Filtering Columns

voter_df.drop('unused_column')

Buckets

With ml.feature

from pyspark.ml.feature import Bucketizer

values = [("a", 23), ("b", 45), ("c", 10), ("d", 60), ("e", 56), ("f", 2), ("g", 25), ("h", 40), ("j", 33)]
df = spark.createDataFrame(values, ["name", "ages"])

bucketizer = Bucketizer(splits=[0, 6, 18, 60, float('Inf')], inputCol="ages", outputCol="buckets")
df_buck = bucketizer.setHandleInvalid("keep").transform(df)

df_buck.show()

With df.select

from pyspark.sql.functions import when

df.select(df.Name, df.Age,
          when(df.Age >= 18, 'Adult')
          .otherwise('Minor')
          .alias('age_group'))

Text Data

Lower/Upper Case

With select

from pyspark.sql.functions import lower, col

Syntax: df.select(lower(df.colName)).show()

Example: df.select(lower(df.first_name)).show()

With sql.functions

import pyspark.sql.functions as F

voter_df.withColumn('upper', F.upper('name'))

Extract values containing part of a string

voter_df.where(voter_df['_c0'].contains('VOTE'))

Splitting Columns

import pyspark.sql.functions as F

voter_df.withColumn('splits', F.split('name', ' '))
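split returns an array column, so individual tokens can be pulled out afterwards. A small sketch, assuming the name column holds space-separated full names:

import pyspark.sql.functions as F

voter_df = voter_df.withColumn('splits', F.split('name', ' '))
voter_df = voter_df.withColumn('first_name', F.col('splits').getItem(0))   # first token
voter_df = voter_df.withColumn('last_name', F.element_at('splits', -1))    # last token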

Reshaping

Grouping and aggregating

from pyspark.sql.functions import avg, col, count

(prices.groupBy(col('brand')).agg(avg('price').alias('average_price'), count('brand').alias('number_of_items'))).show()


Joining datasets

ratings_with_prices = ratings.join(prices, ['brand', 'model'])

UDFs

To implement UDFs

1. Define a Python method

def reverseString(mystr):
    return mystr[::-1]

2. Wrap the function and store it as a variable

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

udfReverseString = udf(reverseString, StringType())

3. Call with Spark

user_df = user_df.withColumn('ReverseName', udfReverseString(user_df.Name))

Pipelines

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer

CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = []  # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
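The stages list does nothing on its own until it is chained into a Pipeline and fitted. A minimal sketch of applying it, assuming df is the DataFrame that contains the categorical columns above:

pipeline = Pipeline(stages=stages)           # chain the indexers and encoders
pipeline_model = pipeline.fit(df)            # fit every stage on the data
df_encoded = pipeline_model.transform(df)    # apply them all in one pass
df_encoded.select([c + "classVec" for c in CATE_FEATURES]).show(5)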

Testing Data with PySpark

Software has a tendency to change, due to:

  • New functionality
  • Performance improvement
  • Squashing bugs

Core functionality rarely evolves

The rationale behind testing:

  • Improves the chance of the code being correct in the future, by preventing bugs
  • Raises confidence that our code is currently correct
  • Helps provide the most up-to-date documentation

Testing Processes

  • Deciding what to test
  • Writing tests
  • Running the tests

Testing has a high return on investment

  • Target the correct layer
  • Target non-trivial parts

Testing tips

  • Create in-memory dataframes (see the sketch after this list)
    - Data is in plain sight
    - External data sources are costly
    - Allows us to focus on a small number of examples
  • Create small and well-named functions
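A minimal sketch of such a test, assuming pytest as the runner and a locally created SparkSession; clean_unknowns is a hypothetical small, well-named function under test:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when


def clean_unknowns(df):
    """Replace nulls in the name column with the placeholder 'unknown'."""
    return df.withColumn('name', when(col('name').isNull(), 'unknown').otherwise(col('name')))


def test_clean_unknowns():
    spark = SparkSession.builder.master('local[1]').getOrCreate()
    # In-memory dataframe: the data is in plain sight, no external source needed
    df = spark.createDataFrame([('Alice',), (None,)], ['name'])
    result = [row.name for row in clean_unknowns(df).collect()]
    assert result == ['Alice', 'unknown']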


Samueldavidwinter

Passionate data engineer who loves helping others & playing a small part in humanity's capability to improve lives & understand the glorious universe. Much <3