Airbnb’s data infrastructure has been an essential part of our strategy to continuously improve our products. Our Hive data warehouse grew exponentially from 350TB in the middle of 2013 to 11PB by the end of 2015. As the company grew, the demands on the reliability of the warehouse grew as well, and we sought to migrate our warehouse to new architectures. We found that existing migration tools either had issues with a large data warehouse or had significant operational overhead, so we developed ReAir to save time and effort when replicating data at this scale. In this blog post, we’ll cover details about how ReAir works and how it can be used to easily replicate petabyte-scale data warehouses.
Initially, all of our data was in a single HDFS / Hive warehouse. The single namespace provided a simple mental model and was easy to manage. However, mixing production and ad hoc workloads hurt reliability. Consequently, we aimed to split our warehouse into two: one for critical production jobs and another for ad hoc queries. In making this split, we faced two challenges: how to migrate such a large warehouse easily, and, after the split, how to keep datasets in sync between the two clusters. We created ReAir to address these challenges and are open sourcing the tool for the community.
Check it out on GitHub: https://github.com/airbnb/reair
ReAir is useful for replicating data warehouses based on the Hive metastore. We designed these tools to scale to clusters that are petabytes in size. Using ReAir with your cluster is simple as well: it connects to the Hive metastore Thrift service and copies data using MapReduce jobs. Because these are its only requirements, ReAir works across a variety of Hive and Hadoop versions and can operate largely standalone; only a deployment of Hadoop and Hive is required (plus a MySQL database for incremental replication). Since ReAir handles both data and metadata, tables and partitions are ready to query as soon as the process completes.
Without ReAir, the typical solution for migrating the warehouse involves launching DistCp and managing metadata manually through database operations. Such an approach is labor intensive and error prone, and some of the errors can introduce inconsistencies that are difficult to resolve. This process also has trouble copying warehouse directories that are constantly changing. Other replication solutions require specific Hive versions, a requirement that's hard to meet when migrating from an older deployment.
ReAir includes two replication tools: batch and incremental. The batch replication tool copies a specific list of tables at once, which is ideal for a cluster migration. In contrast, the incremental replication tool tracks changes that occur in the warehouse and replicates objects as they are created or modified. The incremental mode is ideal for keeping datasets in sync between clusters, as it starts copying changes within seconds of object creation. The following sections describe the two modes in more detail.
The batch replication mode is typically used to copy the entire data warehouse. The speed and throughput of the copy depend on the number and the throughput of the reducers, but here at Airbnb, we’ve been able to copy 2.2PB using 500 reducers in 24 hours.
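As a rough back-of-the-envelope check, assuming decimal units (1 PB = 10^15 bytes) and that all 500 reducers are busy for the full 24 hours, these figures imply the following average rates:

```latex
\[
\frac{2.2 \times 10^{15}\ \text{B}}{24 \times 3600\ \text{s}}
  = \frac{2.2 \times 10^{15}\ \text{B}}{86\,400\ \text{s}}
  \approx 2.55 \times 10^{10}\ \text{B/s} \approx 25.5\ \text{GB/s},
\qquad
\frac{25.5\ \text{GB/s}}{500\ \text{reducers}} \approx 51\ \text{MB/s per reducer}.
\]
```

If the 24 hours also cover the listing and metastore stages, the rate during the file-copy phase itself would presumably be somewhat higher than this average.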
Starting the batch replication process is simple: the user runs a shell command that launches a series of MapReduce (MR) jobs. The execution contract is to replicate entities (Hive tables and partitions) from the source warehouse to the destination warehouse. The batch replication process is bandwidth efficient when run multiple times in succession: it detects files that already match between the source and the destination and copies only the differences. Likewise, the metastore is updated only when needed. This idempotent behavior means the tool can be rerun safely without wasted work.
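To make the "copy only the differences" step concrete, here is a minimal Java sketch of how such a comparison might work. It assumes, hypothetically, that a file can be skipped when its relative path, length, and checksum all match on both sides. FileInfo and CopyPlanner are illustrative names, not ReAir classes, and the in-memory maps stand in for HDFS directory listings.

```java
import java.util.*;

// Hypothetical file metadata; a real tool would obtain these values
// from HDFS file status and checksum calls.
record FileInfo(long length, String checksum) {}

class CopyPlanner {
    /**
     * Returns the relative paths that must be copied: files missing on the
     * destination, or whose length or checksum differ (assumed criterion).
     */
    static List<String> filesToCopy(Map<String, FileInfo> source,
                                    Map<String, FileInfo> destination) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, FileInfo> e : source.entrySet()) {
            FileInfo dst = destination.get(e.getKey());
            // equals(null) is false, so missing destination files are copied too.
            if (!e.getValue().equals(dst)) {
                result.add(e.getKey());
            }
        }
        Collections.sort(result); // deterministic order makes logs easier to read
        return result;
    }
}
```

Running the planner a second time after the copies finish yields an empty list, which is the idempotent behavior described above.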
There are several challenges for batch replication. A significant one is that in a production data warehouse the sizes of entities are not uniform, yet replication latency should not be dictated by the largest one. For example, common tables can have fewer than 100 partitions, while the largest tables can have over 100,000. To keep latency in check, the replication work must be distributed evenly.
To solve the load-balancing problem, batch replication runs as a series of three MR jobs. The two most expensive operations in batch replication are file copies and metadata updates, so those steps are distributed via a shuffle phase. The three jobs generate the list of files to copy, execute the file copy, and lastly, execute the metadata update. Each of these jobs writes log data as files on HDFS, so it's easy to examine what happened after the job finishes.
In the first MR job, entity identifiers are read from HDFS and shuffled evenly to reducers. The reducers run various checks on the entities and produce a mapping from each entity to the HDFS directory that needs to be copied. The second MR job goes through the list of directories generated by the first job and creates a list of files in those directories. The file names are shuffled to reducers based on the hash of the file path. Once shuffled, the reducers execute the copy. The third MR job handles the commit logic for the Hive metastore. Since the list of entities was already evenly distributed in the first MR job, the third MR job can be map-only.
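The shuffle by file-path hash can be pictured with the formula used by Hadoop's default HashPartitioner, (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. The sketch below is plain Java with no Hadoop dependency, and ReducerAssignment is a hypothetical name. The point it illustrates is that work is assigned per file rather than per table, so a table with 100,000 partitions is spread across all reducers instead of landing on one.

```java
import java.util.*;

class ReducerAssignment {
    /** Same formula as Hadoop's default HashPartitioner; the mask keeps the result non-negative. */
    static int reducerFor(String filePath, int numReducers) {
        return (filePath.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    /** Counts how many files each reducer would receive. */
    static int[] loadPerReducer(List<String> filePaths, int numReducers) {
        int[] load = new int[numReducers];
        for (String path : filePaths) {
            load[reducerFor(path, numReducers)]++;
        }
        return load;
    }
}
```

Because the assignment depends only on the path, it is deterministic across reruns, which also keeps repeated runs easy to compare in the logs.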
The three-stage MR plan scales and balances load well: copying 2.2PB with 1 million entities takes about 24 hours, and an update run after 20TB has changed takes just an hour. In stages 1 and 3, we observed that the bottleneck is the MySQL database behind the Hive metastore, while in the file copy of stage 2, the bottleneck is available network bandwidth.
For our migration, we also had to write a custom file copy job to handle our HDFS data. While tools like DistCp are commonly used, we found several issues during testing:
- Job initialization on a directory with millions of files or the entire warehouse is slow.
- The error rate can be high, and we wanted custom error-handling.
- Easy-to-analyze logging is missing.
To solve these problems, we developed a pair of MR jobs to handle general HDFS copies. The first job builds a list of splits using a heuristic, level-based directory traversal with multiple threads. Once there are enough directories, mappers traverse those directories to generate the file list for copying, and the file names are shuffled to reducers, which determine whether a copy is necessary. The second MR job reads the list of files to copy and distributes the copy work via a shuffle.
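A level-based traversal like the one in the first job can be sketched as a breadth-first expansion that lists each level's directories in parallel and stops once there are enough directories to use as splits. This is a simplified stand-in for ReAir's heuristic, not its actual logic: SplitBuilder, buildSplits, and targetSplits are hypothetical names, and an in-memory map plays the role of HDFS directory listings.

```java
import java.util.*;
import java.util.concurrent.*;

class SplitBuilder {
    /**
     * Expands the directory tree one level at a time until at least
     * targetSplits directories are found or no directory has children left.
     * listChildren is a hypothetical stand-in for an HDFS listing call.
     */
    static List<String> buildSplits(String root,
                                    Map<String, List<String>> listChildren,
                                    int targetSplits,
                                    int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<String> frontier = List.of(root);
            while (frontier.size() < targetSplits) {
                // List every directory on the current level in parallel.
                List<CompletableFuture<List<String>>> listings = new ArrayList<>();
                for (String dir : frontier) {
                    listings.add(CompletableFuture.supplyAsync(
                        () -> listChildren.getOrDefault(dir, List.of()), pool));
                }
                List<String> next = new ArrayList<>();
                boolean expanded = false;
                for (int i = 0; i < frontier.size(); i++) {
                    List<String> children = listings.get(i).join();
                    if (children.isEmpty()) {
                        next.add(frontier.get(i)); // leaf: keep it as its own split
                    } else {
                        next.addAll(children);
                        expanded = true;
                    }
                }
                if (!expanded) {
                    break; // the tree is shallower than the target
                }
                frontier = next;
            }
            return frontier;
        } finally {
            pool.shutdown();
        }
    }
}
```

Leaf directories are kept as their own splits, so the traversal terminates even when the tree runs out of levels before reaching the target.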
With two clusters in place, we needed to share data between them. For example, daily log data was aggregated in the production cluster, but ad hoc users needed that data as well. The batch replication job described above was a great tool for migrations, but it could take an hour to determine the differences between two warehouses of our size and execute an update. Since users of the ad hoc cluster want data as soon as it's ready on the production cluster, it made sense to build a faster method for landing new content. There were some open source efforts to handle this, but because of their dependencies on specific Hive versions, our need to copy S3-backed partitions, and our desire for a standalone solution that follows the same logic as batch replication, we developed the incremental replication tool to keep the ad hoc cluster in sync with the production cluster.
As the name suggests, the incremental replication tool records changes to entities and copies them over as soon as they occur. To record the changes on the source (i.e., production) cluster, we used Hive's hooks mechanism to write an entry to a MySQL database whenever a query succeeded. In this way, we could track all the changes that occurred in the production cluster. For changes made directly on HDFS or through systems like Spark, a metadata update (e.g., with Hive's TOUCH command) can be made to trigger replication.
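The recording side can be pictured as an append-only log with increasing IDs. In the sketch below, a synchronized in-memory list stands in for the MySQL table, and onQueryFinished stands in for a Hive post-execution hook (Hive registers such hooks through the hive.exec.post.hooks setting). All class, method, and field names here are hypothetical, and ReAir's real schema may differ.

```java
import java.util.*;

// Hypothetical change-log row; in ReAir the log lives in a MySQL table.
record ChangeEntry(long id, String operation, String db, String table, String partition) {}

/** In-memory stand-in for the MySQL change log written by a Hive hook. */
class ChangeLog {
    private long nextId = 1; // mimics an AUTO_INCREMENT primary key
    private final List<ChangeEntry> entries = new ArrayList<>();

    /** Called by the (hypothetical) hook after each query finishes. */
    synchronized void onQueryFinished(boolean succeeded, String operation,
                                      String db, String table, String partition) {
        if (!succeeded) {
            return; // only successful queries are recorded
        }
        entries.add(new ChangeEntry(nextId++, operation, db, table, partition));
    }

    /** Returns entries newer than lastProcessedId, in log order. */
    synchronized List<ChangeEntry> readAfter(long lastProcessedId) {
        List<ChangeEntry> result = new ArrayList<>();
        for (ChangeEntry e : entries) {
            if (e.id() > lastProcessedId) {
                result.add(e);
            }
        }
        return result;
    }
}
```

Assigning the ID and appending inside one synchronized method keeps log order and ID order identical. A consumer could remember the last ID it processed and pass it to readAfter on each poll, which is one simple (assumed) way to resume after a restart.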
Once the changes were recorded in the database, we needed a way to replicate them to the ad hoc cluster. This was implemented as a standalone Java service. The service reads entries from the change log and converts them into a set of actions. For example, a successful create table query on the source cluster translates to a “copy table” action on the destination. The Java process then makes the appropriate metastore calls and launches MR jobs to execute the actions.
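The translation from log entries to actions is essentially a mapping over operation types. The sketch below shows the create-table-to-copy-table case from the text alongside a few assumed mappings for partitions and drops. The enum values, record names, and toActions method are hypothetical, not ReAir's API.

```java
import java.util.*;

// Hypothetical operation and action names, for illustration only.
enum Operation { CREATE_TABLE, ALTER_TABLE, ADD_PARTITION, ALTER_PARTITION, DROP_TABLE, DROP_PARTITION }
enum ActionType { COPY_TABLE, COPY_PARTITION, DROP_TABLE, DROP_PARTITION }

record LogEntry(long id, Operation op, String table, String partition) {}
record ReplicationAction(ActionType type, String table, String partition) {}

class ActionPlanner {
    /**
     * Maps one change-log entry to the action(s) to run on the destination.
     * Returns a list so that one query touching several objects could map
     * to several actions.
     */
    static List<ReplicationAction> toActions(LogEntry e) {
        return switch (e.op()) {
            // New or modified table: (re)copy its data and metadata.
            case CREATE_TABLE, ALTER_TABLE ->
                List.of(new ReplicationAction(ActionType.COPY_TABLE, e.table(), null));
            // New or modified partition: copy just that partition.
            case ADD_PARTITION, ALTER_PARTITION ->
                List.of(new ReplicationAction(ActionType.COPY_PARTITION, e.table(), e.partition()));
            case DROP_TABLE ->
                List.of(new ReplicationAction(ActionType.DROP_TABLE, e.table(), null));
            case DROP_PARTITION ->
                List.of(new ReplicationAction(ActionType.DROP_PARTITION, e.table(), e.partition()));
        };
    }
}
```

Because the switch over the enum is exhaustive, the compiler flags any new operation type that lacks a mapping.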
Since the changes on the source cluster are serialized in the log, it would be straightforward to execute them in the same order on the destination cluster. In practice, however, this would be too slow, as copying a single table or partition can take seconds to minutes. Instead, the changes must be replicated in parallel using multiple threads. To facilitate concurrency, the actions are arranged into a DAG according to their concurrency restrictions. These restrictions usually arise from multiple actions on a single table; for instance, a table must be created before its partitions are copied. By executing independent actions in parallel, replication lag is kept to a minimum.
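One way to derive these ordering constraints is to let each action wait only on earlier actions that touch the same table. The sketch below uses CompletableFuture chaining as a lightweight DAG: a table-level action (such as create or drop) waits for all earlier work on its table, while a partition action waits only for the last table-level action and the previous action on the same partition. The dependency rules and names (ScheduledAction, ActionScheduler) are assumptions for illustration; ReAir's actual rules may be finer-grained.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical action: partition == null marks a table-level action
// (e.g. create or drop table); otherwise it targets one partition.
record ScheduledAction(String table, String partition, Runnable work) {}

class ActionScheduler {
    private final ExecutorService pool;
    private final Map<String, CompletableFuture<Void>> lastTableLevel = new HashMap<>();
    private final Map<String, List<CompletableFuture<Void>>> partitionsSinceTableLevel = new HashMap<>();
    private final Map<String, CompletableFuture<Void>> lastPerPartition = new HashMap<>();

    ActionScheduler(ExecutorService pool) { this.pool = pool; }

    /** Must be called from a single thread, in change-log order. */
    CompletableFuture<Void> submit(ScheduledAction a) {
        List<CompletableFuture<Void>> deps = new ArrayList<>();
        deps.add(lastTableLevel.getOrDefault(a.table(), CompletableFuture.completedFuture(null)));
        String partitionKey = a.table() + "/" + a.partition();
        if (a.partition() == null) {
            // Table-level: wait for every earlier partition action on this table.
            deps.addAll(partitionsSinceTableLevel.getOrDefault(a.table(), List.of()));
        } else {
            // Partition-level: also wait for the previous action on the same partition.
            CompletableFuture<Void> prev = lastPerPartition.get(partitionKey);
            if (prev != null) {
                deps.add(prev);
            }
        }
        // If any dependency failed, allOf completes exceptionally and the work is skipped.
        CompletableFuture<Void> f = CompletableFuture
            .allOf(deps.toArray(new CompletableFuture<?>[0]))
            .thenRunAsync(a.work(), pool);
        if (a.partition() == null) {
            lastTableLevel.put(a.table(), f);
            partitionsSinceTableLevel.remove(a.table());
        } else {
            lastPerPartition.put(partitionKey, f);
            partitionsSinceTableLevel.computeIfAbsent(a.table(), t -> new ArrayList<>()).add(f);
        }
        return f;
    }
}
```

Actions on different tables never wait on each other, and partitions of the same table can be copied concurrently once the table exists.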
Using incremental replication, changes can be replicated quickly and reliably. Replication can also keep two warehouses in sync for disaster recovery: if one cluster is down, the other can be used to perform essential functions. Compared to batch replication, incremental replication is more efficient when the data warehouse is large but the amount of data that changes is relatively small. Currently, less than 2TB of data changes on Airbnb's production cluster each day. Since this is a small part of the warehouse, incremental replication makes sense.
By using both batch and incremental replication, we were able to quickly migrate to a two-cluster setup that realized our goals for higher isolation and reliability. We hope that these tools will be useful to the community as well!
Link to GitHub: https://github.com/airbnb/reair