How we made our reporting engine 17x faster
Data relocation, the key to our new reporting solution
In this article, we explain how we improved our reporting performance by moving most of our business logic into our data warehouse and removing all post-processing of our data.
How reporting works at Teads
At Teads, we provide marketers with an advertising platform to deliver their campaigns. To provide customers with insights about how their campaigns are performing, we need to join several types of data:
- Fact events, which are data related to users’ interactions with the ads, sent by the browser. These are standard events defined by the IAB (impression, complete, pause, resume, etc.). This amounts to around 35 billion rows per day; the reporting engine uses a compressed version of this data that is still around 3.8 billion rows per day.
- Dimension tables, which contain information about the ads and the context where they run. This data links events to information that isn’t forwarded with the event, such as the advertising campaign’s name, budget, etc. These tables are much smaller: the biggest dimension table contains only 1M rows.
Our historical system
Our previous reporting engine was split into two parts:
- A reporting API, responsible for receiving the user’s request, parsing it, and triggering the computation
- A reporting engine, which loads and computes the data needed to build the requested report. This engine ran Spark 2.4 on EMR.
Zooming in on the Reporting Engine
If you want more information on our ingestion pipeline, you can read our initial article about it.
To generate a report, the reporting engine loads compressed events data from BigQuery using the BigQuery Storage API. Once the fact events are in memory, the engine lists the dimension ids present in the events.
As the dimensions data lives in another database, we cannot join on it directly. To work around this limitation, the reporting engine builds a cache with the needed rows, selected from the list of ids, and then joins on it.
After the join, it drops the raw data to keep only the requested columns and unions the rows to produce as few rows as possible. It then exports the result to S3 in the requested format.
Steps of the data for a report on advertiser_name, creative_type, impression
Compressed events data
| advertiser_id | creative_id | impressions |
|---|---|---|
| 1 | 2 | 4 |
| 2 | 3 | 10 |
| 3 | 3 | 16 |
| 4 | 2 | 30 |
Compressed events data with joined dimensions
| advertiser_id | advertiser_name | creative_id | creative_type | impressions |
|---|---|---|---|---|
| 1 | advertiser_b | 2 | video | 4 |
| 2 | advertiser_a | 3 | image | 10 |
| 3 | advertiser_a | 3 | image | 16 |
| 4 | advertiser_b | 2 | video | 30 |
Union data ready for the report output
| advertiser_name | creative_type | impressions |
|---|---|---|
| advertiser_b | video | 34 |
| advertiser_a | image | 26 |
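The steps above can be sketched in a few lines of code. This is a simplified, hypothetical illustration: it assumes the compressed columns are advertiser_id, creative_id, and impressions, and replaces the real dimension cache with plain dictionaries.

```python
# Sketch of the legacy engine's steps (hypothetical, simplified):
# load compressed events, build a dimension cache from the ids seen,
# join in memory, then union/aggregate down to the requested columns.

# Compressed events: (advertiser_id, creative_id, impressions)
events = [(1, 2, 4), (2, 3, 10), (3, 3, 16), (4, 2, 30)]

# Dimension caches, selected from the dimension store using the ids
# listed in the events (stand-ins for the real dimension tables).
advertisers = {1: "advertiser_b", 2: "advertiser_a",
               3: "advertiser_a", 4: "advertiser_b"}
creative_types = {2: "video", 3: "image"}

# Join, then aggregate on the requested report columns.
report = {}
for advertiser_id, creative_id, impressions in events:
    key = (advertisers[advertiser_id], creative_types[creative_id])
    report[key] = report.get(key, 0) + impressions

print(report)
# {('advertiser_b', 'video'): 34, ('advertiser_a', 'image'): 26}
```

In the real engine these steps ran in Spark over billions of rows, which is precisely why the in-memory join and aggregation became a bottleneck.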
Limitations of this solution
This solution helped us bootstrap reports for everyone inside and outside the company, providing reports that link our events and dimensions data.
However, we ended up with three major limitations:
- Network impact: due to the size of our events data and the granularity of the ids needed to join the dimensions data, the reporting engine moved a lot of data to build a report.
- Memory consumption: this data had to be loaded in memory to be processed by Spark, which meant using a lot of memory to compute even simple reports.
- Processing latency: the previous two limitations impact the time needed to process a report; a yearly report could take hours because of all the data moved from each source.
Bootstrapping the new service
For this new service, we wanted to simplify reporting as much as possible by reducing the number of steps between the request from the users and the result.
As explained in the previous section, most of our report columns were previously computed directly in the reporting engine.
To remove part of our network impact, we scheduled a clone of our dimensions data into BigQuery, so that we no longer move dimensions data at run time.
This projection introduces a new storage cost in our data warehouse, but it is offset by the removal of the infrastructure needed to join this data.
A second part of the simplification takes place in the ingestion pipeline, where we also attach some of the dimensions data to the events to avoid complex joins in the query run by the reporting engine. To avoid simply relocating the network impact onto the ingestion pipeline, we chose to only hydrate events with low-cardinality dimensions, such as enums.
For example, we can now group directly on the creative_type dimension in BigQuery without joining, instead of pulling all creatives and checking their type in the dimensions data.
Events before hydration
| event_type | creative_id | ad_id | … |
|---|---|---|---|
| impression | 1 | 1 | … |
| click | 1 | 1 | … |
| impression | 2 | 2 | … |
| click | 2 | 2 | … |
| impression | 3 | 3 | … |
| progress-1 | 3 | 3 | … |
| progress-2 | 3 | 3 | … |
| impression | 4 | 4 | … |
| progress-1 | 4 | 4 | … |
| progress-2 | 4 | 4 | … |
| progress-3 | 4 | 4 | … |
Events after hydration
| event_type | creative_id | creative_type | ad_id | … |
|---|---|---|---|---|
| impression | 1 | image | 1 | … |
| click | 1 | image | 1 | … |
| impression | 2 | image | 2 | … |
| click | 2 | image | 2 | … |
| impression | 3 | video | 3 | … |
| progress-1 | 3 | video | 3 | … |
| progress-2 | 3 | video | 3 | … |
| impression | 4 | video | 4 | … |
| progress-1 | 4 | video | 4 | … |
| progress-2 | 4 | video | 4 | … |
| progress-3 | 4 | video | 4 | … |
To tackle the network impact limitation, we had to limit the data moved from the data warehouse to our reporting engine instances.
To do that, we moved the computation from our reporting engine to our data warehouse by converting our report requests into SQL queries.
The query generator is composed of two parts:
- A static definition of the metrics and dimensions graph, and of how they map to the data warehouse tables.
- A dynamic SQL query generator that uses this graph to generate a query specific to each report.
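A minimal sketch of such a generator could look like the following. All field names, table names, and the join key are hypothetical illustrations, not our actual schema:

```python
# Sketch of the two-part query generator (hypothetical names):
# a static definition mapping report fields to warehouse columns,
# and a dynamic part that turns a report request into one SQL string.

FIELDS = {
    # report field -> (SQL expression, is_metric)
    "advertiser_name": ("a.advertiser_name", False),
    "creative_type": ("e.creative_type", False),  # hydrated onto events
    "impression": ("SUM(e.impressions)", True),
}

def build_query(requested, events_table="events", dims_table="advertisers"):
    """Generate a single GROUP BY query for the requested report fields."""
    dims = [f for f in requested if not FIELDS[f][1]]
    metrics = [f for f in requested if FIELDS[f][1]]
    select = ", ".join(f"{FIELDS[f][0]} AS {f}" for f in dims + metrics)
    group_by = ", ".join(FIELDS[f][0] for f in dims)
    return (
        f"SELECT {select} "
        f"FROM {events_table} e JOIN {dims_table} a USING (advertiser_id) "
        f"GROUP BY {group_by}"
    )

print(build_query(["advertiser_name", "creative_type", "impression"]))
```

The key point is that the join and the aggregation both end up inside the generated SQL, so they are executed by BigQuery rather than by the reporting engine.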
Stream the data to the client
One of the biggest limitations we had with Spark was the need to load all the data in memory. This led to scaling issues: more data meant we needed more memory on the Spark instances.
Now that the join and reduce parts are centralized in the query sent to BigQuery, the data loaded by the reporting engine is much smaller than before.
To make sure the size of a report is never a limitation, we decided to stream all data coming from BigQuery directly to S3.
Thanks to the BigQuery Storage Read API, we can output the result of our query to a temporary table in BigQuery and easily read the result as a stream.
Between the read and write operations, the data is serialized from the Avro format provided by BigQuery into the output format selected by the user.
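A sketch of this streaming serialization, using CSV as the user-selected format; the helper name is hypothetical, and each chunk would be fed to an S3 multipart upload rather than printed:

```python
# Sketch of the streaming step (hypothetical): rows read from the
# temporary table are serialized to the requested format (CSV here)
# chunk by chunk, so the full report never sits in memory.
import csv
import io

def rows_to_csv_chunks(rows, header):
    """Yield CSV-encoded chunks, one per row, from a row iterator."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(header)
    yield buf.getvalue()
    for row in rows:
        buf.seek(0)
        buf.truncate(0)
        writer.writerow(row)
        yield buf.getvalue()

chunks = list(rows_to_csv_chunks(
    [("advertiser_b", "video", 34), ("advertiser_a", "image", 26)],
    ("advertiser_name", "creative_type", "impressions"),
))
print("".join(chunks))
```

Because each chunk is written out as soon as it is produced, the memory footprint of the service stays constant regardless of the report size.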
By focusing on simplicity and removing complexity we were able to improve our ability to add new dimensions/metrics and improve the engine performance.
On average, the report generation time is reduced by a factor of 17 with the new reporting engine. This improvement is mostly due to the reduction of data transfer by applying better filtering and computing data in only one place.
Cost by report
By reducing the amount of data queried per report, we are able to absorb the increase in query complexity without increasing BigQuery costs. We were also able to reduce the cost of the reporting engine by a factor of 10, by removing the EMR cluster and running only a small service that forwards queries and streams results.
Compared to our legacy reporting engine, gains in cost and speed are significant but there is still room for improvement.
To continue on the path of simplicity, we want to handle the compartmentalization of the data directly in BigQuery, to make sure the reporting engine only has access to the data relevant to its execution context.
See this follow-up article: Sandboxing our client reports, on how we use BigQuery’s table-valued functions to prevent data leakage.
Our events are currently stored in generic aggregated tables. To further reduce the amount of queried data we could create smaller data sources per domain (data marts).