Cleaning Data with PySpark
Contents
Defining the problem
The impurities we commonly need to fix
The format we commonly need to produce
Dealing with the mess
Replacing values
Reshaping
Testing
Common Impurities
that often need refining
- Incorrect data types
- Invalid rows
- Incomplete rows
- Poorly chosen placeholders
- Poorly named columns
- Impermissible shape, requiring:
- - Grouping and aggregation
- - Joining datasets
- - Ordering results
Permissible data conventions
Before we clean, it is important to know what our clean end-product should look like:
- Whether 95% completeness is acceptable, or only fully clean data will do
Implicit standards in the company
- Regional datetimes vs UTC (see the sketch after this list)
- Column naming conventions
Low-level system details
- Representation of unknown or incomplete data
- Ranges for numerical values
- Field meanings
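As an example of the datetime convention, here is a minimal sketch converting a regional timestamp column to UTC, assuming a hypothetical events frame with a local_ts column recorded in Brussels time:
from pyspark.sql.functions import to_utc_timestamp, col

# Convert a regional timestamp column to UTC for storage
events = events.withColumn('ts_utc', to_utc_timestamp(col('local_ts'), 'Europe/Brussels'))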
Dealing with messy data
Defining datatypes whilst reading
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField('name', StringType(), False),
    StructField('age', IntegerType(), False)
])
# The schema must be attached to the reader before loading
df = spark.read.format('csv').schema(schema).load('datafile')
Setting an Index Column
from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn('id', monotonically_increasing_id())
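Note that monotonically_increasing_id() guarantees IDs that are monotonically increasing and unique, but not consecutive, since the partition number is encoded in the ID.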
Changing datatypes after reading
from pyspark.sql.types import IntegerType

voter_df = voter_df.withColumn('year', voter_df['_c4'].cast(IntegerType()))
Replacing Values
from datetime import date
from pyspark.sql.functions import when, col

# Null out end dates that lie more than a year in the future
one_year_from_now = date.today().replace(year=date.today().year + 1)
better_frame = employees.withColumn('end_date',
    when(col('end_date') > one_year_from_now, None).otherwise(col('end_date')))
Null Data
df_dropped = df.na.drop()
df_dropped = df.na.drop(subset=['colName'])
df = df.withColumn('CleanColName',
    when(df.colName.isNull(), 'unknown').otherwise(df.colName))
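A more compact alternative sketch uses the built-in na.fill, which replaces nulls in the named columns directly:
# Replace nulls in colName with a placeholder in one call
df = df.na.fill({'colName': 'unknown'})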
Filtering Rows
prices_in_belgium = prices.filter(col('countrycode') == 'BE').orderBy(col('date'))
# Detecting when a column has a category with very few values
from pyspark.sql.functions import asc

df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
Out:
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
+--------------------+---------------------+
The feature native_country has only one record from the Netherlands. To exclude it:
df_remove = df.filter(df.native_country != 'Holand-Netherlands')
Duplicate Values
Detecting and flagging Duplicate Values
import pyspark.sql.functions as f

df_basket1.join(
    df_basket1.groupBy(df_basket1.columns)
              .agg((f.count("*") > 1).cast("int").alias("Duplicate_indicator")),
    on=df_basket1.columns, how="inner").show()
Extracting Duplicate Values with groupBy and filter
df1 = df_basket1.groupBy("Item_group", "Item_name", "price").count().filter("count > 1")
df1.drop('count').show()  # one copy of each row that occurs more than once
Removing Duplicate Values with distinct
prices.select(col('store'), col('brand').alias('brandname')).distinct()
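A shortcut worth knowing is dropDuplicates, which deduplicates the whole frame or only a subset of columns; a minimal sketch:
# Drop exact duplicate rows, or duplicates judged on chosen columns only
deduped = df_basket1.dropDuplicates()
deduped_by_item = df_basket1.dropDuplicates(['Item_group', 'Item_name'])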
Filtering Columns
voter_df = voter_df.drop('unused_column')
Buckets
With ml.feature
from pyspark.ml.feature import Bucketizer

values = [("a", 23), ("b", 45), ("c", 10), ("d", 60), ("e", 56), ("f", 2), ("g", 25), ("h", 40), ("j", 33)]
df = spark.createDataFrame(values, ["name", "ages"])

bucketizer = Bucketizer(splits=[0, 6, 18, 60, float('Inf')], inputCol="ages", outputCol="buckets")
df_buck = bucketizer.setHandleInvalid("keep").transform(df)
df_buck.show()
With df.select
import pyspark.sql.functions as F

df.select(df.Name, df.Age,
    F.when(df.Age >= 18, 'Adult')
     .otherwise('Minor')
     .alias('bucket'))
Text Data
Lower/Upper Case
With select
from pyspark.sql.functions import lower

Syntax: df.select(lower(df.colName)).show()
Example: df.select(lower(df.first_name)).show()
With sql.functions
import pyspark.sql.functions as F
voter_df = voter_df.withColumn('upper', F.upper('name'))
Extract values containing part of a string
voter_df.where(voter_df['_c0'].contains('VOTE'))
Splitting Columns
import pyspark.sql.functions as F
voter_df = voter_df.withColumn('splits', F.split('name', ' '))
Reshaping
Grouping and aggregating
from pyspark.sql.functions import avg, count, col

(prices
    .groupBy(col('brand'))
    .agg(
        avg('price').alias('average_price'),
        count('brand').alias('number_of_items')
    )
).show()
Joining datasets
ratings_with_prices = ratings.join(prices, ['brand', 'model'])
UDFs
To implement UDFs:
1. Define a Python method
def reverseString(mystr):
    return mystr[::-1]
2. Wrap the function and store it as a variable
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

udfReverseString = udf(reverseString, StringType())
3. Call it with Spark
user_df = user_df.withColumn('ReverseName', udfReverseString(user_df.Name))
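Note that Python UDFs ship each row out to a Python worker and back, so they are considerably slower than the built-in pyspark.sql.functions; prefer a built-in whenever an equivalent exists.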
Pipelines
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer

CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = []  # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    # Index the string column, then one-hot encode the index
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
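A minimal sketch of running the assembled stages, assuming df holds the categorical columns above (note that OneHotEncoderEstimator was renamed OneHotEncoder in Spark 3.0):
# Fit all stages in order, then apply the fitted pipeline
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df)
df_transformed = pipeline_model.transform(df)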
Testing Data with PySpark
Software has a tendency to change, due to:
- New functionality
- Performance improvements
- Squashed bugs
Core functionality, however, rarely evolves.
The rationale behind testing:
- Improves the chance of code being correct in the future, by preventing bugs
- Raises confidence that our code is currently correct
- Provides the most up-to-date documentation
Testing Processes
- Deciding what to test
- Writing tests
- Running the tests
Testing has a high return on investment:
- Target the correct layer
- Target non-trivial parts
Testing tips
- Create in-memory DataFrames (see the sketch after this list)
- - Data is in plain sight
- - External data sources are costly
- - Allows us to focus on a small number of examples
- Create small and well-named functions
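A minimal sketch of these tips together, assuming a hypothetical clean_ages helper as the small, well-named function under test:
import pyspark.sql.functions as F

def clean_ages(df):
    # Hypothetical function under test: replace negative ages with null
    return df.withColumn('age', F.when(F.col('age') < 0, None).otherwise(F.col('age')))

# In-memory test data: every example is in plain sight
test_df = spark.createDataFrame([('a', 34), ('b', -1)], ['name', 'age'])
result = clean_ages(test_df).collect()
assert result[1]['age'] is None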