Building a Data Warehouse using Apache Beam and Dataflow — Part IV: Exceptions & Error Handling

Ahmed El.Hussaini
2 min read · Jan 19, 2019


Photo by Estée Janssens on Unsplash

In Part I of this series, we saw how to build a simple Apache Beam pipeline. In Part II we laid the groundwork, and in Part III we built a batch pipeline that extracts albums from a Postgres database, transforms them, and finally loads them into BigQuery. In this post, we will examine how to properly handle errors when developing pipelines.

Just like in any program or application you write, exception handling is a crucial part of the development process. However, handling exceptions in Beam pipelines is a bit different from handling them in a regular Java application.

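It's different because we not only want to keep a single unhandled exception from bringing down the entire pipeline but, more importantly, we want to make sure no data is lost when exceptions do happen. An exception thrown inside a transform fails the whole bundle of elements being processed; Dataflow retries that bundle, and a batch job fails outright once the same bundle keeps failing.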

Luckily, Beam provides the tools to capture and handle exceptions elegantly. Just as output is passed from one transform to the next, exceptions and errors can be passed down the pipeline as ordinary data.

Out of the box, the Beam SDK lets a single transform emit multiple outputs, each identified by a TupleTag.

Using TupleTag, we can define two outputs for our transforms. One represents the success route, where data is transformed successfully and pushed down the pipeline; the other, as you might have guessed, is for when exceptions occur.
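A minimal sketch of such a two-output transform, assuming a hypothetical `ParseAlbumFn` that turns `"title,year"` strings into `KV<String, Integer>` pairs (the real pipeline reads album rows from Postgres, which is not reproduced here). The tag names `SUCCESS_TAG` and `FAILED_TAG` are also illustrative. Any exception raised while parsing is caught and emitted on the failure output instead of propagating:

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;

/** Hypothetical DoFn: parses "title,year" lines and routes each one to a success or failure output. */
public class ParseAlbumFn extends DoFn<String, KV<String, Integer>> {

  // Success route: parsed (title, year) pairs.
  public static final TupleTag<KV<String, Integer>> SUCCESS_TAG =
      new TupleTag<KV<String, Integer>>() {};

  // Failure route: human-readable error messages.
  public static final TupleTag<String> FAILED_TAG = new TupleTag<String>() {};

  /** Parsing logic kept free of Beam types; throws IllegalArgumentException on malformed input. */
  public static KV<String, Integer> parse(String line) {
    String[] fields = line.split(",");
    if (fields.length != 2) {
      throw new IllegalArgumentException("expected 2 fields but got " + fields.length);
    }
    // Integer.parseInt throws NumberFormatException, a subclass of IllegalArgumentException.
    return KV.of(fields[0].trim(), Integer.parseInt(fields[1].trim()));
  }

  @ProcessElement
  public void processElement(@Element String line, MultiOutputReceiver out) {
    try {
      out.get(SUCCESS_TAG).output(parse(line));
    } catch (Exception e) {
      // Emit the failure as data instead of letting the exception fail the whole bundle.
      out.get(FAILED_TAG).output("Failed to parse '" + line + "': " + e.getMessage());
    }
  }
}
```

Keeping `parse` separate from the `DoFn` makes the risky logic easy to unit test on its own; the `DoFn` only decides which route each element takes.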

To demonstrate this, let's refactor the pipeline from Part III to handle exceptions using TupleTags.

Let's break down the changes we made to this pipeline. First, we created two TupleTag instances: one for the successful output and one for the failed output.
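A sketch of what such declarations typically look like; the holder class `AlbumOutputTags` and both tag names are hypothetical. The trailing `{}` matters: it creates an anonymous subclass so the element type survives erasure, which lets Beam infer a coder for each output.

```java
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

/** Hypothetical holder for the two output tags of an album transform step. */
public final class AlbumOutputTags {
  private AlbumOutputTags() {}

  // Tag for records that were transformed successfully.
  // The {} makes an anonymous subclass, preserving String as the element type.
  public static final TupleTag<String> SUCCESS_TAG = new TupleTag<String>() {};

  // Tag for error messages produced when a record fails to transform.
  public static final TupleTag<String> FAILED_TAG = new TupleTag<String>() {};

  /** Every output other than the main (success) one is listed here when applying the ParDo. */
  public static TupleTagList additionalTags() {
    return TupleTagList.of(FAILED_TAG);
  }
}
```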

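Then we call the apply method on our pipeline, but this time we pass the tuple tags we created above to the ParDo transform (via withOutputTags), which makes it return a PCollectionTuple instead of a single PCollection.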
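A self-contained sketch of that call, assuming a hypothetical `ParseYearFn` that parses release years from strings supplied by `Create.of(...)` rather than read from Postgres. `ParDo.of(...).withOutputTags(mainTag, TupleTagList.of(...))` is what turns a single-output `ParDo` into one that yields a `PCollectionTuple`:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class ApplyWithTagsExample {
  public static final TupleTag<Integer> SUCCESS_TAG = new TupleTag<Integer>() {};
  public static final TupleTag<String> FAILED_TAG = new TupleTag<String>() {};

  /** Hypothetical transform: parses a release year, routing bad input to FAILED_TAG. */
  static class ParseYearFn extends DoFn<String, Integer> {
    @ProcessElement
    public void processElement(@Element String raw, MultiOutputReceiver out) {
      try {
        out.get(SUCCESS_TAG).output(Integer.parseInt(raw.trim()));
      } catch (NumberFormatException e) {
        out.get(FAILED_TAG).output("Bad year '" + raw + "': " + e.getMessage());
      }
    }
  }

  public static PCollectionTuple buildOutputs(Pipeline pipeline) {
    return pipeline
        .apply("ReadYears", Create.of("1969", "1973", "not-a-year"))
        // SUCCESS_TAG becomes the main output; TupleTagList declares the additional ones.
        .apply("ParseYears",
            ParDo.of(new ParseYearFn()).withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILED_TAG)));
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    buildOutputs(pipeline);
    pipeline.run().waitUntilFinish();
  }
}
```

The first tag passed to `withOutputTags` is the main output; every additional tag the `DoFn` emits to must be listed in the `TupleTagList`.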

From that PCollectionTuple, we can then extract the success or failed output by calling its get method with the tag of the desired output.
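As a sketch, with a hypothetical `RouteSplitter` helper and `String` elements on both routes: `get` is typed by the tag you pass, so each route comes back as a correctly typed `PCollection` without casting.

```java
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

/** Hypothetical helper that pulls both routes out of a multi-output ParDo's result. */
public class RouteSplitter {
  public static final TupleTag<String> SUCCESS_TAG = new TupleTag<String>() {};
  public static final TupleTag<String> FAILED_TAG = new TupleTag<String>() {};

  /** Records that transformed cleanly; these continue to the next step (e.g. the BigQuery load). */
  public static PCollection<String> successes(PCollectionTuple outputs) {
    // The tag's type parameter fixes the element type of the returned PCollection.
    return outputs.get(SUCCESS_TAG);
  }

  /** Error messages for records that failed; these can be sent to a separate sink. */
  public static PCollection<String> failures(PCollectionTuple outputs) {
    return outputs.get(FAILED_TAG);
  }
}
```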

For instance, to store all exception messages from the failed output in a text file on Cloud Storage, you simply apply Beam's TextIO write transform to that output.
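A minimal sketch using Beam's `TextIO`; the helper name `FailedOutputWriter` and the bucket path are placeholders. Writing to a `gs://` prefix assumes the Beam Google Cloud Platform extensions, which register the Cloud Storage filesystem, are on the classpath (they typically are when running on Dataflow).

```java
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

/** Hypothetical helper that persists the failure route as text files. */
public class FailedOutputWriter {

  /**
   * Writes one error message per line. With the default shard template the files are
   * named like {outputPrefix}-00000-of-00003.txt; for Cloud Storage pass a prefix such as
   * "gs://my-bucket/errors/albums" (placeholder bucket).
   */
  public static PDone writeFailures(PCollection<String> failed, String outputPrefix) {
    return failed.apply("WriteFailedRecords", TextIO.write().to(outputPrefix).withSuffix(".txt"));
  }
}
```

In the refactored pipeline this would be called on the failed route, e.g. `FailedOutputWriter.writeFailures(outputs.get(FAILED_TAG), "gs://my-bucket/errors/albums");`, while the success route continues on to BigQuery.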

And that is how you handle exceptions when developing pipelines with Apache Beam.
