Migrating from Hive to Delta Lake + Hive in a Hybrid Cloud Environment

Yishuang LU
Published in Amobee Engineering
9 min read · Jan 10, 2020

Background

Amobee is a leading independent advertising platform that unifies all advertising channels, including TV, programmatic and social. The Platform Data Team is building a data lake that helps customers extract insights from data easily. One of our main challenges is data volume: a single table can range from terabytes to petabytes and have tens to hundreds of columns, and the data can come from first-party or third-party sources.

To make our data ingestion more scalable and to separate concerns, we have built a generalized data pipeline service on top of Spark and an independent data catalog service on top of Hive. The data catalog service is a centralized place to manage the table metadata, including schema, partitions, etc.

This design works well for many situations. But we also found that the catalog service had a hard time dealing with some big and wide tables. After some research, we found that the limitation comes from Hive.

Challenge

While building a data lake in a hybrid cloud environment, it is important to design an architecture that is scalable, pluggable, recoverable, and replayable.

Data can have different views in each geo partition. In some cases we want one cluster to hold a subset of another; in others, two geo partitions must be fully in sync. That is why we built a centralized data catalog service to manage the different states across clusters. We chose Hive as the foundation of the catalog service.

Hive is an industry standard. It offers a centralized place to store the metadata for all the data sources. Developers can use well-designed APIs to interact with Hive.

Our pipeline service performs the ETL to transform the data into efficient formats for querying, and we use our catalog service to communicate with Hive and update the schema and partitions accordingly. All the tables are created as external tables in Hive. But there are some limitations in Hive for our case:

  • Tables are managed at the partition level: Hive cannot track changes inside a partition. For example, there is no record of which files were inserted into or deleted from a partition at a specific time.
  • DDL (Data Definition Language) is slow for big and wide tables: altering the schema, dropping a table, or running MSCK takes a long time on tables with hundreds of thousands of partitions and hundreds of columns.
  • Data can be generated with a schema that is not backward compatible: for example, the pipeline changes a column’s data type from timestamp to string.
  • DML (Data Manipulation Language) on tables is not well supported: Hive doesn’t natively support DELETE/UPDATE/UPSERT operations.
  • Hard to keep state consistent across data centers: the files in a table can differ from one data center to another.

Delta Lake

Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads.

The metadata in Delta Lake is part of the ‘data’. It coexists with the data and records its state, which Delta Lake stores inside the transaction log.
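To make the idea concrete, here is a minimal sketch of how table state can be recovered by replaying the log. It assumes the open-source Delta layout, in which each commit is a JSON-lines file under `_delta_log/` whose lines carry actions such as `add`, `remove`, and `metaData`. The helper name `replay_log` and the sample commits are made up for illustration, and checkpoint files are ignored.

```python
import json


def replay_log(commits):
    """Replay commits (oldest first) and return (live_files, schema_string).

    Each commit is the text of one _delta_log/<version>.json file,
    with one JSON action per line.
    """
    live_files = set()
    schema_string = None
    for commit in commits:
        for line in commit.splitlines():
            if not line.strip():
                continue
            action = json.loads(line)
            if "add" in action:
                live_files.add(action["add"]["path"])
            elif "remove" in action:
                live_files.discard(action["remove"]["path"])
            elif "metaData" in action:
                # The latest metaData action wins.
                schema_string = action["metaData"]["schemaString"]
    return live_files, schema_string


# Hypothetical commits, reduced to the fields this sketch needs.
commit_0 = "\n".join([
    json.dumps({"metaData": {"schemaString": '{"type":"struct","fields":[]}'}}),
    json.dumps({"add": {"path": "dt=2020-01-01/part-0.parquet"}}),
    json.dumps({"add": {"path": "dt=2020-01-01/part-1.parquet"}}),
])
commit_1 = "\n".join([
    json.dumps({"remove": {"path": "dt=2020-01-01/part-0.parquet"}}),
    json.dumps({"add": {"path": "dt=2020-01-02/part-0.parquet"}}),
])

files, schema = replay_log([commit_0, commit_1])
print(sorted(files))
```

Because every file-level change is an explicit action, the log answers the question that Hive could not: which files entered or left a partition at a given commit.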

On top of this transaction log, Delta Lake implements many best practices for building a modern data lake. The features listed in the Delta Lake documentation include:

ACID Transactions: Data lakes typically have multiple data pipelines reading and writing data concurrently, and data engineers have to implement a tedious process to ensure data integrity due to the lack of transactions. Delta Lake brings ACID transactions to data lakes. It provides serializability, the strongest isolation level.

Scalable Metadata Handling: In big data, even the metadata itself can be “big data”. Delta Lake treats metadata just like data, leveraging Spark’s distributed processing power to handle all its metadata. As a result, Delta Lake can handle petabyte-scale tables with billions of partitions and files with ease.

Time Travel (data versioning): Delta Lake provides snapshots of data enabling developers to access and revert to earlier versions of data for audits, rollbacks or to reproduce experiments.

Open Format: All data in Delta Lake is stored in Apache Parquet format enabling Delta Lake to leverage the efficient compression and encoding schemes that are native to Parquet.

Unified Batch and Streaming Source and Sink: A table in Delta Lake is both a batch table, as well as a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box.

Schema Enforcement: Delta Lake provides the ability to specify your schema and enforce it. This helps ensure that the data types are correct and required columns are present, preventing bad data from causing data corruption.

Schema Evolution: Big data is continuously changing. Delta Lake enables you to make changes to a table schema that can be applied automatically, without the need for cumbersome DDL.

Audit History: Delta Lake transaction log records details about every change made to data providing a full audit trail of the changes.

Updates and Deletes: Delta Lake supports Scala / Java APIs to merge, update and delete datasets. This allows you to easily comply with GDPR and CCPA and also simplifies use cases like change data capture.

100% Compatible with Apache Spark API: Developers can use Delta Lake with their existing data pipelines with minimal change as it is fully compatible with Spark, the commonly used big data processing engine.

Most importantly: No Vendor Lock-in. Delta Lake itself doesn’t change the data you stored in Hive. You can easily remove the metadata and convert it back into a Hive table, so there is minimal risk.

Some useful features listed on the website, such as statistics and data skipping, are not included in the open-source version.

All that being said, we still need Hive, because it offers features that Delta Lake doesn’t have, like getting a table schema from outside a Spark job, or storing key-value properties in the Hive table.

Gap and Implementation

When we compared our existing pipeline with a Delta-based pipeline, we found several gaps that had to be closed before we could migrate from Hive to Hive+Delta.

1. No easy way to update transaction log

Hive offers JDBC connections that let developers run DDL and DML easily. It also offers a fairly comprehensive API for getting the schema of existing tables, which makes it easy to run DDL when required. Our catalog service can update the table schema according to the data schema of the pipeline output. With Delta Lake, however, we can’t get the schema directly, because the latest schema is stored in the transaction log and has to be read through the Spark API. It would be expensive to launch a Spark job just to fetch the schema of an existing table and run DDL.

After researching some open-source frameworks, we chose Livy. We can retrieve schemas or run DDL in Spark by making an HTTP request to a Livy server. Livy can reuse an open Spark session, so we don’t pay the cost of starting a new Spark job, which reduces the time it takes to access the schema. More importantly, the catalog service can run any DDL and DML from outside the Spark environment. This reduces the risk of changing our existing framework and gives us more flexibility in using the data lake.
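As an illustration of this approach, the sketch below builds the kind of HTTP request a catalog service could send to Livy's REST endpoint `POST /sessions/{id}/statements`. The host name, session id, and table path are placeholders, the function name is hypothetical, and the request is only constructed here, not sent. This is not necessarily how our service issues the call.

```python
import json
import urllib.request


def build_schema_request(livy_url, session_id, table_path):
    """Build (without sending) a POST asking an open Livy session
    for the schema of a Delta table."""
    # Scala snippet that Livy runs inside the already-running Spark session.
    code = 'spark.read.format("delta").load("%s").schema.json' % table_path
    body = json.dumps({"kind": "spark", "code": code}).encode("utf-8")
    return urllib.request.Request(
        url="%s/sessions/%d/statements" % (livy_url.rstrip("/"), session_id),
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder host, session id and path, for illustration only.
req = build_schema_request("http://livy.example.internal:8998", 0, "/data/events")
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))

# To actually submit: urllib.request.urlopen(req), then poll
# GET /sessions/0/statements/<statement id> until its state is "available".
```

Since the session stays open between requests, the cost of a schema lookup is one statement execution rather than a full Spark job launch.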

2. Little control over output file names

We would like to control the output file names so that for one immutable input set, we know exactly what the output files will be. That way, if we need to re-run the job, the previous output is overwritten. If we directly write out data using the Delta Lake API from our Spark job, then Delta Lake will always insert new files.

To make our pipeline service replayable, each job writes its output to an intermediate location and then moves it to the final destination. This gives us more flexibility in managing pipeline output files. Because these files bypass the Delta Lake save API, we use some lower-level Delta Lake APIs to commit them ourselves. Each data pipeline task reports its output files to the catalog service, which is responsible for inserting the files into the transaction log and updating the schema accordingly.
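One way to get predictable output names is to derive them from the input set itself. The sketch below is an assumption about how this could be done, not our actual naming scheme: the function names, the hash choice, and the report payload shape are all hypothetical.

```python
import hashlib


def deterministic_file_name(input_files, partition, task_index):
    """Derive an output file name that depends only on the input set."""
    digest = hashlib.sha256()
    for path in sorted(input_files):  # sorting makes the name order-independent
        digest.update(path.encode("utf-8"))
        digest.update(b"\0")  # separator so ["ab", "c"] differs from ["a", "bc"]
    return "%s/part-%05d-%s.parquet" % (
        partition, task_index, digest.hexdigest()[:16])


def report_outputs(table, file_names):
    """Hypothetical payload a pipeline task could send to the catalog service."""
    return {"table": table, "files": sorted(file_names)}


inputs = ["raw/2020-01-10/a.log", "raw/2020-01-10/b.log"]
first_run = deterministic_file_name(inputs, "dt=2020-01-10", 0)
rerun = deterministic_file_name(list(reversed(inputs)), "dt=2020-01-10", 0)

# A re-run over the same inputs targets the same path, so it overwrites
# the previous output instead of adding a new file.
print(first_run == rerun)
print(report_outputs("events", [first_run]))
```

With names like these, a replayed task replaces its earlier output at the final destination, and the catalog service sees the same file list in both reports.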

3. Faster table conversion in cloud environment

Starting with version 0.4.0, Delta Lake provides an API to convert an existing table into Delta format. Its performance is acceptable for small tables, but it is very slow on big tables on Google Cloud Platform (GCP).

To improve conversion performance, we implemented our own tool for converting data into a Delta Lake table. Following the same approach we use with Hive, the tool first defines the schema of the Delta table and then calls our implementation of MSCK for Delta Lake.

MSCK is used to recover partitions that exist on the file system but are not registered in the metadata. The idea is to find the difference by scanning the file system and comparing the result with what exists in the transaction log.

Delta Lake itself has a very good API for getting all available files from the transaction log, and we have built a scalable way to scan files on any file system. Combining the two, the catalog service can report the ‘untracked’ files to Delta Lake through our MSCK API.
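The core of this comparison is a set difference. The following single-machine sketch shows the idea on a local directory. The function names are hypothetical, the set of tracked files is passed in directly rather than read from the transaction log, and our real scanner is distributed and works on other file systems as well.

```python
import os
import tempfile


def list_data_files(table_root):
    """Walk the table directory and return relative paths of Parquet files."""
    found = set()
    for dirpath, dirnames, filenames in os.walk(table_root):
        # Do not descend into the transaction log directory.
        dirnames[:] = [d for d in dirnames if d != "_delta_log"]
        for name in filenames:
            if name.endswith(".parquet"):
                full = os.path.join(dirpath, name)
                rel = os.path.relpath(full, table_root)
                found.add(rel.replace(os.sep, "/"))
    return found


def untracked_files(table_root, tracked):
    """Files present on the file system but absent from the transaction log."""
    return sorted(list_data_files(table_root) - set(tracked))


# Demo on a throwaway directory with made-up file names.
with tempfile.TemporaryDirectory() as root:
    for rel in ["dt=2020-01-01/part-0.parquet",
                "dt=2020-01-02/part-0.parquet",
                "_delta_log/00000000000000000000.json"]:
        path = os.path.join(root, *rel.split("/"))
        os.makedirs(os.path.dirname(path), exist_ok=True)
        open(path, "w").close()
    missing = untracked_files(root, tracked={"dt=2020-01-01/part-0.parquet"})

print(missing)
```

Only the files in `missing` need to be committed, so the cost of a repeated MSCK run depends on how much is new rather than on the full size of the table's metadata.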

MSCK works well, especially when we convert tables to Delta format in the cloud. On GCP, it takes only 2.5% of the time needed by the native table conversion API. It also performs better than the original API in our on-premise environment.

4. New Architecture

The original architecture without Delta Lake does a good job of separating the concerns of each component. Most of the migration changes go into the data catalog service. To support better interaction with Spark, we introduce a Livy server into our architecture.

5. Other Optimizations

Slow to get table properties: Delta supports table properties, but they must be accessed through a Spark job. With Hive, we can get the table location and schema information directly using a Hive client. To address this, we also store the properties in the Hive Metastore for easier fetching. The catalog service is responsible for updating the transaction log first and then storing those properties in the Hive Metastore.

Lack of rollback support: In the open-source version, Delta format supports DELETES/UPDATES/UPSERTS, but a rollback feature is missing. If something goes wrong, we want a way to perform a one-click recovery. Because the VACUUM command is executed lazily, files removed by a commit remain on storage for a while. We developed a feature that lets the catalog service scan the log and decide which files need to be inserted or removed between two commits before vacuum happens.
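The rollback decision can be sketched as a comparison of the live file sets at two versions. This is a simplified illustration rather than our implementation: commits are modeled as lists of `(action, path)` tuples, and the function names are hypothetical.

```python
def files_at(commits, version):
    """Live file set after applying commits[0..version], inclusive."""
    live = set()
    for actions in commits[: version + 1]:
        for kind, path in actions:
            if kind == "add":
                live.add(path)
            elif kind == "remove":
                live.discard(path)
    return live


def rollback_plan(commits, current, target):
    """Files to re-add and to remove so the table matches `target` again."""
    now = files_at(commits, current)
    then = files_at(commits, target)
    return {"re_add": sorted(then - now), "remove": sorted(now - then)}


# Made-up history: version 1 rewrites part-0, version 2 appends part-3.
commits = [
    [("add", "part-0.parquet"), ("add", "part-1.parquet")],     # version 0
    [("remove", "part-0.parquet"), ("add", "part-2.parquet")],  # version 1
    [("add", "part-3.parquet")],                                # version 2
]

plan = rollback_plan(commits, current=2, target=0)
print(plan)
```

The plan can then be written as one new commit. Re-adding a file only works while it still exists on storage, which is why the recovery has to happen before vacuum deletes it.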

Performance Comparison

The prototype showed no degradation in query performance. In tests on production-size data, Delta Lake performed better. We measured relative performance for the following operations:

  1. Create table: (Hive table + msck) vs. Delta conversion vs. (Delta update schema + msck)
  2. Alter table: alter Hive column vs. update Delta schema
  3. Drop table: drop Hive table vs. drop Delta table
  4. Spark SQL on big and wide table

[Chart: relative performance, on-prem]

[Chart: relative performance, cloud]

Conclusion

The original Delta API is easy to use, but it doesn’t fit into our architecture, and its performance is especially poor in the cloud environment. The API we built on top of it performs well in a hybrid cloud environment. With these changes, we can easily manage the data lake across hybrid cloud environments.

Acknowledgments: Huge thanks to Alvin Chyan, Vinay Gulani, Cewei Cui, Yibo Yao, Larry Lo and Jin-chen Cheng

Reference

  1. Delta Lake
