Separation of metadata and data: Building a cloud native warehouse, Part 3
There have been many discussions in the industry on the separation of compute and storage. In this post, we will do an in-depth exploration on the less discussed separation of metadata and data.
Here at dataxu, to say we handle a lot of data is an understatement. But in our daily dealings with data and its metadata, we began to explore the topic, and potential advantages, of separating metadata from data.
Below, we will review how metadata is handled in a traditional database vs dataxu’s cloud warehouse, discuss design patterns and high-level implementation, and take an in-depth look at the benefits of decoupled metadata. But before we dig in, let’s start at the beginning — what exactly is metadata?
What is metadata?
In the most basic sense, metadata describes the structure of the data, like name for table/column/partition, datatype, and ordering.
In almost all traditional databases, the metadata exists as a catalog. In Oracle, there are catalog views, such as USER, ALL, and DBA views. In Postgres, there are information_schema and pg_catalog. With such catalogs, it is always possible to find answers to the following questions:
- What are the schemas in the database?
- What are the tables and views in each individual schema?
- What are the columns in each individual table, (like data_type, ordinal_position, nullability, etc.)?
- What are the constraints declared in the table?
- What are the partitions in tables?
- What are the UDFs in the database?
In a cloud-based warehouse, like our’s here at dataxu, Hive Metastore provides similar functionality. Either installed as a hive component backed by an aurora database or as a Glue Data Catalog, we can manipulate the catalog via Hive DDL. Beyond the obvious similarities and limitations (for example, Glue Data Catalog does not support views or UDFs yet), there is a significant distinction.
In traditional databases, the metadata and data is tightly coupled. You cannot have data without metadata. The pattern is “create-validate-insert”. Consider a typical database development process:
However, in the cloud warehouse, the metadata and data are separated. Data can exist independently on the S3. This means:
- It is possible to have data but no metadata, i.e., no associated table definition in a data catalog. You might not think this is a very interesting use case. If you don’t have definition, how can you query? However, this point proves to be useful in cases of data corruption handling. We will explore this use case more in depth below.
- It is possible to have data associated with one or more tables. This is interesting. A data set can be defined as a table while at the same time be incorporated as a partition in another table. It all depends on the use case.
- More commonly, we can have data generated first, validated, and then make it available in a data catalog to support additional processing.
Why is this separation of metadata and data crucial? Let’s take a look at two critical and interlinked issues, transaction and data validation.
ACID and transaction are difficult
Database theories would emphasize ACID — atomicity, consistency, isolation, and durability — properties:
Atomicity: requires that each transaction be “all or nothing”, either at single statement level or multi-statement level.
Consistency: ensures that any transaction will bring the database from one valid state to another, the data must satisfy all declared database level constraints.
Isolation: ensures that the concurrent execution of transactions results in a system state that would be obtained if transactions were executed sequentially, most common level is read committed.
Durability: ensures that once a transaction has been committed, it will remain so.
Fortunately, or sadly, depending on your perspective, Consistency is the first ACID property to relax for warehousing and ETL — extract, transform, load — in the name of performance. It is a long established ETL approach to validate constraints in an application, rather than rely on declared database constraints. And in the cloud environment, the remaining AID properties are significantly altered as well.
The traditional approach of locking, or MVCC — Multiversion concurrency control, is hard to implement in a distributed environment. With Atomicity, Hive has so far only supported single statement transaction, i.e., only auto-commit, no support for multi-statement transaction by BEGIN, COMMIT and ROLLBACK.
Similarly for Spark, transaction is left to the application to manage. Consider an ETL application to ensure read committed Isolation, when a writer process is manipulating table T by updating/deleting old records and inserting new records. Not only is an intermediate result immediately available to query users with potential inconsistent data, but it is also hard to clean up if there is an error.
Even Durability is challenging. S3 eventual consistency is very different from a file system, where an OS level flush would be an efficient way to guarantee the data persistence. If a request to put or delete a large amount of data is attempted, S3 may even throttle and throw the dreaded 503 Slow Down response.
In essence, all ACID properties are intrinsically linked to transaction management. Yet transaction is left to the application to manage in a cloud warehouse. Thus, we rely on separation of metadata and data to manage the transaction. The main idea is that while data is written to S3, it is not “committed” until it is made available in a data catalog.
There are 3 common cases where this can occur
- Unpartitioned tables, typically including small dimension tables or fully refreshed aggregate tables. In such tables, we can use the following syntax to set the location.
What does this mean? Suppose we have the current copy of data available for a user to query at location s3url1. We can create a new copy of the data with required modifications to s3url2, make sure the data is valid and then issue the above statement to make the new copy available to the query user, achieving both atomicity and isolation.
- Append-only partitioned table, meaning every batch processing only inserts new partitions to the table, with new unique values for partition keys. This is applicable if each ETL batch has a unique identifier and the batch_id forms part of the partition key. Multiple partitions can be added in a single DDL statement, the syntax is:
- In case no unique ID is available in the partition key, we need to alter the existing partition and add a new partition at the same time. We implemented a simple “merge partition”, similar to merge DML. If the partition doesn’t exist, use the above syntax to insert new partitions and if the partition does exist, use the following syntax to alter existing partition:
For a multi-statement transaction where multiple tables need to be manipulated, in an all-or-nothing fashion, we can generate a data set for all tables, and then issue all hive DDLs at the end in succession. If any operations failed before issuance of the first hive DDL, the EMR step would fail, and the data location will not be made to Data Catalog. This means, cluster failure is transaction rollback. From the query user perspective, the potentially corrupted S3 data does not exist and will be cleaned up by S3 lifecycle policies, no need for manual intervention.
There is a chance that the series of DDL could fail in the middle. But, in reality, we rarely experienced such issues in both Aurora backed metastore or Glue Data Catalog. Should we encounter such an error, it is still much easier to recover from, compared to the messy S3 cleanup. Just identify the affected table or partition and revert to previous s3 location before the first Hive DDL.
Schema-on-read, Pattern: Validate-Insert-Create
Traditional databases follow the pattern of “create-validate-insert”, where tables have to be created first, with all the proper validation, before any data can be inserted. We could follow the same pattern, as shown below, but there could be some surprises
Given the above surprises in our Spark based ETL, we don’t want to follow the typical database “create-validate-insert” pattern. There are two connected issues that are outlined in this example:
- Without transaction guarantee, we could get partial and corrupt data upon DML failures.
- As the underlying storage is S3, with eventual consistency mechanism, the cleanup on partially corrupt data is hard to guarantee in a timely fashion.
Instead of using the typical pattern, at dataxu we have adopted a “validate-insert-create” pattern. The steps to implementing this pattern include:
- Create a spark temporary view matching the target external table schema. We created a helper class for this purpose, which ensures the temporary view has exactly the same number of columns, data types and ordering as the target external table.
- The validated data will be collected in the above temp view and write to S3 directly, without using INSERT INTO, like the following
- Then create/alter either an external table or partition depending on the use case, to point the location to the s3Path referenced above. This step is only executed when the above validation and write passed.
An alternate popular option is Hive’s dynamic partitioning feature, where you can set a root location, and have Hive generate the data and metadata dynamically.
We did not adopt this approach for two reasons:
- With the Hive partitioning scheme, the generated S3 prefixes are all relative to the root location. This means, all the data will have the same prefix. This is in conflict with S3 best practice for performance. The S3 guidance is to append a random alphanumeric prefix to the location to ensure best GET request throughput. For data warehousing, the predominant use case is querying, thus we optimize for GET over PUT.
- Current version of Spark, (actually, Hadoop), treats S3 as a file system. In write operation, Spark creates _temporporay prefix, and then moves them to the final location. If this fails in the middle of the move, we could have partial and corrupt S3 data that is difficult to clean up.
At dataxu, we use Spark to write partitioned data and then add partitions manually. Pseudo-code for partitioned write:
With this approach, each batch can have a random prefix to ensure the best possible query performance.
DDL is the new DML
In a traditional database, we are comfortable with INSERT, UPDATE, and DELETE to perform data modification operations. The UPDATE and DELETE statements are typically harder to write, compared to INSERT and SELECT, and perform much worse. The common cause is, most databases have to maintain before and after images in UPDATE and DELETE, resulting in much larger overhead than INSERT.
One common approach, even in the traditional database realm, is to create a separate table on the side, recreate the data set with INSERT/SELECT, SELECT records to keep and INSERT records to update, and then swap with the current table. I compare this approach to a common technique, double buffering in UI. With the separation of metadata and data, this technique is very easy to achieve without the complexity of managing a second table and it becomes the predominant pattern in ETL chains at dataxu. DDL contains much smaller data volume to manipulate — usually it just involves the table name, partition key/value, and location — the hive metastore API call is efficient compared to the time it would require to generate a large amount of data. The impact to the query user is minimal.
A key concept is that the operation unit is now a table or partition. If we were to update or delete, it must be at an entire table or partition level. If we are to update 5 rows out of 100 million row dataset, we would alter the location of an entire table or partition to the new location for the updated result. In this sense, designing a proper partition key is critical. For example, for an append-only partitioned table, we assign a unique batch ID and ensure the batch ID is part of the partition keys.
Some readers may question the performance on partitioned tables. But there is no need to worry. Hive DDL supports multi-level partitions, and Glue Data Catalog supports 20,000 partitions per table by default and can be raised higher. We had no problem with tables of 5,000 partitions.
Temporary View vs External Table
Another area to consider for metadata is what kind of objects to use to support ETL and querying in the cloud warehouse. In Spark SQL, there are three basic types of “tables”:
- Temporary view: The object only exists in Spark session and Spark catalog. Once the spark session is terminated, the metadata object disappears.
- External table: The table can be persisted into hive metastore. When the external table is deleted from hive metastore, only the table definition is deleted, not the data on S3.
- Managed table: The table is managed by Spark. If the table is deleted, both the table definition (metadata) and data are deleted.
At dataxu, we do not use managed tables. We only have temporary view and external tables in ETL processing. In MPP database, we created many staging tables, or temporary tables, to support ETL, such objects are accessible to query users and can be very confusing. With Spark ETL, such objects are created as temporary views, thus they don’t persist in metastore and query users are completely unaware of their existence. We are able keep a very clean schema, significantly reducing the number of persistent tables in the catalog.
For external tables, we configure Spark connection to Glue catalog with the following two simple EMR configurations.
As Athena uses the same Glue Data Catalog, the metadata updates from Spark ETL are immediately available to Athena query users, tying up both ETL process and querying activities. Worth mentioning, leveraging Glue service yields significant cost savings, dataxu spend on Glue Data catalog is less than 2% of the spend when metadata is hosted on Aurora RDS.
Schema evolution refers to modifications to the table definitions over time. We may need to add new columns, drop old columns, update existing columns for datatype, name, etc. to tables and partitions. Schema evolution in traditional databases typically involves reorganization of the data, leading to locking or performance penalty on a heavily loaded system. In a cloud warehouse, we present two primary ways for schema evolution, to be compatible with separation of metadata and data, as well as query engine behavior, with none of the downsides that exist in traditional databases.
Consider upgrading schema from version 1 to version 2:
- For unpartitioned table, if it is acceptable for query users to experience a short period of outage, we can create the new dataset conforming to version 2, then drop the version 1 table and recreate version 2.
However, this option may not always be acceptable, as there is a brief period of time, usually within a second, that the version 1 is dropped and version 2 has not been created in catalog, resulting query failures. Most of the time, users do not want an interruption in service, i.e., when a table suddenly disappears before reappearing. Which brings us to option two:
- Append new columns only in evolution. Never drop old columns, instead, use alter column data type as an example, create a new column with the new data type and keep the old column.
One of the major concerns with dropping a column is that it would require a re-processing of existing data, to remove the column from the actual data on S3. It is a relatively small effort for dimension tables, but for fact tables with billions of records, this could be a task that is next to impossible. With append-only schema evolution, version 1 data is still compatible with version 2 schema.
As a concrete example, suppose version 1 has column c1 to c100, where c59 is a bigint column. We want version 2 to change the data type to decimal(10, 8). We can alter the table, add c59_v2 of decimal(10, 8). Query engines like Spark, Hive, Presto, Athena can generate null values for c59_v2 with data generated for version 1 and able to project data from both version1 and version 2 in version 2 schema. We can then use coalesce in SQL query to consolidate the two columns:
As of this point, we have discussed the common patterns of separating metadata and data. Now let’s dive into the significant benefits and interesting use cases that are possible with this method.
Removal of DDL or DML locks and data fragmentation
In databases with a locking approach for transactions, writers would block readers. With MVCC, generally readers don’t block writers and writers don’t block readers. Regardless, writers always block writers on the same table. This means, DML operations to a single table is serialized, leading to slow performance. With separation of metadata and data in a cloud warehouse, writers don’t block writers either. It can be huge advantage if managed properly.
Generally, if we ensure write to a table is idempotent, like a unique-id partition for each batch or merge partition, then we can have many writers manipulating the same table at the same time. An interesting use case is when ETL is significantly behind, either due to application bugs or outage from upstream system or AWS, we can launch as many EMR clusters as number of batches in the backlog for concurrent execution and quick catch up. If we have 10 batches, we can be 10x faster than MPP database.
Not only DML can acquire locks, DDL will also acquire locks in a database, leading to slow performance. And similarly, there is no DDL locking in a cloud warehouse, further removing performance bottlenecks.
A common reason for DDL locks is vacuuming. Data can get fragmented in a database, not only for data, but also metadata in the catalog. Different databases have different tools to manage the fragmentation, typically requiring DML and DDL locks in the defragmentation process. Catalog vacuuming could be particularly problematic, typically requiring database outage to query users. In our Spark ETL, we won’t have data fragmentation, as we always recreate the new data set for DML, there is no need for such defragmentation and DML/DDL locks.
Test with real production data
In traditional databases, testing with real production data is always a struggle for performance testing. You can create a second set of equivalent hardware and restore the full database for performance testing, but that’s usually cost-prohibitive and time consuming. Even if you can afford a second set of hardware, developers usually have to make reservations to access such an environment. Alternatively, you can compromise by testing a small percentage of the data, risking unexpected problems in production with insufficient testing coverage.
With the separation of metadata and data, it is easy to test at the full scale of production data, with no compromise. With the production data on S3, all we need to do is make a copy of the metadata catalog and suddenly the same data is available for QA or dev proposes.
Our practice at dataxu is to organize production objects in Hive database, typically, named as “default”. We created a custom process to export all objects, including tables, columns, and partitions of the default database from Glue Data Catalog as JSON, and then restore to a database in the dev environment. The catalog JSON export is a few megabytes and restore takes less than 5 minutes. Every developer can afford to have their own production-sized database or as many as databases to fit their needs in the dev environment. In essence, each database just holds pointers to the massive production S3 data set and we do not need to make a full copy of the S3 data.
Consider this interesting use case: we have an application bug or performance issue, and we would like to test a few potential fixes, in different “what-if” scenarios. We can create several dev databases by restoring the production catalog, try each possible fix at full scale and validate the data correctness or performance improvement before deployment to production.
Another interesting use case is point-in-time recovery. We store the Glue Data Catalog JSON export at an hourly interval. That’s minimal cost on S3, yet, we can restore to any point of time within the past 1 month for testing purposes or disaster recovery, all within 5 minutes.
Boosting performance by separating metadata and data
Traditional databases have metadata and data tightly coupled, leading to problems like performance degradation from catalog locking and data loss due to catalog corruption. With dataxu’s cloud warehouse implementation, we leveraged AWS Glue, specifically, Data Catalog, as the metadata service. In addition to addressing the common problems in traditional databases, we reaped the benefits of development agility as well as full functional and performance testing.
Please post your feedback in the comments — do you have an interesting approach in exploiting separation of metadata and data? In the next post, we will discuss Spark SQL optimization techniques
If you found this post useful, please feel free to “applause” and share!