An Optimization Story: Speeding Data Locker Up

Виталий Савкин
AppsFlyer Engineering
Dec 19, 2021

A few months ago, we at the Data Locker team made significant improvements to our product’s stability, cost, and data delivery lag. The delivery lag is one of the most important properties of our product; Data Locker’s entire purpose is to make data available to our clients.

While the stability improvements were the most noticeable to our clients, they are mostly about the scheduling of our ETL apps and not particularly interesting to a wider audience. The performance improvements that led to lag reduction and cost savings, however, required insight into how the data processing engine we use (Apache Spark) works.

Before diving into the technical details of how we did this, however, I will give a brief overview of what Data Locker is and the challenges surrounding it.

The Challenge

Clients send data to AppsFlyer (AF) from devices all around the world to let our engine perform mobile attribution and then see the aggregated results on dashboards. We store this data in a single place called a data lake.

For some clients, aggregated data displayed on dashboards is enough, but many also want to see so-called raw data — meaning the details of each event related to their users. For this purpose, AF offers a few products and Data Locker is one of them — generating reports in the form of files on S3 or GCS on an hourly basis.

The Data Locker algorithm:

  1. Reads data of the specified client applications from the data lake.
  2. Applies user-defined and governance-related (e.g. GDPR) filters and transformations.
  3. Stores the resulting dataset as CSV files in the client’s directory in a distributed file system. The files should be evenly sized and contain no more than 10,000 rows each.

The Naive Solution

The first trade-off when building such an ETL application is to decide whether we want to read each client’s data separately or read everything in one go.

The app per client approach has its own advantages (it’s more scalable and provides better isolation), but is much less cost efficient. If a source file contains the data of N clients, Spark reads it N times (and this happens for each file). Since source data is not grouped or sorted by client (only sorted by app ID), and Data Locker is used by hundreds of clients having tens of thousands of applications, this overhead quickly becomes unmanageable.

It was therefore decided to read all of the clients’ data in one go, persist it in memory, and then, for each client (a minimal sketch of this loop follows the list):

  1. Create a client dataframe through filtering a cached dataframe by the client’s applications.
  2. Count rows of the client dataframe and calculate the number of output files based on the count.
  3. Repartition the client dataframe to the number of partitions equal to the number of output files.
  4. Save the repartitioned dataframe to the client directory (due to the repartitioning we did in the previous step, the output will have as many files as we need).
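Here is a minimal sketch of that per-client loop; readHourFromDataLake, applyFilters, outputPathFor, and the clients collection are hypothetical stand-ins for our actual reading, filtering, and bookkeeping code:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

val maxRowsPerFile = 10000

// Read everything once and keep it in memory for all clients.
val allData: DataFrame = readHourFromDataLake(spark, hour).cache()

clients.foreach { client =>
  // 1. Filter the cached dataframe down to this client's applications
  //    and apply the user-defined and governance filters.
  val clientDf = applyFilters(
    allData.filter(col("app_id").isin(client.appIds: _*)), client)

  // 2. Count rows to derive the number of output files.
  val rowCount = clientDf.count()
  val numFiles = math.max(1, math.ceil(rowCount.toDouble / maxRowsPerFile).toInt)

  // 3-4. Repartition to that number and write one CSV file per partition.
  clientDf
    .repartition(numFiles)
    .write
    .option("header", "true")
    .csv(outputPathFor(client))
}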

There is an alternative approach based on Spark’s ability to limit the number of records per output file, but it has its own disadvantages. For example, it can produce files of very different sizes, which we want to avoid.
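That alternative is presumably the maxRecordsPerFile write option; a minimal sketch of what it would look like (outputPathFor is again a hypothetical helper):

// Caps every output file at 10,000 rows, but gives no control over how
// evenly the rows are spread across files.
clientDf
  .write
  .option("maxRecordsPerFile", 10000L)
  .option("header", "true")
  .csv(outputPathFor(client))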

It is important to note that this solution had some issues:

  • For bigger tables during peak hours, processing an hourly partition of data sometimes took up to 90 minutes.
  • Cluster CPU utilization was 40–50% when all cores were occupied.
  • We saw sporadic, usually irreproducible OOM exceptions on executors. These OOMs killed executors, slowed the application down, and sometimes chained together until the entire application crashed or was killed by timeout. When that happened, processing one hour of data took several hours and a great deal of computational resources.

Optimistic Writes to the Rescue

The most obvious inefficiency in the naive approach is that it applies the filtering by client apps and the user-defined filters twice for each client — once during the count step and again when saving the data. This filtering is quite CPU-intensive, particularly because it visits every row of the cached dataset (which contains all clients’ data).

But do we need the exact number of rows to calculate the number of output files? Can we get it without applying the filters and calling count on the filtered dataframe? Well, yes — sort of. We can get an upper bound on this value by summing the row counts of each client’s applications. The code df.groupBy("app_id").count() takes about a minute on our volume (up to 2B rows for the largest tables), thanks to the Parquet format and the fact that data in our data lake is optimized for reading by application ID.

So we have a number, and further research shows that for 99% of the clients it is equal to or differs very little from the real value.
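A sketch of how such an estimate can be computed; df is the dataframe with all clients’ data, and app2client is the application-to-client mapping that also appears in the caching code later in the post:

import org.apache.spark.sql.functions.{broadcast, sum}

// Per-application row counts: cheap thanks to the Parquet layout,
// which is optimized for reading by application ID.
val perAppCounts = df.groupBy("app_id").count()

// Upper-bound estimate per client: sum the counts of that client's apps.
val estimatedPerClient = perAppCounts
  .join(broadcast(app2client), "app_id")
  .groupBy("client")
  .agg(sum("count").as("estimated_rows"))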

There are cases where this approach generates an unreasonable number of very small files. The solution is quite straightforward: we write the output based on the estimated upper bound, and if it turns out that far fewer rows were actually written, we re-generate the report.

But how do we know how many rows were actually written? The obvious answer is to run count on the written directory. While this is more efficient than executing a count on the filtered dataframe, it also defeats the original idea of eliminating the double count.

This is where Spark Accumulators can help. Using this feature, we can write data and count the written rows simultaneously — without any noticeable performance impact!

Here is the code:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import org.apache.spark.util.LongAccumulator

def withRowsCounter(
    df: DataFrame, client: String): (DataFrame, LongAccumulator) = {
  val counter = df.sparkSession.sparkContext
    .longAccumulator(s"rows-counter-for-$client")
  // The UDF increments the accumulator for every row and always returns
  // null, so the filter below keeps all rows while forcing the UDF to run.
  val countUdf = udf { () => counter.add(1); null: String }
  val column = "triggerCounter"
  (df.withColumn(column, countUdf())
    .filter(s"$column IS NULL")
    .drop(column),
    counter)
}

This function returns two values: a dataframe and an accumulator. Once the save action called on the returned dataframe completes, the number of written rows is available in the accumulator.
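A sketch of how this might be wired into the optimistic write; estimatedFiles, tooManySmallFiles, regenerateReport, and outputPathFor are hypothetical stand-ins for the estimate and the fallback path:

val (countedDf, counter) = withRowsCounter(clientDf, client.name)

// Write optimistically, with the file count derived from the estimated upper bound.
countedDf
  .repartition(estimatedFiles)
  .write
  .option("header", "true")
  .csv(outputPathFor(client))

// After the write action completes, the accumulator holds the real row count.
val actualRows: Long = counter.value

// If the estimate was far off and we produced too many tiny files,
// fall back to re-generating the report with the correct file count.
if (tooManySmallFiles(actualRows, estimatedFiles)) {
  regenerateReport(client, actualRows)
}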

So, having both the estimated upper bound and the real number of rows, we can fix the cases where our optimistic writes fail us. After adopting this approach, we noticed two things:

  • Applications completed approximately 20% faster.
  • CPU utilization dropped from 40–50% to below 5% most of the time, with occasional peaks of 70–80%.

Who Ate My CPU Time?

Let’s now see how Spark executes our application.

The caching step creates a dataframe with a number of partitions calculated in a non-trivial way. In our case, for the above-mentioned hour, the number of partitions is 2,000–3,000. On the first action called on this dataframe, the data is actually read from the data lake and stored in memory.
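For reference, a quick way to inspect how many partitions a cached dataframe ends up with:

val cached = df.cache()
cached.count()                       // first action: materializes the cache
println(cached.rdd.getNumPartitions) // 2,000-3,000 for our hourly volume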

The repartitioning step is actually a shuffle. In short (you can read more about how it works here), each input (mapper-side) partition prepares a data file to be read by the output (reducer-side) partitions. This file is sorted by reducer partition ID to speed up data access. Each reducer then contacts each mapper to get its data.

Can this be an issue? Yes: many companies have faced it and have even developed special solutions to combat it. The problem is that each connection transfers too little data, and sometimes a reducer contacts a mapper only to find that the mapper has no data for it.

Our theory now is that the number of input partitions is too big and slows the application down with many IO operations that do not utilize the CPU. So what can we do about it?

Two variables in the equation are fixed: the number of clients and the number of output files per client. The only thing we can influence is the number of partitions from which each client’s reducers read their data. With the caching approach, we can hardly limit it.

Even if we somehow manage to reduce the overall number of partitions, mappers and reducers still have to figure out who has data for whom. A client’s data can still be spread across too many partitions, and reducing the partition count too much (via coalesce(2), for instance) will hurt scalability. We need to group the data somehow, and a good way to do that is via directories: we can save each client’s data to its own S3 directory with this code:

// app2client is a dataframe telling us
// which client each application belongs to
df.join(broadcast(app2client), "app_id")
  .write
  .partitionBy("client")
  .parquet(s3CachePath)

With this code in place, we started to get a lot of S3 SlowDown exceptions when writing the intermediate data. This is because Spark writes a separate output file for every dataframe partition that has at least one row for a given output directory.

Spark created around 750,000 files (300 clients x 2,500 partitions). S3 cannot, and should not be expected to, handle that many writes in a reasonable amount of time. We need to group each client’s data inside the input dataframe somehow, so that only a small fraction of input partitions produces files for a given output directory.

This can be solved by using the Spark function repartitionByRange:

val cacheFilesNumber =
  Math.max(1, totalEstimatedCount / 500000).toInt
df.join(broadcast(app2client), "app_id")
  // see why we need __randomize__ column here
  .withColumn("__randomize__", rand(0))
  .repartitionByRange(
    cacheFilesNumber, col("client"), col("__randomize__"))
  .drop("__randomize__")
  .write
  .partitionBy("client")
  .parquet(s3CachePath.toString)

This code changes the partitioning: repartitionByRange groups rows by client, so each client’s data lands in a small, contiguous range of partitions, while the __randomize__ column spreads the rows evenly within that range.

Now that the client data is partitioned by directory, when we want to process a client we can simply read the data from its directory. The number of mapper-reducer requests becomes much lower, and each request carries as many rows as possible.
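A sketch of the per-client processing on top of the new layout; reading the client= subdirectory relies on the partitionBy("client") structure created above, while estimatedFiles and outputPathFor remain hypothetical helpers:

// Read only this client's slice of the intermediate cache.
val clientDf = spark.read.parquet(s"$s3CachePath/client=$client")

// Only a handful of input partitions now hold this client's data, so the
// shuffle below moves large, well-packed blocks instead of thousands of tiny ones.
clientDf
  .repartition(estimatedFiles)
  .write
  .option("header", "true")
  .csv(outputPathFor(client))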

This approach has its own overhead, however: the S3 cache has to be generated beforehand, and since the clients’ data no longer resides in memory, only practice could show whether it was worth adopting. Practice did show it: roughly five times lower execution time and about eight times lower CPU time compared to the original solution. Other tests and production usage also confirmed excellent performance and scalability, as well as much lower memory requirements. The lower memory consumption let us save even more by moving to cheaper EC2 instances, and the OOM errors disappeared.

Final Thoughts

When working with Data Locker, or with any other system, understanding the internal workings of that system pays off. Sometimes being clever justifies the additional complexity; just be sure you are making the right trade-off.

Special thanks to Itamar Benjamin and Nick Grigoriev for reviewing this post and productive technical discussions.

Appendix: Comparative Table of All Optimizations
