Credit: Shutterstock

Bootstrapping with Spark

Paul Cho
Udemy Tech Blog
Published in
7 min readSep 5, 2018

--

In the last blog post, Robert described why we bootstrap our experimentation data. While that may or may not have convinced you of the merits of bootstrapping, another dimension to consider is its computational cost.

Bootstrapping is a computationally very expensive method to use for analysis. That cost is massively amplified when dealing with the huge scale of online experiments. In this post, we’ll describe our approach to overcoming the technical challenges of bootstrapping in our own experiment analysis pipeline using Apache Spark.

Simple Bootstrapping

To get a sense of what bootstrapping looks like in code, let’s look at a simple implementation at a smaller scale. We will focus on bootstrapping a single statistic — namely, the mean.

- Set m, the number of resamples you want
- Calculate n, the size of the dataset
- For 1 to m:
- For 1 to n:
- Randomly sample 1 value from the dataset
- Add this value to an accumulating sum
- Divide the final sum by n to compute the mean

Here’s how it looks in Scala:

Here we have the variable data, an Array[Double] containing a random sample from some population. The result, bootstraps, will be our approximation of the sampling distribution of the mean. Note that even for this tiny dataset, the number of times drawAndSumValues is called is n x m, or 70,000 in the above code. When we do this for very large datasets, we will very quickly run into performance problems. Generating 10,000 bootstrap means for a large dataset will take a very long time. Furthermore, datasets at the scale of online experimentation are often so large that they do not even fit in memory.

Scaling with Spark?

Because this generally seemed like a big data problem, we set out to tackle it with Apache Spark. But we quickly realized an issue with the way Spark is typically used. Spark’s Resilient Distributed Dataset partitions your data into disjoint subsets and distributes them across the nodes in your cluster. You might think that you can simply distribute the work of sampling across the executors, so that if you have x partitions, you can sample n/x times from each partition, and sum those up using reduce to generate a mean. The problem with this procedure is that it does not count as actually bootstrapping. In bootstrapping, each draw should be completely independent. A way to prove to yourself the non-independence of this approach is to see that it is impossible to draw the exact same value all n times. If the draws were truly independent, this should be possible (however unlikely it may be).

One possibility we discussed for generating a single bootstrap mean was:

  1. Index the dataset
  2. Partition the dataset so that the number of partitions matches the number of executors
  3. Set the same seed in all executors
  4. On each executor, randomly sample indices n times, but only draw the values that are in the range of the indices of the data available to that executor and sum them up
  5. Reduce the sums into a single sum
  6. Compute the mean

This method would enable us to generate a real bootstrap resample (i.e., where each draw is independent) from a very large dataset that won’t fit in memory. However, with this approach, every executor has to generate all n indices for a single bootstrap resample, and we would still have to generate all 10,000 bootstrap means serially.

Another approach, which is used in Spark’s implementation of sampling, is Poisson sampling, but this method does not guarantee a resample of size n and would also still require generating the 10,000 resamples serially.

Parallelizing with Spark

In order to generate all 10,000 resamples more quickly, we considered using Spark as a parallel execution engine so that we could generate the bootstrap resamples in parallel. This would require us to broadcast the dataset to all the executors.

Let’s create a higher-order function to help us run a function in parallel on broadcast data. We’ll then use this function to run our simple bootstrapping code in parallel.

So, now we’ve parallelized our resampling and computation of means to improve performance. For experiments whose data is able to fit in memory, this is great. But what about larger experiments with tens of millions of data points that do not fit in memory?

Compressing Our Data

One thing we realized was that the cardinality of our data was very low relative to the total size of the dataset itself, and the vast majority of data points were 0's. We took advantage of this by constructing a frequency distribution of the unique values of the dataset and sampling from this frequency distribution. This would effectively compress the dataset so that it can fit into memory.

We’ll use the following made-up sample data to illustrate. Here, we have the first 15 rows of variant data that we want to bootstrap.

+---------+--------+
| Visitor | Metric |
+---------+--------+
| 1 | 0 |
| 2 | 0 |
| 3 | 30.5 |
| 4 | 0 |
| 5 | 0 |
| 6 | 0 |
| 7 | 0 |
| 8 | 20.2 |
| 9 | 0 |
| 10 | 0 |
| 11 | 0 |
| 13 | 0 |
| 14 | 0 |
| 15 | 70.1 |
| ... | ... |
+---------+--------+

To compress the data set and sample, we will (1) build a frequency distribution, (2) build a cumulative frequency distribution, and (3) sample from this cumulative frequency distribution.

Frequencies

First, transform the original dataset into a dataset of unique values and the number of times they each occur in the dataset, ordering this dataset by the number of counts in descending order.

The resulting dataset now represents a frequency distribution.

+-------+-----------+
| value | frequency |
+-------+-----------+
| 0 | 12050 |
| 35.3 | 1025 |
| 10.8 | 930 |
| 30.5 | 810 |
| 20.2 | 795 |
| ... | ... |
+-------+-----------+

Cumulative Frequencies

Next, construct a cumulative frequency distribution:

We now have cumulativeFrequencies, a List[CumulativeFrequency], where upper is the upper bound of the range of indices that represent the corresponding value. The upper of the last item of this list will be the total size of the original dataset. The length of this list is the number of unique values.

+-------+-------+
| value | upper |
+-------+-------+
| 0 | 12050 |
| 35.3 | 13075 |
| 10.8 | 14005 |
| 30.5 | 14815 |
| 20.2 | 15610 |
| ... | ... |
+-------+-------+

Sample

Next, sample from this cumulative frequency distribution with the following steps:

  • Call scala.util.Random.nextInt(n) to generate a random index
  • Retrieve the original value by traversing the List of cumulative frequencies, checking if the random index is less than upper. When the index is less than upper, return it.

Note, this is essentially inverse transform sampling.

Bootstrap in parallel

We can now use these functions to generate 10,000 bootstrap means from our transformed dataset in parallel.

Performance

The performance of this method varies widely according to the characteristics of the data (i.e., the number of unique values in the data and the distribution of those unique values). It is best when the frequency distribution of unique values looks something like this:

(the unique values are ranked by frequency)

When we sample from a dataset like this, the probability that we need to traverse the entire list is very small, and most of the time we’re only retrieving values on the far-left side. The wider and flatter that plot gets (i.e., as the number of unique values increases and their frequencies are more uniform), the more the benefits of this approach diminish. The shape of the plot will depend on the metric of interest in the experiment, the traffic pattern and volume of the page being experimented on, and the nature of the business itself. In our case, it turns out that higher traffic experiments generally look more like the plot shown above.

We were able to compress our experimentation data by factors of around 20 and up to 1,000 for very high traffic experiments (with tens of millions of data points). The speed of retrieving a value from the cumulative frequency distribution compared to a simple array lookup by index also depends on the distribution of unique values and is roughly proportional to the amount of compression achieved. It is considerably slower for lower traffic experiments (~50x slower), but this decrease is smaller for higher traffic experiments (~7x slower). The crucial point, however, is that we are now able to generate the bootstrap resamples in parallel for large experiments that would not otherwise fit in memory.

Conclusion

We have used this method of sampling in the the first iteration of our experiment analysis pipeline. As shown above, its performance is highly dependent on the characteristics of the data. As we continue to iterate, we hope to further optimize our pipeline by using the most efficient sampling method for different sizes of experiments as well as parallelizing at the multi-experiment level.

--

--