An Ad Impression Forecasting Tool Built with Apache Spark


Written by Michael Williams on June 15, 2017

Introduction

Much can be said about the cutting edge technologies developed and used by GumGum, but a core component of GumGum’s business involves the effective display of digital advertisements. We work with agencies and brands to create engaging and relevant ads that are shown on the websites of our publisher partners. To accomplish this, we rely on being able to carefully target ads to specific user groups in specific browsing situations.

A typical behind-the-scenes conversation between GumGum and one of its advertising partners might proceed as follows.

Advertiser: “We want 100,000 BMW ads displayed to males, age 25–40, living on the West Coast, making $150k per year, using a mobile device. Can you do it?”

GumGum: “Hmm, let me check…”

So, a Serious Business Question is, “How many spaces for ads will be available in the near future, subject to campaign-specific restrictions?” Hence, the need for Instavails. What does the name mean? Some simple math:

Instavails = Instant Avails

Avails = Available Inventory

Inventory = space on publisher pages for ads

To elaborate, a unit of inventory is a piece of “real estate” on a publisher’s page where an ad can potentially be placed. GumGum has various types of ad products (e.g., In-Image, In-Screen, and Native, all described here), so a unit of inventory could be an image (for In-Image), a small section of a page (for Native), or the whole page itself (for In-Screen). Of course, the publisher must support these different product types for it to count as inventory.

The goal of Instavails is to give GumGum a clear picture of how much inventory will be available in the near future, subject to various restrictions like those mentioned above. It finds the number of matching impressions from the last 365 days, and predicts future numbers based on those results. The end result looks something like this.

Instavails results

The goal of this post is to describe just how we arrive at this picture.

Instavails: Project Overview

Essentially all of Instavails lives in the AWS cloud:

0. Ad impression logs are stored in S3. (This happens continuously, unrelated to this project).

1. Log data is sampled via a data pipeline and stored in S3. (This happens each night for the previous day’s logs.)

2. Past year’s samples are loaded into a Spark SQL database on an EMR cluster. (The data is updated each night.)

3. Queries are sent to the cluster, where they are executed and forecasts are made; the results are then returned to the server.

Counting Inventory

Before discussing these components, we should mention more about counting inventory. To predict available inventory in the future, we must have a way to count inventory from the past. This would be easy if we had a master list of all of our publishers’ web pages, but the web is dynamic: publishers add/delete pages constantly, and it would be unreasonable to expect each publisher to update us with all such changes. Instead, we consider inventory impressions. Each time a user visits a page, we consider that an “event” and we log it (usually in JSON format) with information about the publisher, page, user, etc. Thus, we can count inventory by searching log data. This data is generated at a rate of 100s of GB/day.

Here are a few words about Apache Spark for those who are unfamiliar. It is an open source cluster computing framework that provides an API for parallel, fault-tolerant, in-memory (i.e., fast) computation. The main data abstraction in Spark (1.6) is the DataFrame, which can be thought of as distributed data with a schema (much like a distributed SQL table). Spark supports a number of programming languages (we primarily use Scala) and data sources.

(image source: http://blog.xebia.fr/wp-content/uploads/2015/06/spark-future.png)

On our AWS cluster, Spark jobs are controlled by the “driver”, which runs on the master node. Each worker node has one or more “executors”, which process tasks related to the overall job.

(image source: http://blog.cloudera.com/wp-content/uploads/2014/05/spark-yarn-f1.png)

Each day, an AWS Data Pipeline runs to sample the inventory impression logs. Why is this step necessary?

Simply put, we have an enormous amount of log data (hundreds of gigabytes per day), and querying all of it would take too long. Sampling means we keep a small but representative portion of the data and query that instead. We just need a way to relate those results to the original dataset; this relation is called the estimator.

The most basic type of sampling scheme is called uniform sampling. It selects elements from the base dataset with equal probability, that is, from a uniform probability distribution.

(image source: http://dws.informatik.uni-mannheim.de/fileadmin/lehrstuehle/pi1/people/rgemulla/publications/gemulla08thesis.pdf)
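For instance, Spark's built-in DataFrame sampling is uniform (impressionsDf is an illustrative name):

```scala
// Keep roughly 1% of the rows, each chosen independently and uniformly.
val uniformSample = impressionsDf.sample(withReplacement = false, fraction = 0.01)
```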

Uniform sampling is easy to implement and it is very fast, but it has many downsides. For example, we want to consider some impressions to be “the same” even though they might contain different data: impressions from the same user on a given web page over the course of a day should be treated as copies of the same impression. In this way, one user who happens to view our publishers’ pages very often will be over-represented in a uniform sample. This is troublesome, because one factor in how GumGum serves ads is a frequency cap: we don’t show ads to the same user too frequently (e.g., more than once an hour). Because a uniform sample will be biased towards commonly-occurring items, it is not the best choice for this project.

Distinct-Item Sampling

As an alternative, we use a sampling scheme that samples uniformly from the distinct items in our set of inventory impression logs. The name of the algorithm is AMIND(M): Augmented min-hash distinct-item sampling, with at most M distinct items. This sampling scheme is described in Sampling Algorithms for Evolving Datasets, Rainer Gemulla.

As the name indicates, a major ingredient in AMIND(M) is a hash function. Let H be a prime number and choose A, B ∈ {0, …, H − 1} randomly. The parameter H is called the “hashmod”, since we now define h(x) = Ax + B (mod H), where x is the integer representation of an item in the dataset. (We don’t explain exactly how x is obtained here.) One can prove that values of h “look uniformly random” in the set {0, …, H − 1}, which is crucial for creating a representative sample. Here is the (deceptively simple) algorithm for creating an AMIND(M) sample.

(image source: http://dws.informatik.uni-mannheim.de/fileadmin/lehrstuehle/pi1/people/rgemulla/publications/gemulla08thesis.pdf)

With an appropriate data structure for the sample, the complexity of INSERT is O(log M), and the complexity of MAXHASH is O(1). For example, we use a sorted map, where the keys are the hash values and the values are lists of corresponding impressions. Note that sample creation requires scanning the entire dataset to try to insert each item. Also, the algorithm is a priori not parallelizable, since we have to keep a sorted collection of hashes.
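Here is a minimal sketch of the hash function and of INSERT (Impression, itemKey, and the constants are stand-ins; the real types and item encoding aren't shown in this post):

```scala
import scala.collection.mutable

type Impression = Map[String, String]        // stand-in for the real log record type

// Integer representation of an item; the real mapping isn't described here.
def itemKey(x: Impression): Long = x.hashCode.toLong.abs

// The hash family: H is a prime "hashmod"; A and B are random in {0, ..., H - 1}.
val H = 2147483647L                          // the Mersenne prime 2^31 - 1
val rng = new scala.util.Random()
val A = ((rng.nextLong() % H) + H) % H
val B = ((rng.nextLong() % H) + H) % H
def h(x: Long): Long = ((BigInt(A) * x + B) mod H).toLong   // BigInt avoids overflow

// The sample: a sorted map from hash value to the impressions sharing it.
val sample = mutable.TreeMap.empty[Long, List[Impression]]

// INSERT is O(log M); MAXHASH is just the largest key of the sorted map.
def insert(x: Impression, M: Int): Unit = {
  val hx = h(itemKey(x))
  if (sample.contains(hx))
    sample(hx) = x :: sample(hx)             // another copy of a tracked distinct item
  else if (sample.size < M)
    sample(hx) = List(x)                     // still room for a new distinct item
  else if (hx < sample.lastKey) {
    sample -= sample.lastKey                 // evict the item(s) with MAXHASH...
    sample(hx) = List(x)                     // ...and admit the smaller hash
  }
}
```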

A full sequential scan may seem like a major drawback, given that the base dataset could be hundreds of gigabytes in size. However, a useful fact about AMIND(M) samples is that if M and h are fixed, the AMIND(M) samples of a dataset form a monoid under a special union operation. Namely, we can combine separate AMIND(M) samples. A natural application of this is to sample each hour’s log data in parallel, and then combine the results.

Here’s the algorithm for combining different AMIND(M) samples.

1. Create samples for all hours in parallel (with same M, h).

2. Combine the samples and group by hash values.

3. Sort by hash value, and take all items with M smallest hash values.
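In code, the combine step might look like this (a sketch that treats each per-hour sample as a plain collection of (hash, items) pairs; the real job works on DataFrames):

```scala
// Combine per-hour AMIND(M) samples that were built with the same M and h.
def combine(
    hourlySamples: Seq[Seq[(Long, List[Impression])]],
    M: Int): Seq[(Long, List[Impression])] =
  hourlySamples.flatten
    .groupBy { case (hash, _) => hash }                   // 2. group by hash value
    .map { case (hash, grp) => (hash, grp.flatMap(_._2).toList) }
    .toSeq
    .sortBy { case (hash, _) => hash }                    // 3. sort by hash value...
    .take(M)                                              // ...and keep the M smallest
```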

As mentioned, the sampling occurs nightly as part of a data pipeline. The algorithm is implemented in Scala as a Spark job. One basic question is how to process all hours simultaneously. In other words, how can we have Spark operate on separate DataFrames at the same time? The naive approach (i.e., pointing Spark to a list of data sources) does not work.

Instead we do the following
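A minimal sketch, assuming hourly JSON logs under per-hour S3 prefixes (the paths are illustrative):

```scala
// Read each hour's logs into its own DataFrame.
val hourlyDfs = (0 until 24).map { hh =>
  sqlContext.read.json(f"s3://our-bucket/inventory-logs/dt=2017-06-14/hh=$hh%02d/")
}

// Fold the 24 DataFrames into one (unionAll in Spark 1.x; union in Spark 2.x).
val allHours = hourlyDfs.tail.foldLeft(hourlyDfs.head) { (acc, df) =>
  acc.unionAll(df)
}
```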

Here, foldLeft evaluates a given two-argument function (in this case, union) against consecutive elements of a collection: the result of each invocation is passed as the first argument of the next, and the second argument is the current element of the collection. The only limitation of this approach is that each hour’s data must fit in the cluster’s memory.

How do we relate the sample to the original dataset? In other words, if we execute a query on the sample, how does the result compare to the same query executed against the original dataset? It turns out that it is enough to find the Mth smallest hash value. Recall that R is the dataset, S is the sample, and the hash function is h(x) = Ax + B (mod H), where H is prime.
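For the curious, the generic k-minimum-values estimator from the distinct-item sampling literature (which AMIND(M) builds on) has the following shape: if h_(M) denotes the Mth smallest hash value in S, then the number of distinct items in R can be estimated as

D̂ ≈ (M − 1) · H / h_(M),

and counts computed on S are scaled up by the analogous factor; the exact form used for AMIND(M) is derived in Gemulla’s thesis.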

There is some important impression data that is not stored in the logs. For example, we need to enhance each sample with BlueKai demographic data from a Cassandra table. One such demographic category is “27729: Interest in Robots, Monsters, & Space Toys”.

BlueKai data is associated with users who view publisher pages, and we use the DataStax connector to access Cassandra from Spark. The data in Cassandra is stored as tuples (visitor ID, BlueKai category ID). The idea is to get all distinct visitors from our sample, do a “group concat” to get all corresponding category IDs per visitor ID as a string, then join that with the sample. Spark makes this simple.
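Here's a sketch with the DataFrame API (sampleDf and the keyspace, table, and column names are placeholders):

```scala
import org.apache.spark.sql.functions.{collect_list, concat_ws}

// Read (visitor ID, BlueKai category ID) tuples from Cassandra via the
// DataStax connector. Keyspace and table names are placeholders.
val bluekai = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "bluekai", "table" -> "visitor_categories"))
  .load()

// "Group concat": one comma-separated string of category IDs per visitor.
val perVisitor = bluekai
  .groupBy("visitor_id")
  .agg(concat_ws(",", collect_list("category_id")).as("bluekai_categories"))

// Enhance the sample; a left join keeps impressions with no BlueKai data.
val enhanced = sampleDf.join(perVisitor, Seq("visitor_id"), "left_outer")
```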


For those unfamiliar, the AWS Data Pipeline framework provides a way to manage complex data-processing workloads in a scalable, fault-tolerant, and repeatable way. It makes it easy to handle scheduling, failure notification, retries, etc. Here’s the directed acyclic graph corresponding to our sampling pipeline.

This pipeline runs nightly, after the previous day’s logs have been saved to S3. It takes about an hour to create the samples (which is OK, since AWS charges by the hour!).

Spark SQL Database

Our samples have been created; how do we set up the database?

The main way that users interact with our database is through Spark Job Server, which is a RESTful interface for submitting and managing Spark jobs, contexts, and jars. When running on our cluster, it provides a web UI to see all jobs, contexts, and jars.

The main benefit of Spark Job Server is that we can start one Spark SQL context, create our DataFrames (described below), and the data will be available indefinitely for queries. To use it, simply extend the SparkSqlJob trait to persist the Spark SQL Context.
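Here's a minimal sketch of such a job against the jobserver API of that era (names are illustrative):

```scala
import com.typesafe.config.Config
import org.apache.spark.sql.SQLContext
import spark.jobserver.{SparkJobInvalid, SparkJobValid, SparkJobValidation, SparkSqlJob}

object InstavailsJob extends SparkSqlJob {
  // validate just checks the input before the job runs.
  def validate(sql: SQLContext, config: Config): SparkJobValidation =
    if (config.hasPath("query")) SparkJobValid
    else SparkJobInvalid("missing 'query' parameter")

  // runJob executes the query against the long-lived SQL context,
  // where our DataFrames remain available between requests.
  def runJob(sql: SQLContext, config: Config): Any =
    sql.sql(config.getString("query")).collect()
}
```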

The runJob method is where the actual interesting stuff happens, such as executing queries. The validate method just checks the input.

To create the Spark SQL database, we read each daily sample from the last year and “enhance” (i.e., join) with extra data.

The extra data comes from MySQL via Spark’s built-in JDBC support. This data includes publisher data that changes very frequently (so it needs to be fresh), and it is not present in the impression logs.
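For example (dailySample and the URL, table, and credential values are placeholders):

```scala
// Read fresh publisher data from MySQL via Spark's built-in JDBC source.
val publishers = sqlContext.read.format("jdbc").options(Map(
  "url"      -> "jdbc:mysql://mysql-host:3306/gumgum",
  "dbtable"  -> "publishers",
  "user"     -> "instavails",
  "password" -> "********",
  "driver"   -> "com.mysql.jdbc.Driver"
)).load()

// Enhance a daily sample with the publisher data.
val enhancedDaily = dailySample.join(publishers, Seq("publisher_id"))
```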

The final DataFrame is the union of the daily samples.

We partition the final DataFrame by IP address so that it is (mostly) evenly distributed across the cluster. Giving executors even amounts of data reduces “stragglers”. We arrived at 400 partitions by trial and error.
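Roughly (enhancedDailies and the column/table names are illustrative):

```scala
import org.apache.spark.sql.functions.col

// Union the enhanced daily samples, hash-partition on IP into 400
// partitions, and register the result for SQL queries.
val allDays = enhancedDailies
  .reduceLeft((a, b) => a.unionAll(b))
  .repartition(400, col("ip"))
allDays.persist()
allDays.registerTempTable("impressions")
```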

Our Spark SQL database is set up; how do we query it?

Users can put restrictions into a web interface, and these are translated into simple SQL-style queries. Roughly, restrictions map to SQL predicates. Here’s an example of a query that is sent to Spark Job Server.
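The real queries are generated from the user's restrictions; a hypothetical example (the column names are illustrative, not our actual schema):

```scala
// Hypothetical generated query: per-date, per-IP counts under restrictions.
val query = """
  SELECT date, ip, COUNT(*) AS cnt
  FROM impressions
  WHERE product = 'in-image'
    AND device_type = 'mobile'
    AND geo_region = 'west'
  GROUP BY date, ip
"""
```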

The results form a date → count map, and those numbers are converted into a graph on the frontend. How do we obtain this map from the query?

Parsing

The initial query doesn’t do everything we need, so we have to process the results to account for the date and the frequency cap. For the frequency cap, we literally cap the count per IP.

The result is close to a date → count map, but there is a bit more work to do.

Spark SQL allows for SQL-style queries, so most of the hard work (e.g., the group by) is already done. Once the query is parsed, we just need to extract the two relevant fields and apply the estimator.
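A sketch of that post-processing (estimatorScale is a hypothetical name for the scale-up factor from the sampling section, and the date column is assumed to be a string):

```scala
// Run the query, cap each IP's per-date count at the frequency cap,
// sum per date, and scale back up to the full dataset.
val freqCap = 24L                                  // e.g., once per hour
val rows = sqlContext.sql(query).collect()

val dateCounts: Map[String, Long] = rows
  .map(r => (r.getString(0), math.min(r.getLong(2), freqCap)))
  .groupBy { case (date, _) => date }
  .map { case (date, cnts) =>
    date -> (cnts.map(_._2).sum * estimatorScale).toLong
  }
```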

The result now is a date → count map, which we turn into a JSON string. Spark Job Server returns another JSON as output.

Complications

Our tables have several multi-label columns, such as “keywords”. Keywords describe the content of the page where the impression happened. A given page can have about 10 keywords, but a query can have potentially thousands!

Implementation questions include, “How do we store such a column in our database?” and “How do we query such a column?”

Our first attempt was to store keywords as comma-separated strings and query with the LIKE operator. So, an entry might look like this:

“angels,clippers,dodgers,ducks,galaxy,kings,lakers,rams,sparks”

A query predicate might look like this:

keywords LIKE “%angels%” OR keywords LIKE “%baseball%”

There were several major problems with this approach. First, it yields inaccurate results (e.g., “angel” vs. “angels”). Second, and most important, is performance. There could be thousands of keywords in a user’s restriction, hence thousands of predicates in the resulting query. This means slow execution times. In some examples, we saw queries that lasted 30 minutes!

Another idea was to store keywords in arrays. Unlike MySQL, Spark SQL supports array data types (among others). For example,

[“angels”, “clippers”, “dodgers”, “ducks”, …]

One can use the array_contains method to search arrays in a DataFrame. However, storing keywords in an array is inefficient, partly because Java objects have somewhat large overhead; our DataFrames increased in size by several times! It also does not solve the performance issue, because one needs one array_contains predicate per keyword in the query.

To find a better solution, we stepped back and considered the abstract goal of searching for keywords: to determine whether two sets A and B have a non-empty intersection. There is an obvious solution: for each element of A, look for a match in B. This works, but it is rather costly, with complexity O(|A|·|B|). A key insight was that the contents and size of the intersection don’t matter; all we need to know is whether it is empty.

A better solution was to sort A and B in advance! Then, the algorithm is as follows.

1. Start with an index at the head of each sorted list and compare the two current elements.

2. Increment the index of the list with the lesser element.

3. Continue until a match is found (non-empty intersection) or the end of either list is reached (empty intersection).

The complexity of this new algorithm is O(|A|+|B|), which is a great improvement. For example, if |A| = 10 and |B| = 1000, the difference is roughly a factor of 10.

Note that it is crucial that the sequences are sorted, but this isn’t much of a problem. We can sort the database entries in advance, and the query data must only be sorted once. Here’s a minimal Scala implementation of the test:
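```scala
// Sorted-intersection test: returns true iff the sorted sequences a and b
// share at least one element. Runs in O(|a| + |b|).
def intersects(a: Array[String], b: Array[String]): Boolean = {
  var i = 0
  var j = 0
  while (i < a.length && j < b.length) {
    val cmp = a(i).compareTo(b(j))
    if (cmp == 0) return true      // match found: non-empty intersection
    else if (cmp < 0) i += 1       // advance the side with the lesser element
    else j += 1
  }
  false                            // one side exhausted: empty intersection
}
```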

Spark SQL allows for user-defined functions (UDFs), which is how we use the above algorithm to query the database. Suppose we are given the following query.
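For example (a made-up keyword restriction, in the same illustrative schema as before):

```scala
val query = """
  SELECT date, ip, COUNT(*) AS cnt
  FROM impressions
  WHERE keywords INTERSECTING ('angels', 'baseball')
  GROUP BY date, ip
"""
```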

Of course, “INTERSECTING” isn’t a real Spark SQL operator, it is just a placeholder so that we know how to process the query.

In processing the query, we extract keywords. These are broadcast and passed to the UDF as the second argument when we register it.
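One way to wire this up (a sketch; here the keywords are captured via a closure over the broadcast value, extractedKeywords is an illustrative name, and the stored keywords column is assumed to be pre-sorted):

```scala
// Broadcast the sorted query keywords once per query.
val queryKeywords = sc.broadcast(extractedKeywords.sorted.toArray)

// Register a UDF that runs the sorted-intersection test per row.
// Array columns arrive in Scala UDFs as Seq[String].
sqlContext.udf.register("keywords_intersect", (pageKeywords: Seq[String]) =>
  intersects(pageKeywords.toArray, queryKeywords.value))

// The placeholder operator is rewritten before execution, e.g.
//   keywords INTERSECTING ('angels', 'baseball')
// becomes
//   keywords_intersect(keywords)
```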

The same query that took 30 minutes with the old method now takes less than 30 seconds!

Forecasting

Our query has been executed; how do we make a forecast from the results? The main problem is that the data is noisy, so simple solutions like regression are not viable.

Instead, we use other methods. Search results constitute a time series: a sequence of numbers at evenly-spaced time intervals. This is now a “small data” problem: there are only 365 numbers in the time series!

To understand the time series, we decompose it into three pieces: one smooth and two periodic. First is the general trend, which we obtain by applying a smoothing kernel to the series.

Next is the quarterly trend. The main reason for considering it is that quarterly patterns arise from the nature of the ad industry: traffic is generally lower at the beginning of the quarter and higher at the end. As such, we treat it as periodic.

Finally, we have the weekly trend. Traffic tends to be higher during weekdays and lower on weekends. This is treated as periodic.

We use linear regression with sums of sines and cosines to model the periodic trends:
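In general form, the model for a trend with period T (7 days for the weekly trend, roughly 91 days for the quarterly one) is

y(t) = β₀ + Σₖ [ aₖ sin(2πkt / T) + bₖ cos(2πkt / T) ],

where the sum runs over a small number of harmonics k = 1, …, K, and the coefficients β₀, aₖ, bₖ are fit by ordinary least squares (the number of harmonics K is a tuning choice).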

Here is the quarterly trend with its model.

Here is the weekly trend with its model.

Now, to actually make a prediction about the future impression behavior, we combine the predictions from each of these three pieces. We predict the future behavior of the general trend with an ARIMA (Auto-Regressive Integrated Moving Average) model, and the periodic models are simply extended into the future.

Forecast = ARIMA prediction + Weekly Sinusoid Model + Quarterly Sinusoid Model

This part of the project is actually implemented in R. We use the RCaller library for Java, which allows one to wrap R code inside Java/Scala. We also use the tseries and forecast libraries for R.

We care more about the total number of impressions available in the forecast period than the count on any given day, so a natural error metric is the relative error of the totals,

error = |Σᵢ fᵢ − Σᵢ aᵢ| / Σᵢ aᵢ,

where f = forecast data and a = actual data.

Most queries we’ve tested are within 5–10% of the actual totals, but quality decreases as the forecast range grows.

Final Remarks

We mention a few other aspects of the project.

To check on usage, query times, search patterns, etc., we use Sumo Logic to monitor the log data generated by the system.

Future Improvements

The biggest planned improvement is to upgrade to Spark 2.1. We will do this once Spark Job Server supports that version of Spark. This promises to greatly improve performance in several areas.

Note: all uncredited images were created by the author.

We’re always looking for new talent! View jobs.

Follow us: Facebook | Twitter | LinkedIn | Instagram
