Profiling Dataflow Pipelines

Sameer Abhyankar
Google Cloud - Community
6 min readMay 1, 2019

Update Feb ’22: Dataflow now supports profiling with general availability — See documentation here!

Google Cloud Dataflow is a fully managed service for executing batch and streaming data pipelines written using the Apache Beam SDK. In addition to managing and orchestrating the end-to-end pipeline execution for the users, Dataflow provides a number of out of the box features such as Auto-scaling, Dynamic Work Re-balancing (aka Liquid Sharding) and native Stackdriver integration that further enhances Dataflow’s zero-knobs story. This makes Dataflow an ideal execution platform for transforming and enriching data on the Google Cloud Platform.

Great — What exactly is a Dataflow pipeline?

A Dataflow pipeline can be described as a series of transforms that is applied to a distributed collection of data elements as these elements move from source to target. These transforms are generally provided by the authors of the pipeline (aka user-code) and describe business rules that is applied to the data elements to get them into a usable form.

As such, the overall performance of the data pipeline is highly dependent on the performance of the user-code behind these transforms.

How many times have you heard someone complain — “My job is running slow”?

Probably way more than we care to count!!

OK — So the pipeline is running slow. How can I troubleshoot it?

Dataflow and the Apache Beam SDK provides a number of facilities in the form of metrics and integrated Stackdriver logging to help troubleshoot pipelines. However, these might not always be sufficient due to various reasons:

  • Prone to trial-and-error as the author attempts to add verbose logging and metrics to identify the bottleneck.
  • Excessive logging adds its own overhead to the pipeline performance.
  • Metrics might be relevant where the user code interacts with the Beam model. However, the user code in the bottleneck step might still be opaque to the users.

This is where Dataflow Profiling can help.

Dataflow profiling is a process that allows us to get more insights about the pipelines execution at the granularity of the user-code. In other words, profiling helps us double-click into the actual user code that is executing in the transforms that make up the pipeline.

Fantastic — So how do we profile a Dataflow pipeline?

One of the easier ways to profile a Dataflow pipeline is to leverage the native integration that Dataflow has with Stackdriver Profiler. Stackdriver Profiler visualizes the call hierarchy and resource consumption of the pipeline in an interactive flame graph that can be used to analyze performance.

Flame graph in Stackdriver Profiler visualizing a Dataflow pipeline call stack

Dataflow’s native integration with Stackdriver profiler makes it extremely easy to enable profiling for any Dataflow job.

For a pipeline written using Apache Beam’s Java SDK, profiling can be enabled by passing an additional parameter that accepts a Map of properties:

mvn clean compile exec:java \
-Dexec.mainClass=MyClass\
-Dexec.args=” \
— runner=DataflowRunner \
— project=my-project \
— jobName=my-job \
— stagingLocation=gs://my-staging-bucket \
— tempLocation=gs://my-temp-bucket \
……. <<other pipeline specific flags>> …..
--profilingAgentConfiguration=’{ \”APICurated\”: true }’

Note: Do not set — saveProfilesToGcs=<gcs-bucket-for-profiles> when the above configuration is used as this will disable profiling!

Adding the boolean property APICurated set to true as a Map input to the profilingAgentConfiguration property is all that is needed to enable profiling for the pipeline.

Once the pipeline starts executing, browse over to the Google Cloud Console, click on the Navigation Menu and the Profiler feature.

Then click on the service drop-down to search for the Dataflow job that we want to analyze:

Awesome — We identified and fixed the bottleneck. Is there an easy way to compare the performance before and after the changes?

Absolutely! Stackdriver Profiler makes it super simple to compare jobs. Just execute the job with the same jobName and use Stackdriver Profiler’s “Compare to” drop-down to pick a different version (i.e. job_id).

Are there any other profiling knobs that we can leverage?

Yes! The profiling engine used by Dataflow exposes a number of other additional knobs:

APICurated = Boolean flag to enable profiling // Default: false
Interval = Profile output frequency // Default: 60 secs
Duration = Trace duration per interval // Default: 10 secs
Delay = Delay tracing by seconds // Default: 0 secs
MaxCounts = Max number of profiles collected // Default: unlimited
NativeStacks = Boolean flag to enable native stack unwind // Default: false

However, setting just the APICurated flag to true should be the only change needed in most cases.

Are there any other profiling tools?

Sure! While Stackdriver Profiler is very easy to use given the native integration, there might be a situation where a deeper dive into the execution call tree is needed. In these rare cases, we could leverage pprof which is an open source tool for visualizing and analyzing profile data. Pprof analyzes profile data that is dumped by Dataflow pipelines and has several features that do not currently exist in Stackdriver Profiler. These include:

  • Graph view (pprof -graphviz)
  • Tree view (pprof -graphviz -call_tree)
  • Top-Down/Bottom-Up tree view
  • Additional output formats (pdf etc.)

How do I extract the profile information needed by pprof?

This is pretty straightforward and is done by passing an additional property to the command line while executing the pipeline. So for example, a Dataflow pipeline written using Apache Beam’s Java SDK can dump the profiles to GCS by passing a property — saveProfilesToGcs as follows:

mvn clean compile exec:java \
-Dexec.mainClass=MyClass\
-Dexec.args=” \
— runner=DataflowRunner \
— project=my-project \
— jobName=my-job \
— stagingLocation=gs://my-staging-bucket \
— tempLocation=gs://my-temp-bucket \
……. <<other pipeline specific flags>> …..
--saveProfilesToGcs=gs://YOUR BUCKET HERE/profiler

That’s all! Once the job starts executing, the Dataflow service will write profile files (*wall* and *cpu*) files at regular intervals (approx 1 file each for cpu and wall times every 60 seconds)

Great — How do I analyze these files using pprof?

Once enough profile information (approx 10 mins) is collected or the job finishes (in case it is a batch pipeline), download the profile files locally and use pprof to visualize the cpu or wall time profiles. Pprof has a number of command line options to assist with the analysis. For e.g.:

pprof --call_tree --web “http:” <LOCAL PATH FOR PROFILES>/*cpu*

While there are a lot more details about using each of these tools that we could potentially talk about, this overview should hopefully introduce everyone to a couple of very useful troubleshooting and performance tuning tools that should be part of every Dataflow pipeline developers toolkit.

Additional Details and Notes:

While this overview only covers profiling from a CPU consumption perspective, we hope to do a follow up blog that describes the process of profiling memory utilization for the Dataflow pipelines. Memory profiling is currently not integrated into Stackdriver Profiler and is more of a manual process.

--

--