Schema Evolution in Data Lakes
Countless articles online debate the pros and cons of data lakes and compare them to data warehouses. One of the key takeaways from these articles is that data lakes offer a more flexible storage solution. Whereas a data warehouse needs rigid data modeling and definitions, a data lake can store different types and shapes of data. This leads to the often-used terms “schema-on-write” for data warehouses and “schema-on-read” for data lakes. In other words, when data is written into a data warehouse, a schema for that data must be defined. In a data lake, the schema of the data can be inferred when it’s read, providing the aforementioned flexibility. However, this flexibility is a double-edged sword, and there are important tradeoffs worth considering.
At SSENSE, our data architecture uses many AWS products. The current iteration of our data lake makes use of Athena, a distributed SQL engine based on Presto, to read data stored in S3. The majority of these files are stored in Parquet format because of its compatibility with both Athena and Glue, which we use for some ETL as well as for its data catalog. One advantage of Parquet is that it’s a highly compressed format that also supports limited schema evolution. For example, you can add columns to your schema without having to rebuild a table, as you might with a traditional relational database. However, Parquet is a file format that enforces schemas. If one of the advantages of data lakes is their flexibility and the ability to have “schema-on-read”, then why enforce a schema when writing data?
Before answering this question, let’s consider a sample use case. In an event-driven microservice architecture, microservices generate JSON events that are stored in the data lake, inside an S3 bucket. This data may then be partitioned by different columns such as time and topic, so that a user wanting to query events for a given topic and date range can simply run a query such as the following:
-- assumes the partition column "date" is of type DATE
SELECT * FROM datalake_events.topicA WHERE "date" > current_date - interval '1' day
Without getting into all the details behind how Athena knows that there is a “table” called topicA in a “database” called datalake_events, it is important to note that Athena relies on a managed data catalog that stores table definitions and schemas. In our case, this data catalog is managed by Glue, which uses a set of predefined crawlers to read through samples of the data stored on S3 to infer a schema for the data. Athena then attempts to use this schema when reading the data stored on S3.
In our initial experiments with these technologies, much of our data was kept in its raw format, which is JSON for event-based data but could also be CSV for many sources. Here are some issues we encountered with these file types:
Null Field Handling For CSV Files
Consider a comma-separated record with a nullable field called reference_no. Let us assume that the following file was received yesterday:
id,text,reference_no,date
1,"foo",,2019-12-29
Now let’s assume that the sample file below is received today, and that it is stored in a separate partition on S3 due to it having a different date:
id,text,reference_no,date
2,"bar",98765,2019-12-30
With the first file only, Athena and the Glue catalog will infer that the reference_no field is a string given that it is null. However, the second file will have the field inferred as a number. Therefore, when attempting to query this file, users will run into a HIVE_PARTITION_SCHEMA_MISMATCH error. Essentially, Athena will be unable to infer a schema since it will see the same table with two different partitions, and the same field with different types across those partitions.
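To make the mismatch concrete, here is a minimal sketch of per-file type inference applied to the two sample files. It does not reproduce how a Glue crawler actually works; the functions infer_type and infer_schema and their rules are assumptions made purely for illustration.

```python
import csv
import io


def infer_type(values):
    """Naively infer a column type from sample values (hypothetical rules)."""
    non_empty = [v for v in values if v != ""]
    if not non_empty:
        # Nothing to go on: fall back to string, as happens for the null field.
        return "string"
    if all(v.lstrip("-").isdigit() for v in non_empty):
        return "bigint"
    return "string"


def infer_schema(csv_text):
    """Infer a {column: type} mapping from the rows of a single CSV file."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    return {name: infer_type([row[name] for row in rows]) for name in rows[0]}


yesterday_file = 'id,text,reference_no,date\n1,"foo",,2019-12-29\n'
today_file = 'id,text,reference_no,date\n2,"bar",98765,2019-12-30\n'

schema_yesterday = infer_schema(yesterday_file)
schema_today = infer_schema(today_file)

# Columns whose inferred type differs between the two partitions.
mismatches = {
    col: (schema_yesterday[col], schema_today[col])
    for col in schema_yesterday
    if schema_yesterday[col] != schema_today[col]
}
print(mismatches)  # {'reference_no': ('string', 'bigint')}
```

Because each file is inspected in isolation, the same column ends up with two different types, which is the situation that surfaces as a partition schema mismatch at query time.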
Multiple Levels Of Nested Data
Another problem typically encountered is related to nested JSON data. For example, consider the following JSON record:
{
    "message": {
        "message_id": "B1941592-23A1-1D0D-18Z9-FE9A3E11FE8A",
        "message_timestamp": 1571757109
    },
    "data": {
        "id": 12345,
        "nested1": {
            "n1a": "foo"
        }
    }
}
When Athena reads this data, it will recognize that we have two top-level fields, message and data, and that both of these are struct types (similar to dictionaries in Python). Each struct has a particular definition: message contains two fields, message_id, which is a string, and message_timestamp, which is a number. Similarly, the data field contains id, which is a number, and nested1, which is also a struct. Now consider the following record received in a different partition:
{
    "message": {
        "message_id": "B1941512-57A1-1D0C-18D9-FE9A3E11FE8A",
        "message_timestamp": 1571857109
    },
    "data": {
        "id": 67890,
        "nested1": {
            "n1a": "foo",
            "n1b": "bar"
        }
    }
}
The addition of a key/value pair inside of nested1 will also cause a HIVE_PARTITION_SCHEMA_MISMATCH error because Athena will have no way of knowing that the content of the nested1 struct has changed. Even though the column is a struct in both partitions, the struct definitions differ, and differences of this kind are not supported for more complex data types.
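As a rough illustration of why the two partitions disagree, the sketch below derives a nested type description from each of the two records above and lists the differences. The helpers infer_schema and diff_schemas are hypothetical and are not part of Athena or Glue.

```python
def infer_schema(value):
    """Describe a JSON value as a type name, or as a nested dict for structs."""
    if isinstance(value, dict):
        return {key: infer_schema(child) for key, child in value.items()}
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, (int, float)):
        return "number"
    if isinstance(value, str):
        return "string"
    return type(value).__name__


def diff_schemas(old, new, path=""):
    """Return dotted paths of fields that were added, removed or changed type."""
    differences = []
    for key in sorted(set(old) | set(new)):
        full_path = f"{path}.{key}" if path else key
        if key not in old:
            differences.append(f"added: {full_path}")
        elif key not in new:
            differences.append(f"removed: {full_path}")
        elif isinstance(old[key], dict) and isinstance(new[key], dict):
            differences.extend(diff_schemas(old[key], new[key], full_path))
        elif old[key] != new[key]:
            differences.append(f"changed: {full_path}")
    return differences


first_record = {
    "message": {"message_id": "B1941592-23A1-1D0D-18Z9-FE9A3E11FE8A",
                "message_timestamp": 1571757109},
    "data": {"id": 12345, "nested1": {"n1a": "foo"}},
}
second_record = {
    "message": {"message_id": "B1941512-57A1-1D0C-18D9-FE9A3E11FE8A",
                "message_timestamp": 1571857109},
    "data": {"id": 67890, "nested1": {"n1a": "foo", "n1b": "bar"}},
}

differences = diff_schemas(infer_schema(first_record), infer_schema(second_record))
print(differences)  # ['added: data.nested1.n1b']
```

The top-level columns are identical in both records; the only difference sits two levels down, inside the nested1 struct.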
Fixing these issues, however, is fairly straightforward. By declaring specific types for these fields, the issue with null columns in a CSV can be avoided. Furthermore, by flattening nested data structures, only top-level fields remain for a record, and as mentioned previously, adding top-level columns is something that Parquet supports. Flattening the data can be done by appending the names of the columns to each other, resulting in a record resembling the following:
{
    "message-message_id": "B1941512-57A1-1D0C-18D9-FE9A3E11FE8A",
    "message-message_timestamp": 1571857109,
    "data-id": 67890,
    "data-nested1-n1a": "foo",
    "data-nested1-n1b": "bar"
}
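The flattening step can be sketched in a few lines of Python. This is one possible implementation, not necessarily the one used in our pipelines; the function name flatten and the "-" separator parameter are assumptions chosen to match the record above.

```python
def flatten(record, parent_key="", separator="-"):
    """Collapse nested structs into top-level fields by joining key names."""
    flat = {}
    for key, value in record.items():
        full_key = f"{parent_key}{separator}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into the struct, carrying the accumulated key prefix.
            flat.update(flatten(value, full_key, separator))
        else:
            flat[full_key] = value
    return flat


nested_record = {
    "message": {
        "message_id": "B1941512-57A1-1D0C-18D9-FE9A3E11FE8A",
        "message_timestamp": 1571857109,
    },
    "data": {"id": 67890, "nested1": {"n1a": "foo", "n1b": "bar"}},
}

flat_record = flatten(nested_record)
for name, value in flat_record.items():
    print(name, value)
```

With this approach, a new key inside nested1 simply becomes a new top-level column rather than a change to a struct definition.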
This brings us back to the concept of “schema-on-read”. While conceptually this convention has some merit, its application is not always practical. While upstream complexity may have been eliminated for a data pipeline, that complexity has merely been pushed downstream to the user who will be attempting to query this data. With an expectation that data in the lake is available in a reliable and consistent manner, having errors such as HIVE_PARTITION_SCHEMA_MISMATCH appear to an end-user is less than desirable.
Ultimately, this explains some of the reasons why using a file format that enforces schemas is a better compromise than a completely “flexible” environment that allows any type of data, in any format. There can be some level of control and structure gained over the data without all the rigidity that would come with a typical data warehouse technology.
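To illustrate what declaring types at write time buys us, here is a minimal sketch that casts the CSV rows from the earlier example to a declared schema instead of inferring types per file. It uses only the Python standard library rather than a Parquet writer, and DECLARED_SCHEMA and apply_schema are hypothetical names introduced for this example.

```python
import csv
import io
from datetime import date

# Hypothetical declared schema: column name -> function that parses the raw text.
DECLARED_SCHEMA = {
    "id": int,
    "text": str,
    "reference_no": int,
    "date": date.fromisoformat,
}


def apply_schema(csv_text, schema=DECLARED_SCHEMA):
    """Cast every field to its declared type; empty fields become None."""
    typed_rows = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        typed_rows.append(
            {
                name: (cast(row[name]) if row[name] != "" else None)
                for name, cast in schema.items()
            }
        )
    return typed_rows


yesterday_rows = apply_schema('id,text,reference_no,date\n1,"foo",,2019-12-29\n')
today_rows = apply_schema('id,text,reference_no,date\n2,"bar",98765,2019-12-30\n')

print(yesterday_rows[0]["reference_no"])  # None
print(today_rows[0]["reference_no"])      # 98765
```

Since reference_no is declared as a number in both cases, a null value no longer changes the type of the column from one partition to the next.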
Nevertheless, this does not solve all potential problems either. Other nested complex data types can still pose problems. For example, consider an extended version of the previous JSON record:
{
    "message": {
        "message_id": "A1941512-57A1-1D0C-18D9-FE9A3E11FE8A",
        "message_timestamp": 1571757109
    },
    "data": {
        "id": 12345,
        "nested1": {
            "n1a": "foo",
            "n1b": "bar"
        },
        "nested2": []
    }
}
An additional field, nested2, which is an array-type field, has been added. Similar to the examples above, an empty array will be inferred as an array of strings. But perhaps this is an optional field which itself can contain more complicated data structures, for example an array of numbers, or even an array of structs. The latter case is a troublesome situation that we have run into. It has required some creative problem solving, but there are at least three different approaches that can be taken to solve it:
1 — “Stringifying” Arrays
Perhaps the simplest option, and the one we currently make use of, is to encode the array as a JSON string. Therefore, the above field nested2 would no longer be considered an array, but a string containing the array representation of the data. This approach can work with all complex array types and can be implemented with no fuss. The main drawbacks are that users will lose the ability to perform array-like computations via Athena, and downstream transformations will need to convert this string back into an array. However, this can be implemented easily by using a JSON library to read this data back into its proper format (e.g. json.loads() in Python). This approach also simplifies the notion of flattening, as an array would require additional logic to be flattened compared to a struct.
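A minimal sketch of this round trip is shown below. The helper stringify_arrays and the contents of nested2 are made up for illustration; only json.dumps() and json.loads() come from the Python standard library.

```python
import json


def stringify_arrays(record):
    """Return a copy of the record with every array encoded as a JSON string."""
    result = {}
    for key, value in record.items():
        if isinstance(value, list):
            result[key] = json.dumps(value)
        elif isinstance(value, dict):
            # Recurse so that arrays nested inside structs are encoded too.
            result[key] = stringify_arrays(value)
        else:
            result[key] = value
    return result


# Hypothetical payload: nested2 holds an array of structs with varying keys.
data = {"id": 12345, "nested2": [{"a": 1}, {"a": 2, "b": "x"}]}

encoded = stringify_arrays(data)
print(type(encoded["nested2"]).__name__)  # str

# Downstream, the string is decoded back into a regular list.
decoded = json.loads(encoded["nested2"])
print(decoded == data["nested2"])  # True
```

Whatever the array contains, the column type seen by the catalog stays a plain string, so an empty array and an array of structs no longer conflict.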
2 — Complex Flattening
Whereas structs can easily be flattened by appending child fields to their parents, arrays are more complicated to handle. In particular, they may require substantial changes to your data model. Considering the example above, an end-user may have the expectation that there is only a single row associated with a given message_id. Flattening an array with multiple elements would either involve adding a number of columns with arbitrary names to the end of the record, which would diminish the ability to properly query the data based on known field names, or it would involve adding multiple rows for each element of the array, which could impact logic that aggregates data based on an ID. Although the latter is a viable solution, it adds more complexity and may require a completely separate table to store the array results.
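The two flattening strategies described above can be compared with a short sketch. The function names, the numbered-column naming scheme and the sample values in data-nested2 are all assumptions made for illustration.

```python
def explode_to_columns(record, array_key):
    """Strategy A: keep one row and add one numbered column per array element."""
    row = {k: v for k, v in record.items() if k != array_key}
    for index, element in enumerate(record.get(array_key, [])):
        row[f"{array_key}-{index}"] = element
    return row


def explode_to_rows(record, array_key):
    """Strategy B: emit one row per array element, repeating the other fields."""
    base = {k: v for k, v in record.items() if k != array_key}
    # Keep a single row with a null element when the array is empty or missing.
    elements = record.get(array_key) or [None]
    return [{**base, array_key: element} for element in elements]


# Hypothetical flattened record whose only remaining complex field is an array.
record = {
    "message-message_id": "A1941512-57A1-1D0C-18D9-FE9A3E11FE8A",
    "data-id": 12345,
    "data-nested2": [10, 20, 30],
}

wide_row = explode_to_columns(record, "data-nested2")
long_rows = explode_to_rows(record, "data-nested2")

print(sorted(wide_row))  # column names depend on the array length
print(len(long_rows))    # 3 rows now share the same message_id
```

The first strategy produces column names that vary with the length of the array, while the second breaks the assumption of one row per message_id, which is why a separate table may be needed.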
3 — Schema Registry
In theory, this option may be the best in terms of having full control and knowledge of what data is entering the data lake. The approaches listed above assume that those building the pipelines don’t know the exact contents of the data they are working with. They are schema and type agnostic and can handle unknowns. However, if the exact format and schema of messages is known ahead of time, this can be factored into the appropriate data pipeline. There has been work done on this topic, but it also relies on more stringent change management practices across the entirety of an engineering department.
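As a very rough sketch of the idea, the snippet below checks incoming flattened records against a schema that is known ahead of time. It is an in-memory stand-in and does not represent any particular schema registry product; REGISTRY, validate and the registered fields are hypothetical.

```python
# Hypothetical in-memory registry: topic name -> expected flat schema.
REGISTRY = {
    "topicA": {
        "message-message_id": str,
        "message-message_timestamp": int,
        "data-id": int,
    }
}


def validate(topic, record, registry=REGISTRY):
    """Return a list of problems; an empty list means the record conforms."""
    schema = registry.get(topic)
    if schema is None:
        return [f"no schema registered for topic {topic!r}"]
    problems = []
    for field, expected_type in schema.items():
        if field not in record:
            problems.append(f"missing field: {field}")
        elif not isinstance(record[field], expected_type):
            problems.append(
                f"wrong type for {field}: expected {expected_type.__name__}"
            )
    for field in record:
        if field not in schema:
            problems.append(f"unregistered field: {field}")
    return problems


good_record = {
    "message-message_id": "A1941512-57A1-1D0C-18D9-FE9A3E11FE8A",
    "message-message_timestamp": 1571757109,
    "data-id": 12345,
}
bad_record = {**good_record, "data-id": "12345", "data-nested1-n1c": "baz"}

print(validate("topicA", good_record))  # []
print(validate("topicA", bad_record))
```

In this setup, a producer that wants to add or change a field has to update the registered schema first, which is where the change management practices mentioned above come in.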
The goal of this article was to provide an overview of some issues that can arise when managing evolving schemas in a data lake. Although the flexibility provided by such a system can be beneficial, it also presents its own challenges. Much research is being done in the field of Data Engineering to attempt to answer these questions, but as of now there are few best practices or conventions that apply to the entirety of the domain. Different technologies can offer different pros and cons that may help with these issues: Avro is a comparable format to Parquet and can also handle some schema evolution. It also has specific files that define schemas which can be used as a basis for a schema registry. Google’s BigQuery is a data warehousing technology that can also store complex and nested data types more readily than many comparable technologies. It is important for data engineers to consider their use cases carefully before choosing a technology. The tools should ultimately serve the use case and not limit it.
Editorial reviews by Deanna Chow, Liela Touré & Prateek Sanyal.