The Impact of a Real-Time Data Aggregation Platform on Business Performance

Yulia Stolin
Outbrain Engineering
7 min read · Dec 17, 2020

Introduction

First of all, I would like to start this post with a few words about the Outbrain domain.

We are a discovery platform:

  • Helping people discover things that are interesting and relevant to them.
  • Helping digital advertisers connect with the right people and get discovered.
We run on the world’s best websites, apps and news feeds, and serve millions of requests per minute.

In such a system, it is highly important to have real-time data insights. For example, we want to know which documents are trending, discover hot news, and perform budget capping, along with many other features.

In this post, I will describe how we designed and implemented our real-time aggregation platform and its adoption in different use-cases.

Examples

To describe and understand the general system, let's start with some examples.

Aggregations as Features for Models and Algorithms

In our CTR (Click-Through Rate) prediction flow, we serve ads from different advertisers and learn how well they perform. These performance calculations are performed over different slices of the data: we can calculate ad performance, i.e. the number of impressions and the number of clicks, across various dimensions. We can make decisions based on different inputs, or provide those inputs as features to the model.

Let’s take a look at the following aggregations:

  • ad, publisher, impressions_num, clicks_num
  • ad, country, impressions_num, clicks_num
  • ad, category, platform, impressions_num, clicks_num

In the above example, we need to calculate three different aggregations and make some decisions based on the values.
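The three aggregations above can be sketched in plain Python (the real system runs on Spark, and the event fields here are illustrative, not Outbrain's actual schema). Each event updates every aggregation it belongs to, keyed by that aggregation's dimension tuple:

```python
from collections import defaultdict

# Hypothetical event shape; field names are illustrative only.
events = [
    {"ad": "ad1", "publisher": "pub1", "country": "US", "category": "news",
     "platform": "mobile", "impressions": 1, "clicks": 0},
    {"ad": "ad1", "publisher": "pub1", "country": "US", "category": "news",
     "platform": "mobile", "impressions": 1, "clicks": 1},
    {"ad": "ad2", "publisher": "pub2", "country": "UK", "category": "sports",
     "platform": "desktop", "impressions": 1, "clicks": 0},
]

# One accumulator per aggregation, keyed by its dimension tuple.
aggregations = {
    "ad_publisher": ("ad", "publisher"),
    "ad_country": ("ad", "country"),
    "ad_category_platform": ("ad", "category", "platform"),
}
counters = {name: defaultdict(lambda: [0, 0]) for name in aggregations}

# A single pass over the events feeds all three aggregations at once.
for event in events:
    for name, dims in aggregations.items():
        key = tuple(event[d] for d in dims)
        counters[name][key][0] += event["impressions"]
        counters[name][key][1] += event["clicks"]

print(counters["ad_publisher"][("ad1", "pub1")])  # [2, 1] -> 2 impressions, 1 click
```

Note that a single pass over the input is enough to maintain all three counters; this is the same property the platform later exploits at scale.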

Capping

We use capping in many of our internal systems. Let’s take our exploration flow as an example. In this flow, we want to count how many times a new ad was presented. This knowledge helps us to decide whether to promote the ad to exploitation or to change the exploration strategy.

We can count the budget of the campaign or other important system KPIs.
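As a rough sketch of the capping decision described above (the cap value and CTR threshold here are invented for illustration; the real values live in configuration):

```python
# Hypothetical thresholds; real values would come from configuration.
EXPLORATION_CAP = 1000
PROMOTION_CTR_THRESHOLD = 0.01

def next_action(impressions_so_far: int, clicks_so_far: int) -> str:
    """Decide what to do with a new ad in the exploration flow."""
    if impressions_so_far < EXPLORATION_CAP:
        return "keep-exploring"  # not enough data yet
    ctr = clicks_so_far / impressions_so_far
    # Promote ads that performed well during exploration; otherwise
    # change the exploration strategy.
    return "promote" if ctr >= PROMOTION_CTR_THRESHOLD else "change-strategy"

print(next_action(500, 3))    # keep-exploring (below the cap)
print(next_action(2000, 40))  # promote (CTR = 0.02)
print(next_action(2000, 5))   # change-strategy (CTR = 0.0025)
```

The crucial input here is `impressions_so_far`, which is exactly the kind of counter the real-time aggregation platform maintains.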

Analytics Dashboard

Imagine that you have an analytics dashboard for campaign performance. Based on that dashboard, the campaign managers could change different settings in their campaigns, and see the impact of those changes.

Let’s use the following aggregation for our use-case:

There are many additional use-cases that require different pre-aggregated data, such as UX optimization, counters, and performance measurements.

Previous Architecture

This chart describes our previous architecture.

Clearly, the above architecture is a batch-based system, also known as ETL (Extract, Transform, Load):

  • Online serving services write logs to Kafka topics.
  • An ingestion process copies data from those Kafka topics to HDFS.
  • Hive external tables represent each Kafka topic.
  • Hive tables use hourly partitions (a new partition is created every hour). From that point, flows with hourly/daily/weekly/… periods are triggered by a FullHourPass trigger. Those flows, in turn, trigger other flows, and so on…

Perfect! So what is wrong here? Actually, nothing is wrong, but such a system clearly has a built-in delay. Some data might be available after an hour, some after multiple hours. Flows can have many stages, and as a result, data might arrive many hours after the initial FullHourPass trigger.

Taking into consideration our domain, such a delay is bad for the system.

What about the trending documents? We need to know this kind of information ASAP. The same goes for capping or UX optimization flows. It is clear that fresh information has a major impact on our system. As a teaser: when we introduced the real-time aspects into our flows, we significantly increased our main KPIs, such as CTR and RPM.

CTR — click-through rate (probability for click)

RPM — revenue per 1000 impressions
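The two metric definitions above translate directly into code (the sample numbers below are made up, purely to show the arithmetic):

```python
def ctr(clicks: int, impressions: int) -> float:
    """Click-through rate: the probability of a click per impression."""
    return clicks / impressions

def rpm(revenue: float, impressions: int) -> float:
    """Revenue per 1000 impressions."""
    return revenue / impressions * 1000

# Made-up sample values for illustration.
print(ctr(25, 10_000))   # 0.0025
print(rpm(12.5, 10_000)) # 1.25
```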

It is important to note that some of our critical flows used a custom implementation to gain real-time data insights. An example of such an implementation could be a Java service that consumes Kafka events, updates some counters, and writes the data to the data store. However, implementing this kind of hand-rolled service is not trivial: handling millions of events requires a caching strategy, managing Kafka offsets, and many other software aspects. So every new use-case would have to implement another such service.

This is obviously wasteful: if we have an implemented framework that can do the job for us, why not reuse it? No one wants to reinvent the wheel.

New Architecture

We quickly identified use-cases that require adding real-time data to existing flows. Moreover, there are flows that depend on real-time data only, and others on batch data only. For all those reasons, the “Lambda Architecture” was the right choice for us.

Such a system clearly benefits from having both real-time and batch components in a unified place. Also, since some batch flows were already defined and in massive use in the system, we wanted a flexible way to enhance that data with real-time insights without changing the whole flow.

Let me quote the definition of Lambda Architecture:

“Lambda Architecture is a data-processing architecture designed to handle massive quantities of data by taking advantage of both batch and stream processing methods”

From Wikipedia

The following chart describes the current architecture with the addition of real-time components.

This architecture adds a couple of new components:

  • It is now possible to connect directly to Kafka and consume the same data that feeds the Hive batch tables.
  • There is an additional storage layer for the streaming data output.
  • There is a Data Unification Layer for services that need to consume combined stream and batch data.

We chose Spark as our aggregation framework. There are many streaming frameworks, so why use Spark?

First of all, in some of our flows we are already using Spark batch aggregations, so it is possible to reuse parts of the code, common libraries, and some testing concepts. Secondly, the Spark framework itself integrates very well with many sources. Spark has a great and flexible API and already-implemented connectors to many external systems: HDFS, Hive, relational databases (e.g. MySQL), NoSQL stores (Cassandra, Elasticsearch, …), and of course Kafka.

RTAP (Real-Time Aggregation Platform)

Based on all these requirements, we created a software framework called RTAP (Real-Time Aggregation Platform).

This framework enabled us to benefit from both the real-time and the batch data layer in our system.

Depending on the use-case, it is possible to define:

  • General aggregation
  • Input and Output for the data
  • Dynamic State for filters, metadata, …
    (Mainly used for streaming aggregations)
  • Custom features with UDFs
  • And many more

Additionally, as discussed in the use-case examples, the framework facilitates performing different aggregations in a single pass. Such an approach optimizes resource usage.

For instance, let’s return to a previous example: calculating three different aggregations.

It is possible to read from two Kafka topics, impressions and clicks, and in a single pass, using flatMap (a Spark function), create three different aggregations, then write the data to the defined datastore.
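The single-pass idea can be mimicked in plain Python (the real pipeline uses Spark's flatMap followed by a keyed reduction; here a generator plays the flatMap role and a `Counter` plays the reduce role, and the event fields are illustrative):

```python
from collections import Counter
from itertools import chain

# flatMap step: each event fans out into one (key, count) pair
# per aggregation it participates in.
def fan_out(event):
    yield (("ad_publisher", event["ad"], event["publisher"]), 1)
    yield (("ad_country", event["ad"], event["country"]), 1)
    yield (("ad_category_platform", event["ad"],
            event["category"], event["platform"]), 1)

# Made-up impression events for illustration.
impressions = [
    {"ad": "ad1", "publisher": "pub1", "country": "US",
     "category": "news", "platform": "mobile"},
    {"ad": "ad1", "publisher": "pub1", "country": "US",
     "category": "news", "platform": "desktop"},
]

# Reduce step: sum the emitted pairs by key.
counts = Counter()
for key, n in chain.from_iterable(fan_out(e) for e in impressions):
    counts[key] += n

print(counts[("ad_publisher", "ad1", "pub1")])  # 2
print(counts[("ad_country", "ad1", "US")])      # 2
```

In Spark, the same shape would be a `flatMap` emitting keyed pairs followed by a grouped aggregation, so all three outputs are produced from one read of the topics.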

Depending on the use-case, it is possible to consume only batch, real-time, or combined data. In order to support combined data, we created a data unification service that reads data from both offline and online data sources and creates unified data in response.

This allows all clients that used such data to migrate and continue using a similar API, while the underlying data is much fresher.
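The unification step can be sketched as follows, assuming a simple split where batch totals cover closed hourly partitions and streaming totals cover everything after the batch watermark (the metric names and numbers are hypothetical):

```python
# Hypothetical counters: batch covers data up to the last full hour,
# the stream covers everything since then.
batch_totals = {("ad1", "US"): {"impressions": 10_000, "clicks": 120}}
stream_totals = {
    ("ad1", "US"): {"impressions": 340, "clicks": 6},
    ("ad2", "UK"): {"impressions": 50, "clicks": 1},
}

def unify(batch, stream):
    """Merge batch and streaming counters into a single fresh view."""
    merged = {key: dict(metrics) for key, metrics in batch.items()}
    for key, metrics in stream.items():
        row = merged.setdefault(key, {"impressions": 0, "clicks": 0})
        for metric, value in metrics.items():
            row[metric] += value
    return merged

unified = unify(batch_totals, stream_totals)
print(unified[("ad1", "US")])  # {'impressions': 10340, 'clicks': 126}
print(unified[("ad2", "UK")])  # {'impressions': 50, 'clicks': 1}
```

The key design point is that clients query only the unified view, so they never need to know where the batch layer ends and the streaming layer begins.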

This schema describes the basic aggregation:

We can see that having a generic implementation for such aggregation definitions makes it very easy to run both on Hive tables and on Kafka topics in the same way.
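An aggregation definition of this kind might look roughly like the following sketch; the actual RTAP schema format is not shown in this post, so every field name here is a hypothetical stand-in:

```python
from dataclasses import dataclass

# Hypothetical shape of an RTAP aggregation definition; the real
# schema format is internal to the platform.
@dataclass
class AggregationDef:
    name: str       # unique aggregation identifier
    sources: list   # Kafka topics (streaming) or Hive tables (batch)
    group_by: list  # dimension columns
    metrics: dict   # output column -> aggregate expression
    output: str     # target datastore

ad_publisher = AggregationDef(
    name="ad_publisher_performance",
    sources=["impressions_topic", "clicks_topic"],
    group_by=["ad", "publisher"],
    metrics={"impressions_num": "count(impressions)",
             "clicks_num": "count(clicks)"},
    output="cassandra://rtap/ad_publisher",
)

print(ad_publisher.name, ad_publisher.group_by)
```

Because the definition only names sources, dimensions, and metrics, the same declaration can be bound to a Hive table for batch runs or to Kafka topics for streaming runs.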

Additionally, one of the biggest advantages of this system is its easy, almost self-serve definition. It is possible to set up a new aggregation and start using its real-time data in a very short period of time. All a developer needs to do is define the aggregation schema, run a build process, and create a job definition with the schema parameter. From that point, data will flow into the defined datastore, while all the job management (Kafka offsets, job lifecycle, restarts, monitoring, and alerting) is automatically handled by the system.

Summary

We implemented the real-time aggregation and the Data Unification Layer, and the first use-case was the existing CTR prediction flow. Nothing other than the data freshness changed, but the effect was very significant: we saw an immediate increase in our CTR and RPM metrics. So just having fresh data brought the company both revenue and user engagement.

After that, we added real-time data to our advertiser performance dashboard. We got immediate feedback from our campaign managers, who were able to perform a much deeper and quicker analysis of their data, and to optimize campaigns accordingly.

Our lesson learned from this journey: look at the data you have, use, or might have. Ask yourself whether fresher data might bring benefit in this or that scenario. The most probable answer will be, “Yes! Go for it! The effort is worth it!”

I would like to finish by quoting Marissa Mayer, former President and CEO of Yahoo:

“With data collection, ‘the sooner the better’ is always the best answer.”

To be continued …

In the next part, I’ll share the Spark implementation details and ideas that we’ve used to implement this system. I’ll go over the challenges we had in integrating the offline and online layers, the interesting design decisions that were made during the implementation, and much more.


Yulia Stolin
Outbrain Engineering

15 years of hands-on experience in software architecture, building high-volume, scalable, high-performance systems.