Replicating big datasets in the cloud

Circus Train is an open source tool developed by Hotels.com for migrating and replicating very large Apache Hive datasets between clusters and clouds. The tool has become a key component of Expedia Group data platforms, enabling off-premises migrations, hybrid cloud architectures, and disaster recovery strategies.

“Alice the elephant loading the Wirth’s Circus train”* — photo by Sam Hood (State Library of NSW) [Public domain], via Wikimedia Commons

Circus Train was created when we found that existing tools were not well suited to our use-cases. Consequently you should consider Circus Train if any of the following are pertinent to your requirements, as these are the fundamental issues that Circus Train was designed to overcome:

  • You are unable to deploy system hooks into existing Hive infrastructure and need a tool with a light touch and small footprint.
  • You need to replicate datasets that are continuously in use, so users are able to read data correctly while it is being replicated.
  • You are targeting S3 as a destination and are concerned that your users may experience data inconsistencies when reading from an eventually consistent filestore.
  • You have mutable datasets, where existing data is overwritten, not simply appended to.
  • You have a broad and exotic big data platform with datasets of every flavour.

Today, Circus Train is used by many brands within the Expedia Group, reliably and consistently copying tens of terabytes to and from our AWS resident Hive instances every day. Its feature-set has expanded over the last 2 years as our user base grew and we encountered differently ‘shaped’ datasets located on different platforms.

If you’d like to learn more about our journey, read on!

Our temporary work-around

The Hotels.com data platform began its journey to the Amazon cloud back in 2016. We quickly realised that for a cloud-based data platform to be useful we’d need to copy or move many large datasets from our on premises data processing cluster to our AWS accounts. It also became clear that some datasets would also need to remain on-premises for the foreseeable future, and so would need to be synchronised to the cloud regularly. We assumed that everyone must be facing this problem and that there would be a plethora of mature and reliable tools out there. We spent a few months evaluating many existing solutions but were frustrated to find that none provided the complete set of features we required.

Exasperated, we implemented a simple data replicator to unblock our Data Science team; copying a needed Hive table into AWS. This quickly led to the ongoing replication of additional tables with our tool, and a stream of feature requests from our users. Finally, we accepted that we were in fact building our own solution and the ‘Circus Train’ was officially born.

Flexibility is limited in massively shared clusters

The greatest influence on our data replication approach was the monolithic nature of on-premises data processing clusters. We were a small team in the Hotels.com brand, using a cluster that must service a huge multi-national organisation. Many data replication approaches are event based; hooking into a log of changes made to the warehouse’s datasets. This is an elegant approach but requires deep integration into the core services running on the cluster. This was simply not an option open to us as the cluster admins had to consider the needs of, and risks to, all of their users across the organisation.

Therefore we required a tool with a light touch, one that could be operated without any elevation of user privilege.

A simpler way to perform replications

Quite simply, when invoked, Circus train pulls sets of data from a source Hive cluster and then writes a replica into the destination cluster. These executions are typically managed with a scheduler. To ensure that consumers in the replica cluster always see a consistent view of the data, we must copy the metadata and data in a coordinated fashion. We do not want metadata on the replica to advertise data that may not have arrived yet. If we ignored this point, our data analysts, engineers, and scientists might experience partial reads or consume inconsistent data. Worse yet, they might unknowingly feed this into their data pipelines and downstream datasets, polluting the lake as it were.

The diagram below shows the sequence of events that typically take place when replicating a dataset using Circus Train:

Circus Train: high-level sequence diagram
  1. Read metadata from the source. There are a number of approaches for determining ‘how much’ metadata to read.
  2. Distributed data read from the source cluster while…
  3. …writing data to the replica cluster. We write these as isolated, immutable snapshots — more on this in a minute.
  4. Once the data has replicated successfully, add metadata to the replica to advertise the availability of new data. Note that this metadata is usually transformed in some way, typically to apply a new URI that describes the data’s new location in the replica.
  5. Notify dataset consumers that new data has arrived.

Consistency changes everything, eventually

When we were fully on-premises, we were casual Apache Hive users. Many of our datasets were described in its metastore, but this was by no means mandatory. Our first surprise on moving workloads to AWS, and specifically S3, was that many of our data pipelines were in some way dependent on the atomicity provided by our on-premises HDFS filesystem. These ceased to function reliably with the eventual consistency offered by S3, potentially exposing us to future insidious data quality issues. In AWS we needed an alternative source of consistency, and so mandated that all datasets be managed and accessed via Hive’s metastore, which offers ACID semantics. This may seem like a very restrictive coupling, binding all our data processing to the Hive engine. However, we’re only actually using its broadly supported data catalogue; we’re still free to employ a wide range of supporting frameworks, such as Spark, to perform actual data processing.

But our problems did not end with the metastore. Some of our source datasets were mutable, and while this was already problematic in HDFS, we saw that it would get a lot worse in S3. Therefore we decided to enforce an immutable snapshot approach in our cloud-based data warehouse, so that data is only written, or revised by a new version, and never updated in place. To save space, old versions would be deleted when we are confident they are no longer relevant. The diagram below illustrates how snapshots are managed and accessed using the example of an unpartitioned table:

Snapshot isolation prevents dirty reads
  1. First replication takes place (coloured above in red), table data is written into thesnapshot_1 folder, the metastore now advertises table data at this location.
  2. A second replication takes place (coloured in blue), replacement data is written into thesnapshot_2 folder, the metastore now advertises the table data at this new location.
  3. A large workload attempts to read table; it requests location information from the metastore and is provided with current snapshot path, i.e. snapshot_2.
  4. While the workload is still running another replication takes place! (coloured in purple) Circus Train writes new data into the snapshot_3 folder and updates the metastore. The long running workload continues to read data from snapshot_2, but new workloads will now read from snapshot_3.
  5. Some time passes. We consider that no running workloads are reading from snapshot_1 and so it is now defunct. A housekeeping process deletes it to reduce S3 resource consumption.

Our desire for strong data consistency was a huge influence on our data replication approach. Few existing tools could perform a coordinated replication of both data and metadata, and none would write data at the destination as an immutable snapshot.

Optimised copiers

Previously we mentioned that we were motivated to provide consistent data access on cloud based file stores after noticing that some of our workloads were designed around atomic behaviours that were only present in HDFS. As it happened, these same behaviours were also assumed by distcp, Hadoop's distributed copy mechanism, and the tool that we first used to perform data replication within Circus Train. After experiencing spurious distcp errors, we migrated to Amazon's s3distcp which performed well, but lacked many of the features we liked in distcp. This exercise yielded both a 'copier' abstraction within Circus Train and also encouraged us to implement our own optimised S3 copier based on the distcp project.

We later took advantage of Circus Train’s pluggable copier feature to implement further optimised copiers for use in specific scenarios (for example replicating data from Google’s GCS and BigQuery). The table below describes the current set of copiers that we provide, and the circumstances in which Circus Train will use them:

Copier selection based on source and target file store types

To summarise Circus Train’s different copier mechanisms:

  • Whenever the replica file system is HDFS or Google Cloud Store (GCS), Apache Hadoop’s distcp is used and it exhibits desirable consistency semantics in these cases.
  • Whenever the replica file system is S3, one of our optimised S3 copier implementations is used. These have been designed to avoid eventual consistency edge cases.
  • Specifically, if the source is also S3, then we have a copier implementation that defers the actual copy operation to the S3 service. This is typically used for cross-region disaster recovery applications and has the benefit that no cluster resource is needed to perform the copy (the S3 service takes care of it).
  • Finally, we have some use cases where we need to move data out of Google Big Query (GBQ) and into S3 and Hive, and so we’ve implemented a copier to manage this also.

Selective data replication

The datasets replicated by Circus Train are often very large and can contain a significant amount of historical data. Consequently, it would be inefficient to replicate the complete contents of tables each time as it is likely that only a small fraction of the data has been recently appended or possibly even mutated. Ideally, we’d have some log or event stream that describes table changes that we could consume. However, recall that we have to operate in a constrained environment, and are unable to deploy notification hooks into our source metastores. Therefore we need some pragmatic approaches for selecting which data we copy on each replication, and so Circus Train supports the following options:

  • Partition filter expressions
  • Hive Diff

For tables with a temporal partitioning scheme, if we have some understanding of the processes that are generating the data, we can often make a reasonable pessimistic estimate concerning which partitions we’ll need to replicate. For example, when data is produced by a process that adds only new data on a daily cycle, we can be fairly confident that we must copy only the last day’s partition. Circus Train can be instructed to do this using ‘partition filter expressions’ that are declared using Spring’s expression language (SpEL), and so for our example we can declare an expression like so:

#{#nowUTC().minusDays(1).toString("yyyy-MM-dd")}

However, not all tables have a partition layout that is keyed on a time-based column, or can be easily defined in an expression. For these situations Circus Train provides a ‘diff’ feature; this compares table and file store metadata from both the source and the replica, and identifies new or modified partitions on the source. Although this is slightly more resource intensive, it can very effectively determine the minimum set of data that needs to be replicated.

Other features

We’ve described the design motivations behind Circus Train and its core features. However, as our user base grew we encountered additional ‘shapes’ of data, ‘interesting’ environments, and had to consider how to effectively manage replication processes that are now a critical part of our platform. Consequently, Circus Train was endowed with many additional useful features which I’ll briefly summarise:

  • Plugin Architecture: Where possible we’ve tried to embrace a composable architecture so that others can easily develop and deploy functionality to assist them with their own integrations. Extension points include: copiers, copier-chains, metadata transforms, life-cycle listeners.
  • Notifications: While we typically execute Circus Train on a schedule, we wanted to be able to trigger downstream consumer jobs as soon as new data arrived in replica clusters. To enable this, Circus Train is able to fire notifications on job completion (SNS currently).
  • Pluggable Metrics: To facilitate monitoring and provide runtime insights, Circus Train publishes many metrics concerning replication jobs. It uses the Codahale Metrics framework, allowing different metrics-gathering systems to be targeted.
  • Copier Chains: Multiple copiers can be chained together and executed in sequence. This has been used in conjunction with DataSqueeze to seamlessly compact small files during a replication.
  • Metadata Transforms: Users can provide transforms to modify metadata in flight. This has been used to upgrade table SerDes, where the version on the source cluster cannot be usefully deployed in the cloud.
  • Copying modes: We’ve had instances where we need to replicate only a table’s metadata. These include situations where both the source and the replica metastore can access the same file store, or in conjunction with the transforms feature, to perform large scale table modifications.
  • View Replication: Some of our users regularly update views as part of their data processing pipelines. Circus Train can be used to replicate these.
  • SSH Tunnelling: It is not unusual to encounter ‘exotic’ networking configurations when migrating from on-premises to the cloud, or when building hybrid or multi-cloud architectures. Often a SSH tunnel can serve as a convenient bridge for metastore connectivity. Circus Train supports the tunneling of metastore connections and bastion hosts.
  • Key stores: External interaction with cloud file stores can often require the use of secrets. Circus Train allows these to be securely managed using JCEKS key stores.

Summary

Migrating a data platform to the cloud will at some point involve migrating the actual data. We took the approach that we’d move our data users and datasets first, so that they’d be able to sooner realise the benefits of cloud computing services. The process of dataset migration is rarely an instantaneous step, but is continuous, long lived, and has to coexist with the ‘business as usual’. When implementing a solution, we tried to adopt the ‘lazy programmer’ approach of using existing tools. However, our team eventually had to develop their own ‘work around’. This unexpectedly, and very quickly became widely deployed across our technically diverse data platforms. Circus Train is the result: a highly flexible dataset replication tool built with data consistency in mind, that requires little integration or reconfiguration of existing infrastructure.

If you think that Circus Train may be of use to you, please take a look at the project on GitHub and consider joining our user group.

Codicil

  • Circus Train is one component of Expedia Group’s cloud platform data sharing strategy. Other open source components of note include: Apiary: a deployable pattern for providing seamless access to disparate datalakes that cross accounts and regions. WaggleDance: A Hive federation service that enables disparate tables to be concurrently accessed across multiple Hive deployments.
  • The Circus Train BigQuery extensions are provided in a separate project.
  • As of Hive 3.0.0, the metastore service is a top level project that can be deployed independently.
  • There has been extensive and ongoing work in the Apache Hadoop project to improve the behaviour of distcp when working with filestores such as S3.
  • Our project name was inspired by the Apache Hadoop project’s elephant fixation, and the use of trains for the transportation of large cargoes over long distances. We are honoured for it to have been noticed.

* [The author does not condone any form of exploitation, abuse, or harm of elephants.]