Salting Your Spark to Scale

Has your Spark job ever crashed due to data skew? Curious what that means and how to fix it? Then read on to learn about a technique for dealing with data skew called “salting”.

AppsFlyer’s tech stack ingests billions of raw events daily, upwards of 80 billion a day. This data must be transformed to enable real-time querying and analysis. We do this by aggregating the data into a defined format and then persisting it to a distributed database, in this case Druid. This post focuses on how we take raw data and transform it into more meaningful aggregated data. In other words, it covers how the data is prepared for the database, including reducing the number of rows by reducing the cardinality of the dimensions aggregated for each client. (Cardinality is the number of distinct values of a particular dimension.)

This blog post is a summary of a talk I gave. Check out the YouTube recording if this post whets your appetite.

As with all client data, we need to be careful, because the dimensions in the data are valuable to our clients. We can’t prune or cut off the data arbitrarily. Any pruning must be done in a way that we can justify to our customers. We achieve this by defining a fixed threshold for each dimension per customer. The threshold determines how many distinct values of that dimension each customer receives in the aggregated data.

This process of ingesting, transforming and ultimately aggregating meaningful data for our customers went through a number of iterations. It also required optimization before it could handle our high-cardinality data with good performance.

In our first iteration, all of the data passed sequentially through a series of window aggregations, each one limiting the cardinality of a single dimension. Because every record went through these complicated window transformations in sequence, the process had trouble scaling. The problem grew as the raw data continued to grow exponentially, and we realized we needed to break up the process to make it more performant. Michael Spector wrote a good article describing this old cardinality process.

Processing & Transforming Data: Grey Lists

In the next iteration, we split the process into two phases. The first phase analyzes the raw data to define which dimension values are allowed for each client, based on a threshold or hard limit. We call the output of this first calculation a grey list, for reasons explained below. Throughout this post we use the number of campaigns per client as the example of applying a threshold to a dimension. The first pass over the data therefore determines which campaigns are allowed for each client, where each client can have at most 1000 campaigns.

As an aside, one nice benefit of splitting the process into these two phases is that it will let us define custom metrics and limits per client in the future.

Our raw data is stored in a columnar format, so reading a single dimension is relatively cheap in terms of both time and cost. This makes it efficient to read the values of individual dimensions from the raw data when defining the values allowed per client.

Limiting the number of distinct values for a dimension is necessary to prevent a kind of denial of service. Such overloads are caused by common client-side scenarios and bugs, for example inserting a unique ID as the campaign name of each click. A bug like this creates a database entry for every click, because each click is associated with a different campaign, and clicks often reach the hundreds of millions per client. Without a predefined threshold, the data would explode and the database would crash for all of our clients.

The result of the first phase, the grey list, holds the allowed campaigns for each client. It can be represented conceptually as a hash map, as shown below.

{
  client_id => Set(allowed campaigns),
  ...
}

For clients that exceed the limit, an entry is written to the hash map indicating which dimension values (campaigns) are allowed. For clients that don’t exceed 1000 distinct campaigns, no entry is added to the list. This is why we call it a grey list rather than a white list. Omitting clients that don’t exceed the limit is an optimization that reduces the storage needed for the grey list. It also matters at a later stage, when we apply the grey list limits while aggregating the raw data.
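To make the “grey” semantics concrete, here is a minimal sketch of the lookup rule in plain Python. It assumes the grey list is a dictionary from client ID to a set of allowed campaign names. The type alias `GreyList`, the function `is_allowed` and the client and campaign names are hypothetical, chosen for illustration only. The key point is that a missing entry means “no restriction”, not “nothing allowed”.

```python
# Hypothetical representation: client_id -> set of allowed campaign names.
# Only clients that exceeded the limit appear as keys.
GreyList = dict[str, frozenset[str]]


def is_allowed(grey_list: GreyList, client_id: str, campaign: str) -> bool:
    """Return True if this campaign value may be kept for this client."""
    allowed = grey_list.get(client_id)
    if allowed is None:
        # No entry: the client never exceeded the limit, so every value is kept.
        return True
    return campaign in allowed


grey_list: GreyList = {"client_b": frozenset({"spring_sale", "brand"})}
print(is_allowed(grey_list, "client_a", "anything"))     # True
print(is_allowed(grey_list, "client_b", "spring_sale"))  # True
print(is_allowed(grey_list, "client_b", "uuid-7f3a"))    # False
```

Leaving under-limit clients out of the map keeps it small, which matters once the list has to be copied to every node.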

The campaigns in this list are ranked by daily install volume rather than simply taken in sequence. As a result, the best campaigns are included and the pruned ones are the lower-quality campaigns. This improves the value of the aggregation we provide to the client. If the data were simply cut off by time or order, it wouldn’t require any processing or transformation at all and could be provided in real time. Providing the highest-quality data while keeping it queryable in real time is the critical piece, and the engineering challenge.

The second phase, in which the raw data is taken and the grey lists are applied to it, is now the easier part of the process. Because we worked to make the grey list very compact in the previous step, we can broadcast it in Spark. A copy of the grey list is sent to every node of the cluster, so the join can be performed without shuffling any raw data. Normally this join would be done by partitioning the raw data by key and partitioning the grey list by the same key, which means all of the raw data would have to be sent across the network. Broadcasting the grey list made processing faster and simpler because far less data moves, and data movement is always an I/O-intensive task.
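The following is a plain-Python simulation of why broadcasting avoids a shuffle. Each partition of raw events is processed where it sits, against its own copy of the small grey list. In Spark, the corresponding tool is the broadcast join hint `pyspark.sql.functions.broadcast(df)`; the code below does not use Spark. The post does not say what happens to pruned values, so this sketch relabels them to a hypothetical placeholder, `OTHER`. Events are simplified to `(client_id, campaign)` pairs, and `apply_grey_list` and `map_side_join` are illustrative names.

```python
import copy

# Hypothetical placeholder for campaign values pruned by the grey list.
# The post does not say how the production job represents them.
OTHER = "__other__"


def apply_grey_list(partition, grey_list):
    """Rewrite one partition of (client_id, campaign) events locally."""
    out = []
    for client_id, campaign in partition:
        allowed = grey_list.get(client_id)
        # No entry means the client is under the limit: keep the value as-is.
        if allowed is not None and campaign not in allowed:
            campaign = OTHER
        out.append((client_id, campaign))
    return out


def map_side_join(partitions, grey_list):
    """Give each partition its own copy of the small grey list ("broadcast").

    Events stay in the partition they started in; nothing is re-partitioned
    by client_id, which is what a shuffle-based join would require.
    """
    return [apply_grey_list(p, copy.deepcopy(grey_list)) for p in partitions]


grey_list = {"client_b": frozenset({"spring_sale"})}
partitions = [
    [("client_a", "c1"), ("client_b", "spring_sale")],
    [("client_b", "uuid-7f3a"), ("client_a", "c2")],
]
print(map_side_join(partitions, grey_list))
# [[('client_a', 'c1'), ('client_b', 'spring_sale')], [('client_b', '__other__'), ('client_a', 'c2')]]
```

This only works because the grey list is small: the cost of copying it to every node is negligible compared with moving the raw events.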

{Exception} — The Breakdown

This worked well for a while, and then it started to unravel. The calculation of the grey list was the piece of the puzzle that triggered the breakdown. The grey list is calculated by gathering the campaigns for each client and ranking each client’s campaigns by their number of installs. In Spark terminology, each of these per-client collections of campaigns is a window. In simple terms, a window is the set of rows that share the same value of a predefined partitioning key. From each window, the top values up to the threshold (in our example, 1000 values) are transferred to the grey list. If the number of campaigns in the window does not exceed 1000, nothing is stored in the grey list for that client.
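The unsalted calculation can be sketched in plain Python as a reference point. In PySpark it would typically be written with `Window.partitionBy("client_id").orderBy(F.desc("installs"))` and `F.row_number()`. The sketch assumes the input is already one row per (client, campaign) with its install count, and it breaks ties by campaign name. Both of these are choices made for the sketch, not details from the post. Notice that every campaign of a client is collected into one in-memory list, which is exactly what fails for a heavily skewed client.

```python
from collections import defaultdict


def naive_grey_list(rows, threshold):
    """Unsalted grey list. rows: (client_id, campaign, installs), one per campaign."""
    windows = defaultdict(list)  # one window per client, held in memory at once
    for client_id, campaign, installs in rows:
        windows[client_id].append((campaign, installs))

    grey_list = {}
    for client_id, campaigns in windows.items():
        if len(campaigns) <= threshold:
            continue  # under the limit: no grey-list entry for this client
        # Most installs first; ties broken by name so the result is deterministic.
        ranked = sorted(campaigns, key=lambda c: (-c[1], c[0]))
        grey_list[client_id] = frozenset(name for name, _ in ranked[:threshold])
    return grey_list


rows = [
    ("client_a", "x", 5), ("client_a", "y", 3),
    ("client_b", "p", 10), ("client_b", "q", 1), ("client_b", "r", 7),
]
result = naive_grey_list(rows, threshold=2)
print({k: sorted(v) for k, v in result.items()})  # {'client_b': ['p', 'r']}
```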

The process broke down when the data per client was extremely skewed. For example, instead of slightly exceeding the hard limit of 1000 values, one of our clients sent 1,000,000 values. That made the window far too large to hold in a single node of the cluster. The job crashed repeatedly and created a bottleneck for the rest of the running jobs.

The most common way to overcome such problems is to throw more resources at them, so we initially increased the memory allocated to the job. This let it succeed for a while. Then it crashed for the same reason again (and again), and we realized that we needed to fix the underlying problem.

Enter Salting

Fixing the data skew problem required salting the data sets, meaning adding randomization to the data so it can be distributed more evenly. It also required two stages of window calculations to compute the grey lists.

For the first window calculation, a random value is injected by adding a column of randomly generated numbers to the data frame.

This salt column is simply a uniformly distributed random integer from 0 to nSaltBins - 1, where nSaltBins is the number of window partitions you would like to break disproportionately large windows into. In our case we use a value of 100.

Now, when we partition the data for the first window, each window holds only the rows that share the same random integer. As a result, the window with 1,000,000 values is split into 100 windows of roughly 10,000 values each (1M / 100). This breaks very large windows into smaller windows that can be held and processed on a single node, and the job can then succeed.
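Here is a small plain-Python simulation of the effect of the salt column on window sizes. In Spark, a salt like this is commonly produced with `(rand() * nSaltBins).cast("int")`, but the code below only mimics that with Python’s `random` module. The client is scaled down to 100,000 campaigns so the sketch runs quickly. The names `add_salt`, `window_sizes` and `big_client` are hypothetical.

```python
import random
from collections import Counter

N_SALT_BINS = 100  # nSaltBins from the post


def add_salt(rows, n_salt_bins, rng):
    """Append a salt column: a uniform random integer in [0, n_salt_bins)."""
    return [row + (rng.randrange(n_salt_bins),) for row in rows]


def window_sizes(rows, key):
    """Number of rows in each window, where key(row) is the partitioning key."""
    return Counter(key(row) for row in rows)


rng = random.Random(42)
# One skewed client with 100,000 distinct campaigns (scaled down from 1M).
skewed = [("big_client", f"campaign_{i}", rng.randint(1, 500)) for i in range(100_000)]
salted = add_salt(skewed, N_SALT_BINS, rng)

unsalted_w = window_sizes(salted, key=lambda r: r[0])        # partition by client only
salted_w = window_sizes(salted, key=lambda r: (r[0], r[3]))  # partition by client and salt
print(max(unsalted_w.values()))  # 100000
print(len(salted_w), max(salted_w.values()))
```

Partitioning by client alone yields one window holding every row. Partitioning by client and salt should yield about 100 windows of roughly 1,000 rows each. The exact sizes vary with the random draw, which is why the salt has to be uniformly distributed.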


After this phase, we have limited the number of campaigns in each salted window, but we still need to limit the number of campaigns per unsalted window. That is the second phase of the process. Another window calculation is performed with the random integer dropped, and the top 1000 items are recalculated and retained in the final data set.
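Putting both phases together, here is a plain-Python sketch of the two-window calculation. Phase one corresponds to a window partitioned by client and salt; phase two drops the salt and partitions by client alone. The sketch assumes the input is already aggregated to one row per (client, campaign) with its install count. That way each campaign lands in exactly one salt bin, and the per-bin top N is guaranteed to contain the client’s global top N. Two further choices belong to the sketch and are not taken from the post. First, it tracks each bin’s distinct-campaign count so that phase two knows whether the client really exceeded the limit. Second, it breaks ties by name. The names `top_n` and `salted_grey_list` are hypothetical.

```python
import random
from collections import defaultdict


def top_n(campaigns, n):
    """Top n (campaign, installs) pairs: most installs first, ties broken by name."""
    return sorted(campaigns, key=lambda c: (-c[1], c[0]))[:n]


def salted_grey_list(rows, threshold, n_salt_bins, seed=0):
    """Two-phase grey list over rows of (client_id, campaign, installs).

    Assumes one row per (client_id, campaign), i.e. installs already summed,
    so each campaign lands in exactly one salt bin.
    """
    rng = random.Random(seed)

    # Phase 1: window by (client_id, salt).
    bins = defaultdict(list)
    for client_id, campaign, installs in rows:
        bins[(client_id, rng.randrange(n_salt_bins))].append((campaign, installs))

    candidates = defaultdict(list)  # per client: at most n_salt_bins * threshold
    totals = defaultdict(int)       # per client: true distinct-campaign count
    for (client_id, _salt), campaigns in bins.items():
        totals[client_id] += len(campaigns)
        candidates[client_id].extend(top_n(campaigns, threshold))

    # Phase 2: drop the salt and window by client_id alone.
    grey_list = {}
    for client_id, cands in candidates.items():
        # Compare the summed per-bin counts, not len(cands): a client with
        # threshold + 1 campaigns all in one bin has only `threshold` candidates.
        if totals[client_id] > threshold:
            grey_list[client_id] = frozenset(name for name, _ in top_n(cands, threshold))
    return grey_list


rows = [("client_a", "x", 5), ("client_b", "p", 10), ("client_b", "q", 1), ("client_b", "r", 7)]
result = salted_grey_list(rows, threshold=2, n_salt_bins=4)
print({k: sorted(v) for k, v in result.items()})  # {'client_b': ['p', 'r']}
```

The pre-aggregation assumption matters. If raw install events were salted before being summed, one campaign’s installs would be scattered across bins, and the per-bin rankings would no longer reflect each campaign’s true volume.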

Computing the grey lists with salting scales better because each window in the second phase now holds at most the number of salt bins (in our case 100) times the threshold of items per window (in this example, 100 x 1000). That caps the second-phase window at 100,000 values, which a typical Spark executor can manage. The previous process required a single executor to handle 1,000,000 or more values, which created the bottleneck and the I/O strain.
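The window sizes discussed above can be summarized as a quick calculation using the post’s numbers: 1,000,000 campaigns for the skewed client, 100 salt bins and a threshold of 1000. The phase-one figure is an expected size, since the salt is random. The phase-two figure is a hard upper bound.

```latex
\begin{aligned}
\text{unsalted window} &= 1{,}000{,}000 \\
\mathbb{E}[\text{phase-1 window}] &\approx \frac{1{,}000{,}000}{100} = 10{,}000 \\
\text{phase-2 window} &\le 100 \times 1000 = 100{,}000 < 1{,}000{,}000
\end{aligned}
```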

The most interesting aspect of this process is that, although more total work is performed in terms of CPU cycles and network I/O, that work is divided into smaller chunks. As a result, the computation is distributed more evenly across the cluster, which improves performance.

(And just another reminder: if you like data science, we’re always hiring.)