Profiling Apache Beam Python pipelines

Israel Herraiz
Google Cloud - Community
10 min read · Nov 30, 2020

UPDATE (January 2022): If you are running on Cloud Dataflow, it now has built-in support for using Google Cloud Profiler with Python pipelines. If you are using Dataflow, I strongly recommend trying that out rather than following the instructions given here. Find more details at https://cloud.google.com/dataflow/docs/guides/profiling-a-pipeline#python

Often the first version of our Apache Beam pipelines does not perform as well as we would like, and it is not always obvious where we could optimize performance. Sometimes the bottleneck is a function parsing JSON, sometimes it is an external source or sink, and sometimes we have a very hot key and we are trying to group by key. On Google Cloud Platform, these are the kinds of situations that we can easily detect by combining Dataflow with Cloud Profiler (formerly known as Stackdriver Profiler) for pipelines written in Java. But what if we want to profile a Python pipeline?

It is possible to obtain profiles for Python pipelines running on Dataflow, but the output will not be integrated with Cloud Profiler. Still, you can use the profiles with some other tools to identify bottlenecks. In this post, we explain how to obtain the profiling data and how to process it to identify bottlenecks in the code of our Python pipeline.

The profiler included with Apache Beam produces a file with profiling information for each data bundle processed by the pipeline.

The class ProfilingOptions contains all the options that we can use for profiling Python pipelines: profile_cpu, profile_memory, profile_location and profile_sample_rate.

The first two, profile_cpu and profile_memory, are boolean flags used to obtain data about CPU and memory consumption.

The option profile_sample_rate is optional and should be a number between 0 and 1. It is the ratio of data bundles that are profiled. The default value is 1.

The profile_location option sets the output directory for all the files produced by the profiler. It should be set to a location in Google Cloud Storage (GCS).

Obtaining some profile data from our Python pipeline with Dataflow

For instance, to have some CPU profiling data, you should add the following options when launching the pipeline:

--profile_cpu \
--profile_location='gs://<SOME_PATH_IN_GCS>'

The tools we are going to use here will work with CPU time data, but not with memory data. So we will use only the option profile_cpu.

After launching the pipeline, Dataflow should start writing files in the location in GCS:

$ gsutil ls gs://<SOME_PATH_IN_GCS>
gs://<SOME_PATH_IN_GCS>/2020-11-21_19_15_10-process_bundle-4005
gs://<SOME_PATH_IN_GCS>/2020-11-21_19_15_10-process_bundle-4006
gs://<SOME_PATH_IN_GCS>/2020-11-21_19_15_10-process_bundle-4008
[...]

Beware: don't run the pipeline for too long, or you will generate millions of files in that location. To identify potential bottlenecks in your pipeline, you should not need to run it for more than a few minutes. One hour of running should be more than enough (provided, of course, that the pipeline has processed a representative sample of data).

Once you have produced some files, you can download them to a local destination, so you can work with them with the analysis tools. Here we download all the data to the profiling directory, and we will assume the profiling data is located there in the rest of this post.

gsutil -m cp "gs://<SOME_PATH_IN_GCS>/*" profiling/

If you kill the pipeline to stop producing more files, you may have downloaded some empty tmp files. We need to remove them before attempting to process them:

rm profiling/*tmp*

How can I interpret all that output produced by the profiler?

To parse and analyze the output of the profiler, you can use the pstats module of Python. With that module, we can read the profiling data, aggregate and sort it, and produce a smaller profiling output file, which makes the detailed analysis faster.

With the following script you can import the downloaded data, and generate the summarized output. Here we sort by 'cumulative', which is the total accumulated CPU time spent in that function:
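A minimal sketch of such a script, using only the standard library, could look like the following. It assumes the files were downloaded to the profiling directory as above; the function name aggregate_profiles and the skipping of empty files are illustrative choices, not part of Beam or pstats.

```python
import glob
import os
import pstats


def aggregate_profiles(input_dir, output_file, sort_key="cumulative"):
    """Merge all profile files found in input_dir into one stats file."""
    # Keep only regular, non-empty files (empty leftovers cannot be parsed).
    files = sorted(
        f for f in glob.glob(os.path.join(input_dir, "*"))
        if os.path.isfile(f) and os.path.getsize(f) > 0
    )
    if not files:
        raise ValueError(f"No profile files found in {input_dir!r}")

    stats = pstats.Stats(files[0])
    for f in files[1:]:
        stats.add(f)  # accumulates the statistics of another file

    # Write the merged statistics. The file stores the aggregated data,
    # not the ordering, so sorting is requested again when reading it back.
    stats.dump_stats(output_file)

    # Sort the in-memory object so print_stats() shows the slowest first.
    stats.sort_stats(sort_key)
    return stats
```

Calling aggregate_profiles("profiling", "dataflow.prof") would write the summary file used in the rest of this post.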

After running this script, you will have a file named dataflow.prof that you can use to investigate the CPU consumption of your pipeline.

That file is much smaller than the raw data you have downloaded from GCS. For instance, for this post I generated around 1.2 GB of data, and the output file had a size of 427 KB.

We can now investigate this file dataflow.prof with different tools. There are tools such as gprof2dot, tuna, or snakeviz that allow you to obtain a graph with the profiling information, and in some cases also explore the graph interactively. However, because those graphs include a large number of irrelevant function calls, it is difficult to make sense of the profiling data with them.

So in this post we are going to use Python to try to filter the profiling data, and find the spots that are more relevant to identify points where we could improve the performance of our pipeline.

Let's start by reading the file that we have generated, and printing some overall stats. The script is very simple:
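A short sketch of such a script, assuming the summary file is named dataflow.prof as above, might be the following. The helper name print_overall_stats and the default limit of 25 entries are illustrative assumptions.

```python
import pstats
import sys


def print_overall_stats(profile_file, limit=25, stream=None):
    """Print the `limit` entries with the highest cumulative time."""
    p = pstats.Stats(profile_file, stream=stream or sys.stdout)
    # sort_stats returns the same Stats object, so calls can be chained.
    p.sort_stats("cumulative").print_stats(limit)
    return p
```

For instance, p = print_overall_stats("dataflow.prof") prints the table and keeps the Stats object in p for further analysis.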

The output will be something similar to this:

Figure: Profiling statistics for the code of our pipeline

CPU time is only available at function level. On the right side, we see the data of the function: filename, line where the function starts, and between parentheses the function name.

On the left side we can see the number of times that function has been called, the cumulative time spent in that function, and the average time (cumtime percall) spent in that function.

What is the difference between tottime and cumtime? This difference is important and will determine how we have to analyze the output. On one side, tottime only counts the time spent in the code of the function itself; it does not count any time spent in other functions called from that code.

On the other side, cumtime is more relevant to the way we are going to analyze code. It includes all the time spent inside the function, including any calls to other functions. This will help us find bottlenecks in our code, even if those bottlenecks are due to calls to libraries that are not our code. This is probably the kind of information we want when profiling a data pipeline. We are interested not only in algorithmic complexity, but also in any additional time incurred by calling other functions, in situations like connecting to services, parsing messages, etc.

Also, if for some reason one of our functions is recursive, we want to know the total time spent in the function from the first recursive call, not the individual times of each one of the recursive calls. Recursive functions can be handy but also a performance hog. With tottime we would be unable to detect those performance hogs with recursive functions; we need to use cumtime to find the places where our code is consuming most of the CPU.
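The difference can be seen with a small, self-contained experiment using cProfile from the standard library. The functions inner, outer and profile_outer below are made up for illustration and have nothing to do with Beam: outer does almost no work itself and delegates everything to inner.

```python
import cProfile
import pstats


def inner(n):
    # All the real work happens here.
    return sum(i * i for i in range(n))


def outer(n):
    # Almost no work of its own: it only calls inner().
    return inner(n) + 1


def profile_outer(n=200_000):
    """Profile outer() and return tottime and cumtime for both functions."""
    profiler = cProfile.Profile()
    profiler.enable()
    outer(n)
    profiler.disable()

    timings = {}
    stats = pstats.Stats(profiler).stats
    # Each value is (primitive calls, total calls, tottime, cumtime, callers).
    for (filename, line, name), (cc, nc, tt, ct, callers) in stats.items():
        if name in ("inner", "outer"):
            timings[name] = {"tottime": tt, "cumtime": ct}
    return timings
```

In the result of profile_outer(), the tottime of outer should be tiny, while its cumtime includes everything spent in inner. Sorting by tottime would hide outer; sorting by cumtime would show it.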

A noticeable aspect of the stats is that most of the functions are related to the Beam SDK, to other libraries, or to Python itself. This is normal, as most of the code that is executed is not our code. If you try to find your code in that list of files, you will notice that it is actually located in a subdirectory of /usr/local/lib/python3.8/site-packages/. This is because your code is installed as just another Python module when the worker is created. You need to keep track of the names of the modules that you have defined in your code, to be able to tell them apart from the rest of the Python modules installed in the Dataflow worker.

Finding the parts of our code consuming most of the CPU

The difference between tottime and cumtime now becomes more relevant. The amount of noise and external files in the stats is staggering. If we use cumtime, we can focus on our code alone. If there is any external call that is causing the performance problems, it will be counted in cumtime.

How can we filter for our code? The stats object contains a dictionary that we can use to pick out our code. In Python, we can get the output of p.sort_stats('cumulative') as a dictionary:

stats_dict = p.sort_stats('cumulative').stats 

The keys of this dictionary are tuples with the filename, line of code, and function name. If we know the names of our modules, we can extract the keys that belong to them. For instance, let's imagine that in our code we had two modules named dofns and pipeline; we would like to extract the entries for our modules, and sort the output by the average time used in those functions. We could do something like the following (using the stats_dict extracted in the previous step):

In my case, I have obtained the following output:

Figure: Output after filtering and sorting by average time

In our case, we see that most of the time spent by our code is in a DoFn found in the file business_rules.py. The first result is not surprising: most of the time is spent in the process method of that DoFn. The second result is more interesting: inside that process method we are parsing timestamps, and it seems that about half of the processing time is dedicated to that parsing. Perhaps we can try to improve it.

The example above highlights the importance of a good design of our pipelines: we should divide our code into small functions, which will help us greatly when we are trying to pinpoint places where we need to improve the performance of our code.

If we had not defined that _parse_timestamp method in our DoFn class, we would not know why the business_rules DoFn is about 10x slower than the parse_messages DoFn. Thanks to dividing the work into smaller functions, we know that about half of that time is dedicated just to parsing timestamps.

Which external functions is my code calling?

We have identified that our business_rules DoFn is consuming most of the CPU, and that this is due to parsing timestamps. In our code, we are using some external code for that task; maybe that external library is the one causing performance problems. Can we find which calls to external libraries are made from this function, and how much time they are consuming?

Let's assume that in the previous example, the _parse_timestamp method was in the mykeys list with index 1; let's try to find all the functions that were called by mykeys[1]:
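A sketch of that lookup, relying on the callers dictionary that pstats keeps for every function, could be the following. The helper name functions_called_by is illustrative, and note that the average is computed over all the calls to each callee, not only over the calls coming from the given caller.

```python
def functions_called_by(stats_dict, caller_key):
    """Return (average cumtime, key) for every function called by caller_key.

    The result is sorted with the slowest average cumtime first.
    """
    rows = []
    # Each value is (primitive calls, total calls, tottime, cumtime, callers),
    # where callers is a dict keyed by the functions that called this one.
    for key, (cc, nc, tt, ct, callers) in stats_dict.items():
        if caller_key in callers and cc > 0:
            rows.append((ct / cc, key))
    rows.sort(reverse=True)
    return rows
```

For example, looping over functions_called_by(stats_dict, mykeys[1]) and printing each average together with its key gives a listing of the functions called from the _parse_timestamp method.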

In this code snippet, we find all the entries in the stats dictionary that are called by mykeys[1], which is the _parse_timestamp method, and then we sort the output by average cumtime.

After running the above code snippet, we obtain the following output:

Figure: Trying to find if any external library is responsible for the performance hog.

We find two entries that are Apache Beam functions. Because these are related to how Beam works, there is not much we could do about them, so we can just ignore them.

But the first entry is actually very interesting: we are using the dateutil module, which is part of the package python-dateutil. And how much time is spent on average in that module? Bingo! It is 96% of the time (.00129 secs/call) that we are spending in the _parse_timestamp method (.00134 secs/call).

So, in summary, about half of the time being spent in our most expensive DoFn is due to the use of the python-dateutil parser. If we could find an alternative, we could probably greatly reduce the usage of resources of our pipeline.

Conclusions

If your Apache Beam Python pipeline is not performing as well as you expect, you can profile it to find which parts of your code are consuming most of the CPU, and what the potential causes are (such as inefficient external libraries or services).

In Google Cloud Platform, you can just write the profiling output to GCS, and then analyze it using Python scripts.

The Python profiler provides two main metrics that can be used for measuring the CPU usage of your code: tottime and cumtime. Because cumtime also includes the time spent in calls to external libraries or services, it is probably the more interesting metric for our profiling. By focusing on cumtime we can just filter out everything that is not our code, and then sort our code by cumtime.

The examples in this post show the importance of a good pipeline design. Our code should be divided into small functions. Very often, most of the time will be spent inside the process methods of our DoFn classes. If we don't split the code inside the process methods into smaller functions, it will be impossible to know which part of the process method is causing performance issues.

Similarly, because the number of function calls included in the profiling output is staggering, it is important to choose easily identifiable module names for our code. Dataflow workers will install our code along with any other Python dependency, in the usual site-packages directory. The only way to identify our code will be by module and file names, not by path.

In conclusion, although we cannot use a tool as neat as Cloud Profiler, as we can for Beam Java pipelines with Dataflow, that does not mean that we cannot profile Python pipelines too. Yes, we can, with just a little bit of Python scripting.
