Apache Spark, Parquet, and Troublesome Nulls

Wesley Hoffman
6 min read · Nov 28, 2017


A hard learned lesson in type safety and assuming too much

Introduction

While migrating an SQL analytic ETL pipeline to a new Apache Spark batch ETL infrastructure for a client, I noticed something peculiar. The infrastructure, as developed, has the notion of nullable DataFrame column schema. At first glance it doesn’t seem that strange. Most, if not all, SQL databases allow columns to be nullable or non-nullable, right? Let’s look into why this seemingly sensible notion is problematic when it comes to creating Spark DataFrames.

from pyspark.sql import types

schema = types.StructType([
    types.StructField("index", types.LongType(), False),
    types.StructField("long", types.LongType(), True),
])
df = sqlContext.createDataFrame(sc.emptyRDD(), schema)
df.printSchema()

This block of code enforces a schema on what will be an empty DataFrame, df. df.printSchema() will provide us with the following:

root  
|-- index: long (nullable = false)
|-- long: long (nullable = true)

It can be seen that the in-memory DataFrame has carried over the nullability of the defined schema. However, this is slightly misleading: column nullability in Spark is an optimization statement, not an enforcement of object type.

In this post, we will be covering the behavior of creating and saving DataFrames, primarily with respect to Parquet. The Parquet file format and its design will not be covered in depth.

What Does Nullable Mean to DataFrame Columns?

…when you define a schema where all columns are declared to not have null values — Spark will not enforce that and will happily let null values into that column. The nullable signal is simply to help Spark SQL optimize for handling that column. If you have null values in columns that should not have null values, you can get an incorrect result or see strange exceptions that can be hard to debug. — The Data Engineer’s Guide to Apache Spark; pg 74

When a column is declared as not having null values, Spark does not enforce this declaration. No matter whether the user-defined schema declares a column nullable or not, Spark will not perform null checks. A column’s nullable characteristic is a contract with the Catalyst optimizer that null data will not be produced. A healthy practice is to always set it to true if there is any doubt. Defaulting to nullable also makes sense for loosely-typed data sources like JSON and CSV. More importantly, treating columns as nullable is the conservative option for Spark: Apache Spark has no control over the data, or the storage it lives in, that is being queried, and therefore defaults to a code-safe behavior. For example, files can always be added to a DFS (distributed file system) in an ad-hoc manner that would violate any defined data integrity constraints.
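To see this non-enforcement in action, here is a minimal sketch against a loosely-typed source. The file name and its contents are hypothetical; the point is only that a schema declaring both columns non-nullable is accepted at read time even when the data contains empty fields.

from pyspark.sql import types

# As elsewhere in this post, sqlContext comes from a PySpark shell session.
strict_schema = types.StructType([
    types.StructField("index", types.LongType(), False),  # declared non-nullable
    types.StructField("long", types.LongType(), False),   # declared non-nullable
])

# 'data_with_nulls.csv' is an illustrative file with empty second fields, e.g. a line "3,".
df = sqlContext.read.schema(strict_schema).csv('data_with_nulls.csv')

# The read succeeds and the empty fields surface as nulls, despite the
# schema declaring both columns non-nullable.
df.show()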

DataFrame Creation from Parquet

Creating a DataFrame from a Parquet filepath is easy for the user. It can be done by calling either SparkSession.read.parquet('path/to/data.parquet') or SparkSession.read.load('path/to/data.parquet'), both of which work through a DataFrameReader.¹ In the process of transforming external data into a DataFrame, the data schema is inferred by Spark and a query plan is devised for the Spark job that ingests the Parquet part-files.
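As a quick illustration (the path is illustrative), the two entry points behave the same way, since load() falls back to the default data source, which is Parquet unless spark.sql.sources.default has been changed:

# Both calls go through a DataFrameReader.
df_a = sqlContext.read.parquet('path/to/data.parquet')
df_b = sqlContext.read.load('path/to/data.parquet')

# The inferred schema, including its nullability, can be inspected directly.
df_a.printSchema()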

When schema inference is called, a flag is set that answers the question, “should the schema from all Parquet part-files be merged?” When multiple Parquet files are given with different schemas, they can be merged. The default behavior is to not merge the schemas.² The file(s) needed to resolve the schema are then identified. Spark always tries the summary files first if a merge is not required. In this case, _common_metadata is preferred over _metadata because it does not contain row group information and could be much smaller for large Parquet files with many row groups. If summary files are not available, the behavior is to fall back to a random part-file.³ In the default case (a schema merge is not marked as necessary), Spark will try an arbitrary _common_metadata file first, fall back to an arbitrary _metadata file, and finally to an arbitrary part-file, assuming (correctly or incorrectly) that the schemas are consistent. Once the files needed for the merge are determined, the operation is carried out by a distributed Spark job.⁴ It is important to note that the resulting data schema is always asserted to be nullable across the board. In short, this is because the query plan recreates the StructType that holds the schema but forces nullability on all contained fields.
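For reference, the merge behavior described above can be requested per read or enabled globally; a minimal sketch with an illustrative path:

# Per read: ask Spark to merge schemas across all Parquet part-files.
df_merged = sqlContext.read.option('mergeSchema', 'true').parquet('path/to/data.parquet')

# Globally, via the configuration flag referenced in footnote 2.
sqlContext.setConf('spark.sql.parquet.mergeSchema', 'true')
df_merged_global = sqlContext.read.parquet('path/to/data.parquet')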

How Parquet is Written

“…When writing Parquet files, all columns are automatically converted to be nullable for compatibility reasons.” — Spark Docs

So say you’ve found one of the ways to actually enforce non-null values at the column level inside of your Spark job. Unfortunately, once you write to Parquet, that enforcement is defunct. At a high level, DataFrame.write.parquet() creates a DataSource out of the given DataFrame, applies the default compression configured for Parquet, builds out the optimized query, and copies the data with an all-nullable schema. This can loosely be described as the inverse of the DataFrame creation.
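At the user-code level, that write path looks like the sketch below. The output path is illustrative, and the compression option is set explicitly only to make the Parquet default (snappy in Spark 2.x) visible; omitting it gives the same result.

df = sqlContext.createDataFrame([(1, 6), (2, None)], ['index', 'long'])

# 'compression' defaults to spark.sql.parquet.compression.codec.
df.write.option('compression', 'snappy').mode('overwrite').parquet('nullable_check_write_path')

# Reading the result back shows every column as nullable, regardless of the
# nullability the DataFrame carried before the write.
sqlContext.read.parquet('nullable_check_write_path').printSchema()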

Some Experiments

In this final section, I’m going to present a few examples of what to expect from the default behavior.

When investigating a write to Parquet, there are two options:

  1. Use a manually defined schema on an established DataFrame
schema = types.StructType([
    types.StructField("index", types.LongType(), False),
    types.StructField("long", types.LongType(), True),
])
data = [
    (1, 6),
    (2, 7),
    (3, None),
    (4, 8),
    (5, 9),
]
df_w_schema = sqlContext.createDataFrame(data, schema)
df_w_schema.collect()
df_w_schema.write.parquet('nullable_check_w_schema')
df_parquet_w_schema = sqlContext.read.schema(schema).parquet('nullable_check_w_schema')
df_parquet_w_schema.printSchema()

Here, a schema is defined along with a dataset. Up to the point of the write, the DataFrame carries the declared nullability. But once the DataFrame is written to Parquet and read back, all column nullability flies out the window, as one can see from the output of printSchema() on the incoming DataFrame.

root  
|-- index: long (nullable = true)
|-- long: long (nullable = true)

2. No schema is defined

df_wo_schema = sqlContext.createDataFrame(data)
df_wo_schema.collect()
df_wo_schema.write.mode('overwrite').parquet('nullable_check_wo_schema')
df_parquet_wo_schema = sqlContext.read.parquet('nullable_check_wo_schema')
df_parquet_wo_schema.printSchema()

Just as in option 1, we define the same dataset, but without the “enforcing” schema. The outcome can be seen below:

root  
|-- _1: long (nullable = true)
|-- _2: long (nullable = true)

Whether a schema is asserted or not, nullability will not be enforced once the data is written to Parquet.

Footnotes

[1] The DataFrameReader is an interface between the DataFrame and external storage.

[2] PARQUET_SCHEMA_MERGING_ENABLED (spark.sql.parquet.mergeSchema): When true, the Parquet data source merges schemas collected from all data files; otherwise the schema is picked from the summary file, or from a random data file if no summary file is available.

[3] Metadata stored in the summary files is merged from all part-files. However, for user-defined key-value metadata (in which Spark SQL stores its schema), Parquet does not know how to merge the values correctly if a key is associated with different values in separate part-files. When this happens, Parquet stops generating the summary file, implying that when a summary file is present, then:

a. Either all part-files have exactly the same Spark SQL schema, or
b. Some part-files don’t contain Spark SQL schema in the key-value metadata at all (thus their schema may differ from each other).

Spark plays the pessimist and takes the second case into account. This means summary files cannot be trusted if users require a merged schema and all part-files must be analyzed to do the merge.

[4] Locality is not taken into consideration. This optimization is primarily useful when the system of record is S3, where file metadata operations can be slow and locality is not available because computation cannot run on the S3 nodes themselves.

The parallelism is limited by the number of files being merged. Therefore, a SparkSession with a parallelism of 2 that has only a single file to merge will spin up a Spark job that effectively uses a single executor.
