The most useful PySpark Function

Andrew Gross
YipitData Engineering
2 min readJul 8, 2020

If you have spent any amount of time working with data at a level lower than “table”, chances are you have had to figure out why it didn’t load correctly.

Sometimes it’s digging into the middle of the CSV files to find the JSON string field that broke the parser. Other times it is fighting to understand why the Parquet column types no longer match.

How do you debug this? You’ll likely spend a lot of time slicing the data down to smallest chunk of “bad” data. Next, you stare into the abyss and think really hard about how it got there. Hopefully you end up at the right answer.

There’s a better way: input_file_name()

As soon as you are working with datasets that span multiple files or multiple stages, you should include this column on every data set you load.

It is most useful when you embed it in your source tables, and ensure that it is propagated through the pipeline so it is easy to look at later tables and quickly identify where certain records originated. Internally we use the following snippets for ingesting CSV and Parquet files (docs strings excluded for brevity).

def read_parquet(path):
from pyspark.sql import functions as F
spark = get_spark_session()
df = (
spark.read.parquet(path)
.withColumn("date_ingested", F.current_timestamp())
.withColumn("input_file_name", F.input_file_name())
)
return normalize_column_names(df)

Note the addition of the ingestion timestamp with date_ingested and column name normalization via normalize_column_names . The latter is primarily to simplify usage with SQL and ensuring safe writes out to Parquet or Parquet based formats.

def read_csv(
path, schema=None, header=True, sep=",", quote='"', escape="\\", infer_schema=True
):
from pyspark.sql import functions as F
spark = get_spark_session()
df = (
spark.read.csv(
path=path,
schema=schema,
header=header,
sep=sep,
quote=quote,
escape=escape,
inferSchema=infer_schema,
)
.withColumn("date_ingested", F.current_timestamp())
.withColumn("input_file_name", F.input_file_name())
)
return normalize_column_names(df)

At some point we will probably end up with enough of these metadata columns that it would be helpful to have a way to hide them by default. Until that time we will deal with the clutter.

Thanks to Robert Muldoon and Anup Segu for the code and reviewing this post.

--

--