Powering Pinterest ads analytics with Apache Druid

Pinterest Engineering
Pinterest Engineering Blog
9 min readJan 16, 2020

Filip Jaros | Software Engineer, Partner Engineering
Weihong Wang | Engineering Manager, Partner Engineering

The Change

When we launched Promoted Pins in 2014, we chose Apache HBase as our database to store and serve all of our reporting metrics. At the beginning of our ads business, this was an appropriate choice because the number of reporting features needed and overall traffic was low. Additionally, HBase already had a proven record in the industry at this time, and we knew how to successfully operate an HBase cluster.

Five years later, our business has matured. As our ads scale has increased dramatically, so have the complexities of the metrics we report to our partners, which has rendered HBase insufficient for our fine-grained analytical needs. As a result, we surveyed the available options and settled on Druid to be the core component of our next iteration.

Why Druid?

HBase works very well when it comes to accessing random data points, but it’s not built for fast grouping and aggregation. In the past, we’ve solved this by pre-building these data views, but as the features needed for our reporting expanded, it was no longer possible to store so many different cuts. Druid allowed us to bypass all of this complicated data slicing ingestion logic, and also supports:

  • Real-time ingestion via Kafka
  • Automatic versioning of ingested data
  • Data pre-aggregation based on user-set granularity
  • Approximate algorithms for count-distinct questions
  • A SQL interface
  • Easy to understand code and a very supportive community

Data Ingestion

Druid supports two modes of ingestion: Native and Hadoop. Both are launched via a REST call to the Overlord node. In the native ingestion case, a thread is spawned directly on the MiddleManager node to read input data, while in the Hadoop case, Druid launches a MapReduce job to read the input in parallel. In both cases, the ingested data is automatically versioned based on its output datasource (table) and time interval. Druid will automatically start serving the newest version of the data as soon as it is available and keep the older segments in a disabled state, should we ever need to revert to a previous version. Since we have several different data pipelines producing different sets of metrics with the same dimensions into a single datasource, this was a problem for us. How do we keep the data versioned but not have each independent pipeline overwrite the previous one’s output?

Namespacing shard specs proved to be the answer. Druid’s standard approach to versioning segments is by their datasource name, time interval and time written. We expanded on this system by also including a namespace identifier. We then built a separate versioned interval timeline per namespace in a datasource, rather than just one timeline per datasource:

This also meant that we needed to either change the existing ingestion mechanisms to create segments with namespaces or invent a new ingestion mechanism. Since we ingest billions of events per day, native ingestion is too slow for us, and we were not keen on setting up a new Hadoop cluster and changing the Hadoop indexing code to adhere to namespaces.

Instead, we chose to adapt the metamx/druid-spark-batch project to write our own data ingestion using Spark. The original druid-spark-batch project works in a similar fashion to the Hadoop indexer, but instead of launching a Hadoop job, it launches a Spark job. Our project runs inside of a stand-alone job without the need to use any resources of the Druid cluster at all. It works as follows:

  1. Filter out events not belonging to the output interval
  2. Partition data into intervals based on the configured granularity and number of rows per segment file
  3. Use a pool of Druid’s IncrementalIndex classes to persist intermediate index files on disk in parallel
  4. Use a final merge pass to collect all index files into a segment file
  5. Push to deep storage
  6. Construct and write metadata to MySQL

Once the metadata is written, the Druid coordinator will find new segments on its next pull of the metadata table and assign the new segments to be served by historical nodes.

Cluster Setup

In general, the date ranges for querying advertising data fall into three categories:

  1. Most recent time period to display
  2. Year-over-year performance reporting
  3. Random ad-hoc queries of old, historical data.

The number of queries for the most recent day vastly outnumber all other reporting types. With this understanding, we bucketed our Druid cluster into three historical tiers:

  • A “hot” tier serving the most recent data on expensive compute-optimized nodes to handle large QPS.
  • A “cold” tier on mid compute, lots of disk space-optimized nodes. Serves the last year of data sans data in the Hot tier.
  • An “icy” tier on low compute nodes having even more disk space. Serves all other historical data.

Each historical in the hot tier has very low maximum data capacity to guarantee that all segments the node is serving are loaded in memory without needing to page swap. This ensures low latency for most of our user-driven queries. Queries for older data are generally made by automated systems or report exports which allow for higher latency in preference to high operating cost.

While this works very well for the average query patterns, there are cases of unexpected high load which require higher QPS tolerance from the cluster. The obvious solution here would be to scale up the number of historical nodes for these specific cases, but Druid’s data rebalancing algorithm is very slow at scale. It can take many hours or even days for a multi-terabyte cluster to rebalance data evenly once a new set of servers joins the fleet. To build an efficient auto-scaling solution, we could not afford to wait so long.

Since optimizing the rebalancing algorithm would be very risky to deploy on a huge production system, we decided instead to implement a solution for mirroring tiers. This system uses maximum bipartite matching to link each node in the mirror tier to exactly one node in the primary tier. Once the link is established, the mirroring historical doesn’t need to wait to be assigned segments by the rebalancing algorithm. Instead, it will pull the list of segments served by the linked node from the primary tier and download those from deep storage for serving. It doesn’t need to worry about replication since we expect these mirror tiers to be turned on and off very frequently, operating only during periods of heavy traffic. See below for more information:

During testing we were able to achieve significant auto-scaling improvement given a mirroring tier solution. The most significant portion of time taken now from server launch to query serving is limited I/O bandwidth from deep storage.

Time taken to load 31 TB of data. 2 hours for natural rebalancing. 5 minutes for mirroring tier.

Query Construction

Our Druid deployment is external facing, powering queries made interactively from our ads management system as well as programmatically through our external APIs. Often these query patterns will look very different per use case, but in all cases, we needed a service to construct Druid queries quickly and efficiently as well as to reject any invalid queries. Programmatic access to our API means that we receive a fair number of queries which request invalid dates or repetitive queries asking for entities which have no metrics.

Percent of queries returning empty results per API client. Some clients request non-existent metrics up to 90% of the time.

Constructing and asking Druid to execute these queries is possible but accrues overhead which is unaffordable in a low-latency system. To short-circuit queries for non-existent entities, we developed a metadata store listing entities and their metric-containing time intervals. If a query’s requested entities have no metrics for the specified time intervals, we can return immediately and relieve Druid from additional network and CPU workload.

Druid supports two APIs to query data: native and SQL. SQL support is a newer feature backed by Apache Calcite. In the backend, it takes a Druid SQL query, parses it, analyzes it, and turns it into a Druid native query which is then executed. SQL support has numerous advantages — it’s much more user friendly and certainly better at constructing more efficient ad-hoc queries than if the user was to come up with some unfamiliar JSON.

SQL was our first choice when implementing our query constructor and execution service namely due to our familiarity with SQL. It worked, but we quickly identified certain query patterns which Druid could not complete and traced the issue to performance bottlenecks in the SQL parser for queries with thousands of filters or many complicated projections. In the end, we settled on using native queries as our primary access path to Druid, keeping SQL support for internal use cases that are not latency sensitive.

System Tuning

Coming from a key-value world, the individual queries originating from our API layer were tailored to be low in complexity to allow an optimal number of point lookups. This also meant querying each entity individually, resulting in high QPS in the backend. To minimize the disruption to our entire infrastructure, we wanted to keep our changes simple and get as close as possible to simply exchanging HBase for Druid. In practice, that proved to be completely impossible.

Druid holds network connections between servers in a greedy manner, using a set of new connections per query. It also opens object handles per query, which is the primary bottleneck in a high QPS system. To lessen the network load, we ramped up the complexity of each query by batching the number of requested entities. We observed our system to perform at its best with between 1,000 to 2,000 requested entities in IN filter type queries, although every deployment will differ.

QPS after implementing query batching. 15,000 request / second peaks lowered by 10x

On the server side, we found the basic cluster tuning guidance suggested by the Druid documentation very helpful. One non-obvious caveat is being mindful of how many GroupBy queries can be executed at any time given the number of merge buffers configured. GroupBy queries should be avoided whenever possible in preference to Timeseries and TopN queries. These types of queries do not require merge buffers and therefore need fewer resources to execute. In our stack, we have the option to impose rate limiting based on query type to avoid too many GroupBy queries at once given the number of configured merge buffers.

The Future

We’re excited to have finished the long journey to bring Druid into production, but of course our work continues. As Pinterest’s business grows, our work on the core Druid platform for analytics has to evolve alongside it. It might be difficult to seamlessly contribute all our effort into the main Druid repository, but we hope to share our effort with the community. Namely on features such as a Spark writer and reader of Druid segments, mirroring tiers for auto scaling, and developing a new multiplexing IPC protocol instead of HTTP. While ads analytics matures, we are also onboarding other teams’ use cases, helping them discover how best to use Druid at scale for their needs.

Acknowledgments

This project was a joint effort across multiple teams: Ads Data, Ads API, and Storage & Caching. Contributors and advisors include Lucilla Chalmer, Tian-Ying Chang, Julian Jaffe, Eric Nguyen, Jian Wang, Weihong Wang, Caijie Zhang, and Wayne Zhao.

Credit also goes to Imply.io leaders Gian Merlino and Fangjin Yang for introducing us to and helping us bootstrap Druid.

We’re building the world’s first visual discovery engine. More than 320 million people around the world use Pinterest to dream about, plan and prepare for things they want to do in life. Come join us!

--

--