Beam Me Up — Profiling a Beam-over-Spark Application

PayPal Tech Blog Team
The PayPal Technology Blog
May 30, 2017

As we move forward with adopting Apache Beam for some of our streaming needs, our Beam applications need to be tested for stability. Such tests are aimed at ensuring that performance does not degrade over time and that applications can maintain their desired performance characteristics (e.g., latency) as they run over long periods of time. When we ran a Beam-over-Spark application (Beam 0.7.0-SNAPSHOT; Spark 1.6.2) for several hours, the batch processing time increased unexpectedly, regardless of traffic seasonality.

In this post we share the steps and methods we used to diagnose the performance degradation we witnessed in our application’s (batch) processing time, a diagnosis which ultimately led to identifying the root cause and resolving it.

Houston, We Have a Problem

One of the key metrics in Spark is the lastCompletedBatch_processingDelay, which indicates the time required to process a batch of events. If the batch processing duration exceeds the time gap between consecutive batches (as configured by the batch duration parameter), the application can no longer keep up with the input rate and a backlog of pending batches is formed. If the backlog keeps growing, at some point the application is likely to run out of memory and crash.
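This metric is exposed through Spark's metrics system, but the same signal can also be watched programmatically. As a minimal, hypothetical sketch (not code from our application), a Spark Streaming listener can flag batches whose processing time exceeds the configured batch interval:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical sketch: flag batches whose processing time exceeds the batch interval.
class ProcessingDelayWatcher(batchIntervalMs: Long) extends StreamingListener {
  override def onBatchCompleted(completed: StreamingListenerBatchCompleted): Unit = {
    val processingMs = completed.batchInfo.processingDelay.getOrElse(0L)
    if (processingMs > batchIntervalMs) {
      println(s"Batch took $processingMs ms, more than the $batchIntervalMs ms batch interval; " +
        "a backlog will start forming if this keeps happening")
    }
  }
}

// Usage, assuming an existing StreamingContext `ssc` and a 1-second batch interval:
// ssc.addStreamingListener(new ProcessingDelayWatcher(batchIntervalMs = 1000L))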

Profiling

Diving In

Since a Beam-over-Spark application consists of multiple layers, we first sought to narrow down the search space. We therefore eliminated all non-trivial Beam operators and IO operations, such as Kafka reads and writes, and were left with an identity-map-like pipeline over an in-memory source: a stream of integers produced by Beam's GenerateSequence construct.

If the performance problem persisted even in this simplified, stripped-down application, it would likely indicate an issue in one of the underlying layers rather than in our application's code (which at that point had been reduced to a trivial identity-map-like transformation).
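For reference, a stripped-down pipeline along those lines might look roughly like the sketch below. This is a hedged illustration written against the Beam Java SDK from Scala, not our actual test code; package and method names follow Beam 2.x and may differ slightly in 0.7.0-SNAPSHOT, and the rate of 100 elements per second is an arbitrary choice.

import org.apache.beam.runners.spark.{SparkPipelineOptions, SparkRunner}
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.GenerateSequence
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.{DoFn, ParDo}
import org.joda.time.Duration

// Identity "map": every element is emitted unchanged.
class IdentityFn extends DoFn[java.lang.Long, java.lang.Long] {
  @ProcessElement
  def process(c: DoFn[java.lang.Long, java.lang.Long]#ProcessContext): Unit =
    c.output(c.element())
}

object StrippedDownPipeline {
  def main(args: Array[String]): Unit = {
    val options = PipelineOptionsFactory.fromArgs(args: _*).as(classOf[SparkPipelineOptions])
    options.setRunner(classOf[SparkRunner])
    options.setStreaming(true)

    val pipeline = Pipeline.create(options)
    pipeline
      // In-memory, unbounded source: 100 longs per second, no external IO involved.
      .apply(GenerateSequence.from(0).withRate(100, Duration.standardSeconds(1)))
      .apply(ParDo.of(new IdentityFn))

    pipeline.run().waitUntilFinish()
  }
}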

[Image: the Beam-over-Spark stack]

Interestingly enough, the problem did indeed persist even in the stripped down version of the application.

Spark UI

We next turned to the Spark UI to gather basic information.

Among the task metrics, task deserialization times took the longest, ranging from 13 to 19 ms. However, at this point it was not clear whether this had anything to do with the issues we were seeing.

[Image: Spark UI task metrics, initial sampling]

A second sampling, taken some hours later, showed that task deserialization times had kept growing, reaching 41–79 ms, while other task metrics remained stable and showed no signs of degradation; task deserialization became the prime suspect.

[Image: Spark UI task metrics, second sampling]

Two main causes could account for the increase in task deserialization times:

  1. Task size grows and the system needs to deserialize more data.
  2. The way deserialization is performed itself becomes slower over time, regardless of what is being deserialized.

The driver's logs showed that task sizes remained within a stable range of a few kilobytes, indicating that the extra time was not spent deserializing larger objects; it probably had more to do with how the deserialization was being performed than with what was being deserialized.
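Since the Spark UI lends itself more to point-in-time inspection than to long-running trends, one can also log the metric per task. The following is a hypothetical sketch using Spark's SparkListener API, not necessarily what we ran:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical sketch: record each task's deserialization time so the trend over
// hours can be plotted from the driver log rather than sampled from the Spark UI.
class DeserializationTimeListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      println(s"stage=${taskEnd.stageId} task=${taskEnd.taskInfo.taskId} " +
        s"deserializeMs=${metrics.executorDeserializeTime}")
    }
  }
}

// Usage, assuming an existing SparkContext `sc`:
// sc.addSparkListener(new DeserializationTimeListener())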

CPU Profiling

Flame graphs are a well-known method for profiling application performance. A recent post by our very own Aviem Zur goes into detail and explains how flame graphs can be leveraged to profile Spark applications.

We used flame graphs to profile our application's driver and executor JVMs (not at the same time, to avoid collisions at the database level).
Driver profiling was performed using:

"spark.driver.extraJavaOptions=-javaagent:statsd-jvm-profiler-…"

Executor profiling was performed using:

"spark.executor.extraJavaOptions=-javaagent:statsd-jvm-profiler-…"

While there did not seem to be any apparent bottleneck that could account for the performance degradation we had been seeing, the TorrentBroadcast.readBroadcastBlock() method caught our eye, since we could not easily explain why reading a block should take what we considered a relatively long time.

[Image: flame graph used for tracking down bottlenecks]

We also tried comparing two flame graphs of the same component, as suggested on Brendan Gregg's blog. The first flame graph was generated close to the application's start time, and the second shortly after the processing delay had grown considerably. However, the diff flame graph did not reveal new insights. At this point it seemed like looking deeper into TorrentBroadcast.readBroadcastBlock() could be beneficial.

Spark Logs

We then turned to the logs to search for potential issues in the TorrentBroadcast.readBroadcastBlock() method. To make the logs easier to read and avoid having multiple task threads on the same executor interleave their log output, we changed the Spark executor (and driver) thread count to 1.
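We will not reproduce our exact configuration here, but a hedged sketch of the knobs involved might look like the following (spark.executor.cores is our assumption for how the thread count is reduced; the log4j.properties path is hypothetical, and the file would have to be shipped alongside the job):

import org.apache.spark.SparkConf

// Sketch only: a single task thread per executor makes each executor log read as one
// sequential story, and a custom log4j.properties raises BlockManager/TorrentBroadcast
// logging to TRACE/DEBUG so that timings like those quoted below actually show up.
val conf = new SparkConf()
  .set("spark.executor.cores", "1")
  .set("spark.executor.extraJavaOptions", "-Dlog4j.configuration=file:log4j.properties")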

The logs showed evidence indicating that broadcast variable reading times were growing:

DEBUG BlockManager: Getting local block broadcast_691
INFO TorrentBroadcast: Reading broadcast variable 691 took 12 ms

Fast forward into the future:

DEBUG BlockManager: Getting local block broadcast_211582
INFO TorrentBroadcast: Reading broadcast variable 211582 took 31 ms

Since broadcast variables are used by the Spark driver to transfer tasks to the executor nodes, this seemed quite relevant to the problem at hand. We decided to explore the (task) broadcast deserialization flow further and look for possible delays.

23:55:04,152 DEBUG BlockManager: Getting local block broadcast_211582

23:55:04,161 TRACE BlockManager: Put for block broadcast_211582_piece0 took 0 ms to get into synchronized block
23:55:04,181 INFO MemoryStore: Block broadcast_211582_piece0 stored as bytes in memory (estimated size 6.7 KB, free 3.3 MB)

23:55:04,183 DEBUG BlockManager: Putting block broadcast_211582_piece0 without replication took 22 ms
23:55:04,183 INFO TorrentBroadcast: Reading broadcast variable 211582 took 31 ms

We noticed a time gap of 181 − 161 = 20 ms, roughly 64% of the total 31 ms (!), between the following two consecutive printouts:

23:55:04,161 TRACE BlockManager: Put for block broadcast_211582_piece0 took 0 ms to get into synchronized block
23:55:04,181 INFO MemoryStore: Block broadcast_211582_piece0 stored as bytes in memory (estimated size 6.7 KB, free 3.3 MB)

In terms of code, there seemingly was not much happening between these two printouts, which left ~64% of the deserialization time unaccounted for.

A closer look at the second printout (MemoryStore.scala) revealed some interesting insights:

logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
blockId,
Utils.bytesToString(size),
Utils.bytesToString(blocksMemoryUsed)))

Where blocksMemoryUsed is defined as follows:

private def blocksMemoryUsed: Long = memoryManager.synchronized {
  memoryUsed - currentUnrollMemory
}

and currentUnrollMemory is defined as follows:

def currentUnrollMemory: Long = memoryManager.synchronized {
  unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
}

This basically means that the unrollMemoryMap and pendingUnrollMemoryMap are scanned in full every time this particular log line is printed. If these maps were to grow to extreme sizes, that could definitely explain the delays we were seeing; moreover, it could account for the over-time degradation aspect of our issue, since the delays would grow along with the maps' size.
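As a back-of-the-envelope illustration (a toy sketch, not Spark code), the cost of that values.sum is linear in the number of entries, so the log line becomes proportionally slower as the maps leak entries:

import scala.collection.mutable

// Toy illustration: summing a mutable map's values is O(n) in the number of entries,
// which is what currentUnrollMemory does on every call.
def microsToSum(entries: Int): Long = {
  val unrollMemoryMap = mutable.HashMap[Long, Long]()
  (1 to entries).foreach(i => unrollMemoryMap(i.toLong) = 1024L)

  val start = System.nanoTime()
  val total = unrollMemoryMap.values.sum
  val elapsedMicros = (System.nanoTime() - start) / 1000
  println(s"$entries entries, sum=$total bytes: $elapsedMicros µs")
  elapsedMicros
}

// The entry counts mirror the memory dumps discussed below; with a linear scan,
// the second call should take roughly 40x longer than the first.
microsToSum(3477)
microsToSum(145555)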

We briefly considered raising the log level to avoid that particular printout, but a quick usage search identified more usages of currentUnrollMemory in places other than log printouts, making such a workaround ineffective.

To further establish our hypothesis and see whether one (or both) of the above-mentioned map data structures did indeed grow large: enter memory dumps.

Memory Dumps

A memory dump of one of the executors close to when the application started looked like so:

[Image: memory dump taken shortly after application start]

A memory dump of the same executor some hours later looked like so:

[Image: memory dump of the same executor, some hours later]

The unrollMemoryMap grew from 3,477 to 145,555 entries, bringing the overall scala.collection.mutable.DefaultEntry instance count to 145,596.

[Image: memory dump detail]

Having this information at our disposal, we started sifting through Spark’s JIRA.

SPARK-17465 described the phenomenon we were witnessing and indicated that it had been fixed in versions 1.6.3, 2.0.1, and 2.1.0, where the purging of the unrollMemoryMap and pendingUnrollMemoryMap maps was corrected.

Running our application with Spark 1.6.3 for a few days verified that the issue had indeed been resolved.

Epilogue

The resolution to our performance issue boiled down to upgrading Spark from 1.6.2 to 1.6.3 (a Beam Spark runner for Spark 2.x is a work in progress).

We hope that the methods described in this post can help others dealing with performance issues of a similar nature and inspire the community to share their stories from the trenches.

In addition, as part of our profiling session we filed and addressed several Apache Beam tickets.
