Merging different schemas in Apache Spark

This article explores an approach to merging different schemas using Apache Spark.

Thiago Cordon
Data Arena
6 min read · Dec 21, 2020


Photo by Ricardo Gomez Angel on Unsplash

Imagine that you have to work with many files in your data lake and you discover that they don’t share the same schema. Or, to be more dramatic, a process that reads data from the data lake suddenly stops working, and after some investigation and pressure from your stakeholders, you discover that the root cause was an unexpected change in the source schema.

Continuous schema changes have become a common challenge for data professionals as companies speed up their deployment cycles to release new features. Although there are tools like Kafka Schema Registry and processes that companies put in place to address this challenge, as a data professional you should be prepared to deal with it at any time. More details on how to handle it with Kafka Schema Registry can be found in this article.

That said, in this article I will walk through a few attempts to solve this challenge, ending with a custom function that accomplishes it. You can jump straight to the final solution if you want to skip the intermediate attempts.

The challenge

To simulate schema changes, I created some fictitious data using the Python library mimesis. The data was generated in Parquet format in the following partitions, each containing 10 rows and a different schema:

Schema changes by partition — image by author.

The image above shows the differences in each partition. As we can see, columns and structs were added, data types changed, and columns were removed.

The desired result is a schema containing a merge of all these changes, without losing any column or struct even if it no longer exists in the latest partitions.
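If you want to reproduce a similar setup, per-partition test data can be generated with mimesis and written as separate date partitions. The sketch below is only a hypothetical, much smaller version of the generator used for this article: it writes two partitions whose schemas differ by a single column, using a few common mimesis Person provider methods.

from mimesis import Person
from pyspark.sql import Row

person = Person()
base_path = "/home/jovyan/work/data/raw/test_data_parquet"

# Partition 1: a small, flat schema.
rows_1 = [Row(identifier=person.identifier(),
              first_name=person.first_name(),
              last_name=person.last_name(),
              occupation=person.occupation())
          for _ in range(10)]
spark.createDataFrame(rows_1).write.mode("overwrite").parquet(base_path + "/date=2020-01-01")

# Partition 2: an extra column (age), so its schema differs from partition 1.
rows_2 = [Row(identifier=person.identifier(),
              first_name=person.first_name(),
              last_name=person.last_name(),
              occupation=person.occupation(),
              age=person.age())
          for _ in range(10)]
spark.createDataFrame(rows_2).write.mode("overwrite").parquet(base_path + "/date=2020-02-01")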

Attempt 1: Reading all files at once

What happens if we try to read all these files at once with spark.read.parquet()?

data_path = "/home/jovyan/work/data/raw/test_data_parquet"
df = spark.read.parquet(data_path)
df.printSchema()

It is possible to read all the files, but as we can see above, only the schema of the first partition is considered. Since every partition contains those columns, the read succeeds, but the schema differences are not merged as we want.

As we can see below, all rows in every partition can be read because they share the fields mentioned above, but since we are interested in merging all fields, this solution doesn’t work.

from pyspark.sql.functions import col
df.groupBy(col("date")).count().sort(col("date")).show()

Attempt 2: Reading all files at once using mergeSchema option

Apache Spark has a feature to merge schemas on read. It is enabled through an option when reading the files, as shown below:

data_path = "/home/jovyan/work/data/raw/test_data_parquet"
df = spark.read.option("mergeSchema", "true").parquet(data_path)

Unfortunately, this option cannot handle our differing schemas. It fails to merge the schema of the 2020-04-01 partition because postal_code has incompatible data types across partitions, as shown in the following error message (simplified for legibility):

org.apache.spark.SparkException: Failed merging schema of file file:/home/jovyan/work/data/raw/test_data_parquet/date=2020-04-01/part-00000-796d0c3c-69c0-44c5-a4fa-635195e8d6a9.c000.snappy.parquet
Caused by: org.apache.spark.SparkException: Failed to merge fields 'address' and 'address'. Failed to merge fields 'postal_code' and 'postal_code'. Failed to merge incompatible data types string and int

Attempt 3: Reading all files at once forcing a schema on read

Let’s see what happens when we force the desired schema while reading the Parquet files.

from pyspark.sql.types import StructType
import json
schema_json = '{"fields":[{"metadata":{},"name":"address","nullable":true,"type":{"fields":[{"metadata":{},"name":"address","nullable":true,"type":"string"},{"metadata":{},"name":"address_details","nullable":true,"type":{"fields":[{"metadata":{},"name":"number","nullable":true,"type":"string"},{"metadata":{},"name":"street","nullable":true,"type":{"fields":[{"metadata":{},"name":"lat","nullable":true,"type":"string"},{"metadata":{},"name":"latitude","nullable":true,"type":"string"},{"metadata":{},"name":"long","nullable":true,"type":"string"},{"metadata":{},"name":"longitude","nullable":true,"type":"string"},{"metadata":{},"name":"street_name","nullable":true,"type":"string"}],"type":"struct"}}],"type":"struct"}},{"metadata":{},"name":"city","nullable":true,"type":"string"},{"metadata":{},"name":"country","nullable":true,"type":"string"},{"metadata":{},"name":"country_code","nullable":true,"type":"string"},{"metadata":{},"name":"postal_code","nullable":true,"type":"string"},{"metadata":{},"name":"state","nullable":true,"type":"string"}],"type":"struct"}},{"metadata":{},"name":"age","nullable":true,"type":"string"},{"metadata":{},"name":"date","nullable":true,"type":"string"},{"metadata":{},"name":"first_name","nullable":true,"type":"string"},{"metadata":{},"name":"identifier","nullable":true,"type":"string"},{"metadata":{},"name":"last_name","nullable":true,"type":"string"},{"metadata":{},"name":"occupation","nullable":true,"type":"string"},{"metadata":{},"name":"title","nullable":true,"type":"string"},{"metadata":{},"name":"title_name","nullable":true,"type":"string"}],"type":"struct"}'
schema = StructType.fromJson(json.loads(schema_json))
data_path = "/home/jovyan/work/data/raw/test_data_parquet"
df = spark.read.schema(schema).option("mergeSchema", "true").parquet(data_path)
df.show()

When we ask the dataframe to return a sample of rows (df.show()), we get the following error indicating that Spark could not read the 2020-04-01 partition (error message simplified for legibility).

org.apache.spark.sql.execution.QueryExecutionException: Encounter error while reading parquet files. One possible cause: Parquet column cannot be converted in the corresponding files
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/home/jovyan/work/data/raw/test_data_parquet/date=2020-04-01/part-00000-796d0c3c-69c0-44c5-a4fa-635195e8d6a9.c000.snappy.parquet

This partition has significant changes in the address struct, which is likely why Spark could not read it properly.

Attempt 4: Reading one partition at a time and unioning the dataframes

What if we read one partition at a time and union the resulting dataframes? Let’s see what happens.

import os
from pyspark.sql.functions import lit

data_path = "/home/jovyan/work/data/raw/test_data_parquet"
idx = 0
# Read each partition directory (e.g. date=2020-01-01), add the partition column,
# and union it with the dataframes read so far.
for partition in [d for d in os.listdir(data_path) if d.find("=") != -1]:
    df_temp = (spark.read.parquet(data_path + "/" + partition)
               .withColumn(partition.split("=")[0], lit(partition.split("=")[1])))
    if idx == 0:
        df = df_temp
    else:
        df = df.union(df_temp)
    idx = idx + 1

We cannot union dataframes with different schemas, so it doesn’t work.

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 7 columns and the second table has 8 columns

Final solution: Custom function

After trying to merge the schemas using the methods described above, I ended up building a custom function that does the following (a sketch follows the list):

  • Reads each partition path and gets the schema of that partition
  • Converts the columns to string to ensure that the data types are compatible between schemas, avoiding the errors faced in attempt 2
  • Converts the dataframe to a JSON RDD before unioning the partitions. A JSON RDD allows the union even when the structures are different, avoiding the error encountered in attempt 4.
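The complete implementation is available in the GitHub project linked at the end of the article. As a rough, hypothetical sketch of the idea (the helper name read_partitions_merged and several details are mine, not the author's exact code), it could look like this:

import os

from pyspark.sql.functions import col, lit
from pyspark.sql.types import ArrayType, StructType


def read_partitions_merged(spark, data_path):
    # Sketch: read every partition and merge their schemas via a JSON round trip.
    merged_rdd = None
    for entry in sorted(os.listdir(data_path)):
        if "=" not in entry:
            continue  # skip non-partition entries such as _SUCCESS
        part_key, part_value = entry.split("=", 1)
        df_part = (spark.read.parquet(os.path.join(data_path, entry))
                   .withColumn(part_key, lit(part_value)))
        # Cast flat columns to string so types such as int vs. string cannot clash;
        # struct/array columns are kept as-is so their shape survives the merge.
        df_part = df_part.select([
            col(c) if isinstance(df_part.schema[c].dataType, (StructType, ArrayType))
            else col(c).cast("string").alias(c)
            for c in df_part.columns
        ])
        # A union of JSON-string RDDs never fails on mismatched columns or structs.
        part_rdd = df_part.toJSON()
        merged_rdd = part_rdd if merged_rdd is None else merged_rdd.union(part_rdd)
    # Schema inference over the unioned JSON keeps every field seen in any partition,
    # falling back to string where nested types still conflict.
    return spark.read.json(merged_rdd)


df = read_partitions_merged(spark, "/home/jovyan/work/data/raw/test_data_parquet")

The JSON round trip is the key trick here: spark.read.json infers a superset schema over all rows, which is exactly what the plain union in attempt 4 could not do.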

After reading the partitions with this custom function, we have a dataframe with the desired schema, containing the columns and structures of all partitions.

df.printSchema()

We can also easily identify which columns are no longer used by counting null rows per partition and column.

# of rows with null by partition and column.

As shown above, only the columns date, age, first_name, identifier, last_name, and occupation were present in all partitions. This explains why in attempt 1 we could read these columns from all partitions.
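For reference, a count like the one above can be produced with a short aggregation over the merged dataframe. This is just a sketch, assuming date is the partition column added by the custom function:

from pyspark.sql.functions import col, count, when

# Count null values per column for each partition
# (a fully null column usually means it did not exist in that partition).
null_counts = df.groupBy("date").agg(*[
    count(when(col(c).isNull(), c)).alias(c)
    for c in df.columns if c != "date"
])
null_counts.sort("date").show(truncate=False)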

This solution also works with other file formats like Avro, which was tested in the complete solution available here. In the complete solution, you can generate and merge schemas for Avro or Parquet files and load only incremental partitions (new or modified ones).

Here are some advantages of using this process:

  • You can use this process to create a merged data lake and consume data from there, avoiding breaking your process when the schema changes.
  • You can set up automatic tests that proactively warn you about anomalies (a small check is sketched after this list). Examples: checking for columns that are no longer used (columns that became null), or testing column data types to see whether a column can still be converted to the desired type.
  • You can compare changes among partitions. Examples: since when has a given column stopped being loaded? When was a given column added?
  • There is less pressure while you evolve your process, because it keeps running. Some impact can still occur, but it tends to be limited to the changed columns, so the final product is less affected, and if you have tests to detect anomalies you can warn your users proactively.
  • You can continue to deliver the other pieces of information while you fix your process, and you can also backfill your data model if you need to recover the period during which you were fixing it.
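As an example of such a test, a simple, hypothetical anomaly check on the merged dataframe could flag values that can no longer be cast back to their expected type (here using the age column, which is stored as string in the merged schema):

from pyspark.sql.functions import col

# Rows where "age" is present but no longer castable back to int.
bad_age = df.filter(col("age").isNotNull() & col("age").cast("int").isNull())
if bad_age.count() > 0:
    print(f"Warning: {bad_age.count()} rows have an 'age' value that cannot be cast to int")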

On the other hand, some disadvantages are:

  • In the merged schema, you lose the real data types.
  • Although storage is not so expensive nowadays, you will duplicate your data if you read from one data lake and write the merged schema to another.
  • You still have to adapt your process to the changes, unless the change is a new column that you don’t need to include.

Final thoughts

In this article, I demonstrated one approach to merge schemas in Apache Spark without losing information.

As described at the beginning, this is not the only way to deal with schema evolution, but I hope it can be useful to somebody facing this challenge.

Please, feel free to reach out if you have other ideas on how to solve this. We are stronger together. 😃

If you want to run this solution or explore it in more detail, please, follow the steps documented in the GitHub project readme.
