Pinterest’s Analytics as a Platform on Druid (Part 2 of 3)

Pinterest Engineering
Pinterest Engineering Blog
8 min read · Aug 26, 2021

Jian Wang, Jiaqi Gu, Yi Yang, Isabel Tallam, Lakshmi Narayana Namala, Kapil Bajaj | Real Time Analytics Team

This series has three parts. To read Part 1, click here; for Part 3, click here.

In this blog post series, we’ll discuss Pinterest’s Analytics as a Platform on Druid and share some learnings on using Druid. This second post discusses learnings on optimizing Druid for batch use cases.

Learnings on Optimizing Druid for Batch Use Cases

System Visibility

During the process of onboarding different use cases, we found many critical system metrics were missing in Druid. In response, we added metrics on usage of processing threads, merge buffers, rows in memory, etc., to help us get a clearer idea of the bottlenecks affecting capacity provisioning and to identify opportunities to make improvements.

Tiering Based on Request Pattern

Druid supports loading segments into different server pools (tiers) based on segment timestamps, so higher-end hosts can be used to serve the more frequently accessed segments. Initially, we had no quantitative analysis of request time range distributions beyond heuristics provided by clients, so the tiering was somewhat arbitrary. Later we logged all Druid requests to a data source for analysis, which has helped us make smarter tiering decisions. Below is an example from a use case for which we used to load all six months of segments onto memory-optimized hosts. After query pattern analysis, we found that over 98% of the requests hit the most recent 35 days, so we shifted segments older than 35 days to I/O-optimized hosts without impacting the SLA while saving infrastructure cost.

Table 1: Use case 1 request time range distribution (over 98% of requests hit segments in the past 35 days)
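As a rough illustration of the kind of analysis we ran on the request logs, the sketch below computes what fraction of queried intervals fall entirely within the most recent N days. It is a minimal sketch, not our production pipeline; the record and method names are hypothetical.

import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Hypothetical record of one logged Druid request: when it arrived and the
// oldest segment interval it touched.
record LoggedRequest(Instant requestTime, Instant oldestIntervalStart) {}

public class RequestPatternAnalysisSketch {
  /** Fraction of requests whose queried data is entirely within the last `days` days. */
  static double fractionWithinDays(List<LoggedRequest> requests, int days) {
    long hits = requests.stream()
        .filter(r -> Duration.between(r.oldestIntervalStart(), r.requestTime())
            .compareTo(Duration.ofDays(days)) <= 0)
        .count();
    return requests.isEmpty() ? 0.0 : (double) hits / requests.size();
  }

  public static void main(String[] args) {
    List<LoggedRequest> requests = List.of(
        new LoggedRequest(Instant.parse("2021-08-01T00:00:00Z"), Instant.parse("2021-07-10T00:00:00Z")),
        new LoggedRequest(Instant.parse("2021-08-01T00:00:00Z"), Instant.parse("2021-03-01T00:00:00Z")));
    // A threshold where this fraction crosses ~0.98 tells us where the hot tier can end
    // (35 days for use case 1).
    System.out.println(fractionWithinDays(requests, 35));
  }
}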

Secondary Key Pruning

By default, Druid partitions segments by timestamp as the primary key. Data can be further partitioned by chosen dimensions as a secondary key if hash-based or single-dimension partitioning is used during ingestion. At query time, Druid uses a two-layer architecture in which brokers figure out which segments need to be scanned and fan requests out to the data nodes hosting them. When shard specs with secondary keys are used, brokers can prune segments based not only on timestamps (the primary key) but also on the partition dimensions (the secondary key). This increases the chance of skipping requests to data nodes for segments that are guaranteed to return empty results, saving precious processing threads on the data nodes. We initially used hash-based partitioning because it is simple, but we later found it did not utilize the secondary key at query time the way single-dimension partitioning does. After we added the missing logic to use the secondary key, the number of segments to scan dropped 3x, which greatly relieved the burden on the data nodes.

Figure 1: Use case 2 number of segments to scan before and after applying secondary key pruning (3x reduction)
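For intuition, here is a simplified sketch of the broker-side pruning idea for hash-partitioned segments. The types and the hash function below are illustrative only; this is not Druid’s actual ShardSpec API or hashing scheme.

import java.util.Set;

// Simplified sketch: a broker can skip a segment when none of the filtered
// values of the partition dimension can hash to that segment's partition.
public class HashPruningSketch {
  static boolean segmentMayMatch(Set<String> filterValues,
                                 int segmentPartitionNum,
                                 int totalPartitions) {
    for (String value : filterValues) {
      // Stand-in for the hash the ingestion job used to route rows to partitions.
      int partition = Math.floorMod(value.hashCode(), totalPartitions);
      if (partition == segmentPartitionNum) {
        return true; // at least one filtered value could live in this segment
      }
    }
    return false; // safe to skip: no filtered value hashes to this partition
  }

  public static void main(String[] args) {
    // With 8 hash partitions and a filter on partner_id = "12345", the broker only
    // needs to fan out to the one partition that "12345" hashes to.
    System.out.println(segmentMayMatch(Set.of("12345"), 3, 8));
  }
}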

Optimizing Partitioning for Skewed Data

Hash-based partitioning works well for general batch use cases, but it showed drawbacks when we onboarded business reporting use cases whose typical query filters on a given partner_id. We hashed data by partner_id to reduce the number of segments to scan at query time. However, some ids had far more data than others, to the point where the top ~20 ids accounted for over 90% of the total data, even though the total id cardinality is in the millions. These dozens of large partners happen to be the company’s largest customers, so we cannot simply ignore them. In the ingestion job, a single reducer is responsible for creating a single segment, so the presence of large partners leads to long-tail ingestion latencies while a few segments far larger than the rest are created. Meanwhile, at query time, even though queries for the majority of partners work fine, those for the dozens of large partners are slow or even time out (five seconds for our use case). Since the basic processing unit in Druid is a segment, and a segment is only processed by a single thread, there is nothing we can do to speed up the processing unless we can distribute the data of the large partners across multiple segments.

While looking for solutions, we found that none of the existing partitioning schemes, hash-based or single-dimension, can distribute the data of a given id into multiple segments based on its number of rows. Therefore, we created a custom shard spec with exactly that goal. We piggybacked on the existing single dimension shard spec, which was the most similar to what we needed: the ingestion workflow contains a stats computing job that calculates the number of rows per id after roll-up, before the actual indexing job. We modified the logic to fill a segment with rows until it reaches a predefined segment size threshold and to put the remaining rows into the next segment, until all rows for an id are assigned. A few extra metadata fields (e.g., partitionSize, startCount, and endCount) are added to the custom shard spec to help with ingestion assignments and query-time broker-side pruning.

public SingleDimensionShardSpec(
    String dimension,
    String start,
    String end,
    int partitionNum
)

public SingleDimensionEvenSizeShardSpec(
    String dimension,
    String start,
    String end,
    int partitionNum,
    int partitions,
    int partitionSize,
    int startCount,
    int endCount,
    ObjectMapper jsonMapper
)
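To make the assignment concrete, here is a minimal sketch of the fill-until-threshold idea described above. It is not the actual ingestion code; the class, method, and variable names are ours for illustration.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of even-size partition assignment: walk ids in sorted order and fill
// segments up to a target row count, splitting a large id across segments.
public class EvenSizeAssignmentSketch {
  static List<Map<String, Integer>> assign(Map<String, Integer> rowsPerId, int targetRowsPerSegment) {
    List<Map<String, Integer>> segments = new ArrayList<>();
    Map<String, Integer> current = new LinkedHashMap<>();
    int currentRows = 0;
    for (Map.Entry<String, Integer> e : rowsPerId.entrySet()) {
      int remaining = e.getValue();
      while (remaining > 0) {
        int take = Math.min(remaining, targetRowsPerSegment - currentRows);
        current.put(e.getKey(), take);
        currentRows += take;
        remaining -= take;
        if (currentRows == targetRowsPerSegment) {
          segments.add(current);           // segment is full, start a new one
          current = new LinkedHashMap<>();
          currentRows = 0;
        }
      }
    }
    if (!current.isEmpty()) {
      segments.add(current);
    }
    return segments;
  }

  public static void main(String[] args) {
    // Stats from the pre-indexing job: rows per partner_id after roll-up.
    Map<String, Integer> rowsPerId = new LinkedHashMap<>();
    rowsPerId.put("partner_a", 12_000_000); // a large partner spans multiple segments
    rowsPerId.put("partner_b", 40_000);
    System.out.println(assign(rowsPerId, 5_000_000));
  }
}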

With this change, we were able to produce segments of even size and limit the number of rows for each id in a segment to a predefined segment size (e.g., 5 million). The ingestion tail latency issue was resolved, and query performance became much better for most of the large partners (except for a few of the largest ones, which still timed out).

After more profiling, we found that many large partners fill the entire 5 million rows of a segment. Our assumption had been that processing 5 million rows with a single thread on a data node is quick, but that did not hold for our use case. For the even-size custom shard spec just discussed, the segment size threshold was the only knob, and further reducing it would create a massive number of segments and unnecessarily impact other ids. We therefore enhanced the custom shard spec with another knob: a threshold that limits the number of rows for an id within a segment, independent of the segment size threshold. Because the number of distinct large ids is usually small, we were able to store each of the large ids individually in the shard spec.

With this change, we were able to further reduce latency for the large partners without creating more segments or impacting the performance of small partners. Meanwhile, we found that many of the queries in our use case are group-by queries, so the main work on the data nodes is to fetch all available groups in a segment and transfer them back to the broker to sort and merge. We also found that the group-by dimensions in the most expensive group-by queries have a cardinality on the order of tens of thousands. Therefore, we added a ternary partition key, the group-by dimensions of the most expensive queries, so that when a partner id’s data has to span multiple segments, rows of the same group are hashed to the same segment. This ensures perfect ingestion-time roll-up, increases data locality, reduces network transfer, and reduces the amount of final merge work on the broker side.

public SingleDimensionEvenSizeV2ShardSpec(
    String dimension,
    String start,
    String end,
    int partitionNum,
    int partitions,
    int partitionSize,
    // K: large partition dimension value, V: number of rows of the value in the current segment
    Map<String, Integer> largePartitionDimensionValues,
    // Ternary partition key
    Set<String> groupKeyDimensions,
    ObjectMapper jsonMapper
)
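For intuition on how the ternary key keeps a group together, below is a minimal sketch of routing a row to one of a large partner’s segments by hashing its group-by dimension values. It illustrates the idea only; it is not the production shard spec code, and the names are hypothetical.

import java.util.List;
import java.util.Map;

// Sketch: when a large partner's rows span several segments, pick the segment
// by hashing the row's group-by dimension values so one group lives in one segment.
public class TernaryKeyRoutingSketch {
  static int chooseSegment(Map<String, String> row,
                           List<String> groupKeyDimensions,
                           int segmentsForThisPartner) {
    StringBuilder key = new StringBuilder();
    for (String dim : groupKeyDimensions) {
      key.append(row.getOrDefault(dim, "")).append('|');
    }
    return Math.floorMod(key.toString().hashCode(), segmentsForThisPartner);
  }

  public static void main(String[] args) {
    // A large partner whose data is split across 4 segments; all rows for the
    // same (country, device) group land in the same segment, so the data node
    // can fully aggregate that group locally before sending it to the broker.
    Map<String, String> row = Map.of("partner_id", "12345", "country", "US", "device", "mobile");
    System.out.println(chooseSegment(row, List.of("country", "device"), 4));
  }
}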

With the enhanced even-size custom shard spec in place, we were able to reduce latency for all the large partners except the largest one. Digging deeper, we found that the largest partner contains ~200 million rows per day after roll-up. Even if we reduce the time spent on data nodes with the enhanced even-size partitioning, the broker is still the bottleneck because a single host is not enough to merge the huge number of intermediate results returned from the data nodes, even with multiple threads. We also tried reducing the broker’s work for the expensive group-by queries by first getting an approximate local top-N result on the data nodes with the forceLimitPushDown config enabled, but there were still too many rows to aggregate on the broker side. Eventually, we realized we would not be able to return within the timeout threshold (five seconds) for the largest partner unless we added another aggregation layer in the middle of the existing two-layer model (the broker as a single root and the data nodes as the leaves). As a temporary workaround, we decided to skip querying the latest date for the largest partner and instead wait for a dataset with some form of roll-up that arrives one or two days later.
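As a reference for the experiment mentioned above, forceLimitPushDown is a group-by query context parameter; the sketch below simply builds such a context map. The surrounding class is ours for illustration, and building and submitting the query itself is omitted.

import java.util.Map;

// Illustrative helper that builds the query context we experimented with for
// expensive group-by queries.
public class GroupByQueryContextSketch {
  static Map<String, Object> limitPushDownContext() {
    return Map.of(
        "forceLimitPushDown", true, // apply the limit on data nodes for an approximate local top-N
        "timeout", 5000             // our five-second SLA, in milliseconds
    );
  }

  public static void main(String[] args) {
    System.out.println(limitPushDownContext());
  }
}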

Finally, we also found that the stats computing job that runs before the actual indexing job is quite slow when the input is large; in some cases it took more than 10 hours to complete due to expensive disk operations between the MapReduce jobs, making it unusable. The logic is awkward to express as a sequence of MapReduce jobs, yet it is quite straightforward in SQL: a count-distinct group-by query. We optimized this by generating the stats, in the same format as the output of the original MapReduce jobs, in an external SparkSQL job, which finishes within ~15 minutes compared with the previous ~10 hours.
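For a sense of what that looks like, here is a minimal SparkSQL sketch of counting rows per id after roll-up. The table, column, and path names are hypothetical, and the exact output schema our indexing job expects is not shown.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Minimal sketch of the stats job in SparkSQL: count the distinct roll-up groups
// per partner_id, i.e., the number of rows each id will have after roll-up.
public class PartitionStatsSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("druid-partition-stats").getOrCreate();

    // Hypothetical input location and columns: raw events with a partner_id and
    // the dimensions that Druid rolls up on (e.g., hour, country, device).
    spark.read().parquet("s3://bucket/events/dt=2021-08-01/").createOrReplaceTempView("events");

    Dataset<Row> stats = spark.sql(
        "SELECT partner_id, "
      + "       COUNT(DISTINCT concat_ws('|', hour, country, device)) AS rows_after_rollup "
      + "FROM events "
      + "GROUP BY partner_id");

    // Written in the same format the indexing job previously read from the MapReduce stats job.
    stats.write().json("s3://bucket/partition-stats/dt=2021-08-01/");
    spark.stop();
  }
}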

Future work

We plan to add more visibility into system metrics to help us make full use of the different host types, automate request pattern analysis for all use cases to reduce inefficiency, consolidate our shard specs into a single well-rounded one to reduce the burden of maintaining multiple, and implement a multi-layer aggregation model to reduce latency for the most expensive queries.

Acknowledgements

We have learned a lot from the discussions in the Druid Guild with the Ads Data team and from the feedback of the open source community when we started contributing back our work. We would also like to thank all the teams that have worked with us to onboard their use cases to the unified analytics platform: the Insights team, Core Product Data, Measurements team, Trust & Safety team, Ads Data team, Signal Platform team, Ads Serving, and more. Every use case is different, and the platform has evolved a lot since its inception.

To learn more about engineering at Pinterest, check out the rest of our Engineering Blog, and visit our Pinterest Labs site. To view and apply to open opportunities, visit our Careers page.
