Improving the scalability of a Spark pipeline for conversion attribution
Rohil Bhansali | Software Engineer, Business & Creators
Many Pinners use Pinterest with a planning mindset and often have commercial intent. Advertisers use Pinterest to reach those Pinners, and Pinterest offers ads measurement solutions that calculate metrics based on attributing Pinners’ actions on Pinterest to advertisers’ conversion events. In order to deliver these solutions at scale, we implemented a data pipeline using Spark.
Since the number of Pinners and advertisers on our platform continues to rapidly grow, we need to ensure the Spark pipeline can scale well beyond its current capacity. We recognized early on that simply adding more compute resources would not be a tractable scaling solution. In February 2019, we began looking into potential ways to economically scale our pipeline, both in terms of the business logic as well as taking advantage of new Spark features. We identified two potential solutions and conducted in-depth proof-of-concept tests for them. Below, you’ll find a summary of the considered solutions as well as the results of our testing.
The pipeline takes as input data from advertisers (conversions) and activity on Promoted Pins (actions). Since we allow a conversion to be attributed to an action from up to 60 days before, we need to consider up to 60 days of action logs, making this a potentially very costly data processing problem. When we analyzed the performance metrics of the current pipeline, we found that the main bottleneck was the loading and shuffling of the actions in preparation for joining them with the conversions. Therefore, most of the approaches outlined here aim to reduce the cost of this shuffle as much as possible.
Bucket actions each day to avoid repeated shuffle
As mentioned above, since each day’s conversions are joined with the actions from the previous 60 days, any single day of actions is shuffled 60 times over the course of 60 runs of the attribution job. We can improve this significantly if this dataset is shuffled once and saved in a way that doesn’t require a shuffle on any subsequent day. This process is called “bucketing.”
To do this, we take advantage of Spark’s recently implemented bucketing API. The conversion attribution pipeline essentially forms a join on two large datasets — conversions and actions — the latter being shuffled repeatedly across multiple executions of the join operation. By bucketing the action dataset, we’re able to persist the shuffled state of actions across invocations of the join, thereby eliminating duplicate shuffle work. Bucketing is a specification defined at write time that includes the number of buckets you want (numBuckets) and the keys by which to bucket. At write time, each Spark executor splits the data it’s holding by the specified keys into numBuckets groups and writes each group out to a file, thus creating numBuckets files.
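As a rough illustration (not our production code), the core property bucketing relies on can be sketched in plain Python: records with the same join key always hash to the same bucket index, so two datasets bucketed identically can be joined bucket-by-bucket with no further shuffle. All names and fields here are illustrative.

```python
# Minimal plain-Python model of Spark's bucketing semantics. Records with
# the same join key always hash to the same bucket, so two datasets
# bucketed the same way can be joined bucket-by-bucket with no global
# shuffle. Names (bucket_by, NUM_BUCKETS, field names) are illustrative.

NUM_BUCKETS = 4

def bucket_by(records, key, num_buckets=NUM_BUCKETS):
    """Split records into num_buckets groups by hashing the join key."""
    buckets = [[] for _ in range(num_buckets)]
    for rec in records:
        buckets[hash(rec[key]) % num_buckets].append(rec)
    return buckets

actions = [{"user": u, "action_id": i} for i, u in enumerate("abcdefgh")]
conversions = [{"user": u, "conv_id": i} for i, u in enumerate("aceg")]

action_buckets = bucket_by(actions, "user")       # written once, reused daily
conversion_buckets = bucket_by(conversions, "user")

# Join corresponding buckets only -- no cross-bucket data movement needed.
joined = [
    (a["action_id"], c["conv_id"])
    for ab, cb in zip(action_buckets, conversion_buckets)
    for a in ab
    for c in cb
    if a["user"] == c["user"]
]
```

Because both sides were bucketed with the same key and bucket count, matching rows are guaranteed to sit in buckets with the same index, which is exactly the property Spark exploits to skip the shuffle and sort phases of a SortMergeJoin.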
The challenges that arose when implementing bucketing in the proof of concept included:
1. Identifying the optimal number of buckets to use while also being cognizant of the storage costs of creating many files on HDFS. When reading bucketed data, the number of buckets limits the parallelism of reading the data. Without bucketing, shuffle parallelism can be tuned over time as the dataset grows. However, because the point is to reduce the amount of shuffling, bucketed files are not intended to be reshuffled and therefore have their parallelism set when they’re created. This creates a tension between read latency/memory pressure on a small number of executors and high file cardinality on HDFS. (We store the bucket files on HDFS with block sizes of 128MB, so too many small files also means that the blocks are under-utilized, which leads to increased storage costs.)
At first, increasing the number of buckets seemed to have a multiplicative effect on the number of files produced. Since each executor could write up to numBuckets files, the total number of files produced was (numBuckets)*(numExecutors). This would mean we’d have to reduce the number of writers to keep the number of files in check, though doing so would make writes much slower. The workaround was to repartition the data in memory in the same way as the eventual bucketing specification. This ensured each executor would write to mutually exclusive buckets, meaning that the number of files = number of buckets, regardless of the number of writers. This allows both reads and writes to be reasonably fast while keeping the number of files under control.
2. APIs to save bucketed data to an HDFS path rather than a table are not widely available (i.e., .save(filePath) does not support bucketing), and we had to use workarounds like append mode to write bucketed data to different partitions of the same table. Furthermore, Spark’s bucketed data is formatted in a way that is incompatible with Hive, which means we’d lose the ability to query intermediate datasets from Hive or Presto for debugging. However, Spark SQL through the Spark Shell can still be used for debugging purposes. We would like to see extra support for these features added to Spark.
3. Spark is also very brittle about identifying when it can take advantage of bucketing. Even light processing of a bucketed dataset before the join can cause Spark to fall back to a normal SortMergeJoin, even though the data is still bucketed.
4. Finally, further investigation would be needed to settle on the optimal parameters for numBuckets. As data volume continues to grow, we will keep optimizing this parameter.
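To make the file-count arithmetic from challenge 1 concrete, here is a plain-Python sketch (all names are illustrative, not Spark APIs) contrasting a naive bucketed write with one that first routes each record to the executor that owns its bucket:

```python
# Plain-Python sketch of why repartitioning by the bucketing keys before a
# bucketed write keeps the file count at numBuckets rather than
# numBuckets * numExecutors. All names are illustrative.

NUM_BUCKETS = 8
NUM_EXECUTORS = 16
records = [{"user": f"user{i}"} for i in range(1000)]

def bucket_id(rec):
    return hash(rec["user"]) % NUM_BUCKETS

# Naive write: records sit on arbitrary executors, and each executor
# writes one file per bucket it happens to hold.
naive_files = set()
for i, rec in enumerate(records):
    executor = i % NUM_EXECUTORS          # arbitrary initial placement
    naive_files.add((executor, bucket_id(rec)))

# Repartitioned write: route each record to the executor that owns its
# bucket first, so every bucket is written by exactly one executor.
repartitioned_files = set()
for rec in records:
    executor = bucket_id(rec) % NUM_EXECUTORS
    repartitioned_files.add((executor, bucket_id(rec)))

# len(naive_files) approaches NUM_BUCKETS * NUM_EXECUTORS, while
# len(repartitioned_files) is at most NUM_BUCKETS.
```

In Spark terms, the second loop corresponds to calling repartition on the bucketing keys before the bucketed write, so each bucket’s data is colocated on a single writer.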
Apply delta process — filtering before joining
Another effective way to reduce the cost of shuffling the action dataset across the Spark pipeline is to reduce the size of it as much as possible before joining. Looking at the operations we perform during the pipeline, there was an opportunity to move some of the filtering from a post-join to a pre-join step.
Pinterest’s attribution pipeline currently employs a “last-touch” attribution model, meaning that if multiple actions are possibly attributable to a single conversion, we always pick the latest one (assuming it still occurs before the conversion). Therefore, for the 59 days of actions that necessarily happen before all of a single day’s conversions, we only need to maintain the latest such action per join key. We can apply this filtering each day and update a sliding snapshot of the previous 59 days, which we use in combination with the most recent day of actions as our minimized action dataset.
To update this snapshot on Day N, we can take the [Day (N-60), Day (N-1)] delta processed set and drop all events from Day (N-60), then re-process the remaining set after adding in Day N to get a new 59d delta processed dataset. This dataset will be used when processing for Day N+1.
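The daily snapshot update described above can be sketched in plain Python (field names like user, day, and ts are assumptions for the example, not our actual schema):

```python
# Illustrative sketch of the daily "delta process": keep only the latest
# action per join key (last-touch) in a sliding snapshot of prior days.
# Field names (user, day, ts) are assumed for the example.

def delta_process(actions):
    """Last-touch filter: keep only the latest action per user."""
    latest = {}
    for a in actions:
        if a["user"] not in latest or a["ts"] > latest[a["user"]]["ts"]:
            latest[a["user"]] = a
    return list(latest.values())

def update_snapshot(snapshot, new_day_actions, oldest_day):
    """Day-N update: drop the oldest day, add the new day, re-filter."""
    kept = [a for a in snapshot if a["day"] != oldest_day]
    return delta_process(kept + new_day_actions)

snapshot = [
    {"user": "u1", "day": 1, "ts": 100},
    {"user": "u2", "day": 2, "ts": 250},
]
new_day = [{"user": "u1", "day": 60, "ts": 900}]

# u1's old action is superseded by a later one; u2's action is retained.
updated = update_snapshot(snapshot, new_day, oldest_day=1)
```

The key point is that the snapshot only ever shrinks or swaps entries, so the dataset carried between daily runs stays bounded by the number of distinct join keys rather than by raw action volume.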
Note that in order to use this approach with bucketing, we must ensure the snapshot is not reshuffled during this update process. This can be done by broadcasting the new day of actions and doing a left anti-join to make sure the snapshot data never leaves the executors on which it was first loaded. If the broadcast dataset gets too large, we can chunk it and apply the left-anti joins sequentially.
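A plain-Python sketch of this broadcast-plus-chunked-anti-join idea, with illustrative names throughout, might look like:

```python
# Sketch of updating the snapshot without reshuffling it: the new day of
# actions is "broadcast" as a small set of keys, and a left anti-join
# drops superseded rows in place on each executor. Chunking handles the
# case where the broadcast set is too large to send at once.
# All names are illustrative.

def left_anti_join(snapshot, broadcast_keys):
    """Keep snapshot rows whose key is NOT in the broadcast set."""
    return [row for row in snapshot if row["user"] not in broadcast_keys]

def chunked(keys, size):
    """Yield the broadcast key set in fixed-size chunks."""
    keys = sorted(keys)
    for i in range(0, len(keys), size):
        yield set(keys[i:i + size])

snapshot = [{"user": f"u{i}"} for i in range(10)]
new_day_keys = {"u1", "u3", "u5", "u7"}  # users with a newer action today

# Apply the anti-join one chunk at a time; the result is the same as a
# single anti-join against the full set, since the filters compose.
for chunk in chunked(new_day_keys, size=2):
    snapshot = left_anti_join(snapshot, chunk)
```

Because each chunked anti-join only removes rows, the snapshot never moves between executors, preserving its bucketed layout.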
Use ID join to filter out unattributed actions
In conversion attribution, two large datasets are joined together, but the cardinality of the join result is small relative to the inputs. Therefore, another potential way to aggressively reduce the amount of data shuffled is to only join the minimal number of fields in a SortMergeJoin, such that the join result is a list of unique identifiers referring back to records in the full inputs. This list can then be joined back with the full inputs in a second join, this time using the much more efficient BroadcastJoin to achieve the full join result. Then only the actions that are actually attributed have to be shuffled.
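The two-phase idea can be sketched in plain Python (the field names and the in-memory "joins" are illustrative stand-ins for the SortMergeJoin and BroadcastJoin stages):

```python
# Sketch of the two-phase "ID join": first join only the minimal key
# fields to get the IDs of attributed pairs, then use the small ID list
# (broadcast-sized) to pull in the full records. Field names are
# illustrative.

conversions = [{"conv_id": i, "user": u, "payload": "big-conv-record"}
               for i, u in enumerate(["a", "b", "c"])]
actions = [{"action_id": i, "user": u, "payload": "big-action-record"}
           for i, u in enumerate(["b", "c", "d"])]

# Phase 1: join only (user, id) -- a tiny projection of each side, so the
# expensive shuffle moves far less data than shuffling full records.
attributed_ids = {
    (c["conv_id"], a["action_id"])
    for c in conversions
    for a in actions
    if c["user"] == a["user"]
}

# Phase 2: "broadcast" the small ID list and filter the full inputs, so
# only attributed records carry their full payload into the final join.
conv_ids = {cid for cid, _ in attributed_ids}
act_ids = {aid for _, aid in attributed_ids}
full_result = [
    (c, a)
    for c in conversions if c["conv_id"] in conv_ids
    for a in actions if a["action_id"] in act_ids
    if (c["conv_id"], a["action_id"]) in attributed_ids
]
```

The payoff depends on the result being much smaller than the inputs: the wide records are only shuffled for the small attributed subset.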
This approach results in a smaller maximum shuffle but requires more, smaller joins and extra processing steps. The total memory-hours and CPU-hours showed no meaningful improvement, so this approach was not explored further.
Adopt sequential window processing
In the conversion attribution process, a parameter called the attribution window determines whether a joined conversion-action pair is kept in the final output. A given pair is only kept if the difference between the two timestamps is smaller than the given attribution window (e.g., for a 7-day window, the action can’t have happened more than seven days before the conversion). If a pair fits in a small window, we implicitly know it will fit in any larger window, so we never need to reattribute that conversion for larger windows.

Using this insight, we could first join the conversions to the actions in the smallest supported window. Any pair that is attributed is added to the final output; all remaining conversions are then joined to actions in the next smallest window, and so on. This approach could reduce the maximum amount of data that is shuffled at once, since each join is smaller than with the approach of joining all the data at once. We didn’t explore this approach further because it has similar characteristics to the ID join solution mentioned above (i.e., a lower maximum memory requirement but a corresponding increase in runtime). Furthermore, it would introduce a complicated execution plan that is more difficult to troubleshoot.
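A plain-Python sketch of the sequential-window idea (window sizes, field names, and the in-memory join are all illustrative):

```python
# Sketch of sequential window processing: attribute conversions in the
# smallest window first, and only carry unattributed conversions forward
# to the next, larger window. Timestamps are in days; names illustrative.

WINDOWS_DAYS = [1, 7, 30, 60]  # smallest to largest supported windows

def attribute_sequentially(conversions, actions):
    output, remaining = [], list(conversions)
    for window in WINDOWS_DAYS:
        still_unattributed = []
        for conv in remaining:
            # Last-touch within this window: latest qualifying action.
            candidates = [a for a in actions
                          if a["user"] == conv["user"]
                          and 0 <= conv["ts"] - a["ts"] <= window]
            if candidates:
                best = max(candidates, key=lambda a: a["ts"])
                output.append((conv["conv_id"], best["action_id"], window))
            else:
                still_unattributed.append(conv)
        remaining = still_unattributed  # each pass joins a smaller set
    return output

conversions = [{"conv_id": 1, "user": "u1", "ts": 10},
               {"conv_id": 2, "user": "u2", "ts": 40}]
actions = [{"action_id": 100, "user": "u1", "ts": 9.5},
           {"action_id": 200, "user": "u2", "ts": 15}]

result = attribute_sequentially(conversions, actions)
```

Each pass shrinks the conversion side of the join, which is where the reduced peak shuffle size would come from in the Spark version, at the cost of chaining several joins end to end.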
Test the impact of bucketing and delta processing
After reviewing these options within the team, we decided to focus on the bucketing and delta processing optimizations, given their positive results and the fact that they could be implemented together. To quickly assess the feasibility and impact of applying them to this Spark pipeline, we built and executed prototype versions of the jobs, which processed the same amount of data while using dramatically less CPU and memory. The results confirmed our hypothesis that combining bucketing and delta processing could yield a large productivity gain for the Spark pipeline.
Next, we’ll focus on a more direct load test using increased data sizes rather than reduced resources before productionizing the bucketing + delta processing solution. This post covered the optimizations we came up with in a month of work, and it is by no means exhaustive. If you have ideas about how to write more performant Spark pipelines, I’d love to hear from you. Otherwise, we’ll keep you posted with the results of implementing these ideas. Thanks for reading!
This is a collaborative project at Pinterest. Special thanks to the following members of the team: Brent Bryan, Greg Sakorafis, Cole Rottweiler, Andrea Burbank, Eva Zhang, Deirdre Storck and Aaron Yuen for all their contributions on this project.