Calculating A/B test metrics

The architecture of our metric calculation framework and its evolution

Coupang Engineering
Coupang Engineering Blog
19 min readJul 6, 2018

--

By Beibei Ye and Isabelle Phan

This post is also available in Korean.

In mid-2014, Coupang began building an in-house experiment platform to run A/B testing at scale. One of the platform’s most important components is the metric calculation framework, responsible for providing business users with timely insights to help them make data-driven decisions. Developing the metric calculation framework was a major challenge because of Coupang’s scale: billions of data rows need to be processed in near real-time for hundreds of experiments.

In this post, we will outline the overarching architecture of the metric calculation framework in our experiment platform and how it has evolved since 2014.

Table of contents

- Phase 1: Slow and limited (2014–2016)
-
Phase 2: Speed, scale, and robustness (2016)
-
Phase 3: Moving to the cloud (2017-Present)
-
The next phase

Phase 1: Slow and limited

An incremental calculation process was in the first phase of the Coupang’s metric calculation framework
Figure 1. Incremental calculation process: today’s results = yesterday’s results + today’s data

Initially, metric calculations were running on Hive. To optimize computation times, calculations were incremental. Every day, the calculation only processed the current day’s data (e.g. sales transactions and user activities). These results were aggregated with the previous day’s results to generate the cumulative metrics of the day.

This approach had the following limitations:

  • Cost-inefficient and slow. It took close to three hours to calculate metrics for two dozen A/B tests.
  • Inefficient data correction. The incremental calculation method was dependent on the previous day’s calculation results. If an issue was found in the source data from several days prior or if a bug had been introduced in the code, the full calculation job had to be re-run from the date on which the issue occurred, before any other calculations could proceed.
  • Poor metric re-calculation. There was no way to re-calculate the metrics for a particular test if an issue only affected one test or a subset of tests because the calculations for all concurrent A/B tests were tied together.
  • Limited data source. The calculation schedule was limited by the data synchronization frequencies of upstream data sources. More frequently updated data sources were available, but couldn’t be leveraged by our calculation process. As a result, the metrics of a day’s activities could only be provided at 2 PM the following day.
  • Difficult code maintenance. The framework consisted of a single HiveQL script. Queries could not be reused, so the script grew to thousands of code lines, none of which where unit-testable.

These limitations greatly hindered our development cycle. We needed fresh metrics several times a day and quick feedback on any negative revenue impact. The metric calculation needed to be revamped.

Phase 2: Speed, scale, and robustness

To address these limitations, we rearchitected the metric calculation framework. We wanted to create a new system that was sustainable for the long-term and scalable to our explosive growth.

Requirements

Before development, we defined the following requirements in the design process of the new metric calculation framework.

SLAs
A priority of the new architecture was to minimize the delay between event occurrence, such as click and sales, and result delivery. To swiftly measure the effect of the test treatment in its early stages, we wanted the framework to calculate a subset of results every 15 minutes and a full set of results every 4 hours.

Scalability
We wanted to support metric calculations for more than 200 simultaneous A/B tests.

Robustness and fault-tolerance
If a hardware problem occurred, we wanted calculations to resume without manual intervention. If an operation error occurred, we wanted recovery to be quick and painless. In addition, we wanted to be able to re-calculate metrics for a subset of tests in case of any test misconfigurations.

Implementation

To meet these requirements, we developed a Spark-based lambda architecture for our metric calculation framework.

Apache Spark
For faster processing and easier code maintenance and testing, Apache Spark was selected as the alternative to HiveQL.

As the number of concurrent A/B tests increased, the amount of data also increased. To optimize Spark, instead of bluntly processing all the input data, the metric calculation framework divided the data by test and processed each test independently. Using this divide and conquer method, the data size was stabilized and brought back to the level of Spark’s data processing sweet spot, in the range of millions instead of billions.

  • Input data: 200 tests x 15 days x 4 million exposure logs / day = 12 billion logs to process
  • Calculated data per process: 1 test x 15 days x 4 million exposure logs / day = 60 million logs
Parallel spark job sequence of the Coupang’s metric calculation framework
Figure 2. Parallel spark job sequence

Furthermore, to maximize Spark performance and utilize Spark resources to their full capacities, test data processing was parallelized using the Akka actor framework, shown above. The master actor fetched test information and created a pool of worker actors. Then, it generated a message for each test, which was assigned to a worker actor in a round-robin fashion. Even if one worker failed, other tests continued processing. Failed tests were reported at the end of the job and separate jobs can be kicked off for the failed tests only.

This parallel processing dramatically reduced the calculation time, and also isolated test level failures; a single test failure does not block all the other tests’ processing.

Lambda Architecture

An overview of the lambda architecture for the Coupang’s metric computational framework
Figure 3. An overview of the lambda architecture

In addition, the lambda architecture was adopted to meet robustness requirements. There are three layers to the lambda architecture.

The batch layer is the source of truth, processing all data accurately and securely. Although fault tolerant, its results are several hours old. The speed layer compensates for the batch layer’s latency. It processes the delta of data between the last batch and the current point in time. It can compromise accuracy for speed to provide results in near real-time, but it uses the incremental calculation method, making it more complex. The serving layer merges the output of the batch and speed layers to deliver results to business users.

Below are some details of how our framework implemented the lambda architecture:

  • Incoming data sources. Customer test exposure data and real-time order data was available in Kafka. Daily sales transaction data and hourly customer activity data was stored in HDFS. A/B test configuration data was stored in MySQL.
  • Batch layer. A master copy of data partitioned by date and test is stored in HDFS (append only). Batch jobs included Spark jobs that ran every 4 hours and Spark reconciliation jobs that were run daily. Incremental calculations were not used in this layer.
  • Speed layer. Spark streaming jobs ingested real-time order data and customer test exposure data from Kafka. A subset of key performance metrics was calculated incrementally every 15 minutes.
  • Serving layer. Metric results were pushed to the MySQL database and displayed in a web-application for business users.

Data Aggregation Levels
In a standard lambda architecture, the batch layer processes all the raw input data other than intermediate datasets. This ensures the best system robustness because in case of an incident (any code bugs or data issues), once the data is fixed, the batch layer reprocesses the data from scratch and outputs accurate results.

However, initial performance tests revealed that Spark 1.5, which we were using at the time, could not efficiently join datasets in the billions range. To meet our SLA and scalability requirements, we resorted to maintaining daily level intermediate datasets.

The figure below illustrates the data aggregation levels used. The input data was pre-aggregated at the user level, making this dataset independent of any test. This data can be reused by any test to calculate daily test level metrics. Cumulative metrics were calculated by aggregating daily test level data over the test period.

Data aggregation levels of the Coupang’s metric calculation framework
Figure 4. Data aggregation levels

Thanks to this re-architecture, our metric calculation framework met all the agreed upon SLAs:

  • The batch layer processes full dataset in 4 hours, delivery results for hundreds of A/B tests with only 5 hours of delay, instead of 14 hours.
  • The speed layer delivered a subset of results every 15 minutes.

Phase 3: Moving to the cloud

The re-architecture of our metric calculation framework greatly improved the experiences of our engineers and business users. However, as Coupang migrated to a cloud environment, we saw opportunities to bolster its performance.

In 2017, we completed the transition from an on-premises to a cloud environment. We switched our data storage from Hadoop Distributed File System (HDFS) to Amazon Simple Storage Service (S3). With the migration to cloud, the metric calculation framework in the experiment platform underwent two major changes:

  • Spark 1.5 was upgraded to 2.1, which finally allowed us to use Tungsten. By leveraging Tungsten, we could efficiently utilize our memory and CPU resources.
  • Data was read and written directly to S3. Not only was I/O speed significantly improved, but reliability was also enhanced as jobs became independent from the Hive Metastore.

Following the migration to the cloud, the metric calculation times were reduced by more than 60%, from 4 hours to 1.5 hours.

The next phase

In this post, we examined the evolution of the experiment platform’s metric calculation framework, which started as a Hive-based framework that only calculated metrics on a daily basis but evolved into a Spark-based batch and real-time dynamic processing engine.

Our experiment platform team is dedicated to innovating and improving the metric calculation framework for our business users. Currently, a next-generation metric engine is underway to support configurable domain specific metrics. By leveraging more advanced configurations, the new engine reduces the amount of code and time needed to define and onboard new metrics. We hope to share some of these improvements in a future blog post.

If you’re interested in working on improving the experimentation platform of a truly data-driven company, check out our open positions.

--

--

Coupang Engineering
Coupang Engineering Blog

We write about how our engineers build Coupang’s e-commerce, food delivery, streaming services and beyond.