Error Handling for Apache Beam & BigQuery (Java SDK)

This story explains in depth, with code examples, how to handle errors in your pipeline using Beam features.

Design the pipeline:

Let’s assume we have the simplest scenario: events are streaming to Kafka, and we want to consume them in our pipeline, apply some transformations, and write the results to BigQuery tables to make the data available for analytics.
The BigQuery table can be created before the job starts, or Beam can create it itself.

The code will look pretty simple: the pipeline reads the events from Kafka, the transformation step produces a PCollection<TableRowWithEvent>, and the resulting rows are written to BigQuery.
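
A minimal sketch of such a happy-path pipeline might look like the following. The broker address, topic, table name, and the JsonToTableRowFn transform (sketched in the transformation section below) are placeholder assumptions, and for brevity the sketch emits plain TableRow elements rather than a TableRowWithEvent wrapper.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EventsToBigQuery {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        // Consume raw JSON events from Kafka (broker and topic are placeholders).
        .apply("ReadFromKafka", KafkaIO.<String, String>read()
            .withBootstrapServers("kafka:9092")
            .withTopic("events")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        .apply("ExtractPayload", Values.<String>create())
        // Parse and transform each event into a BigQuery TableRow.
        .apply("TransformToTableRow", ParDo.of(new JsonToTableRowFn()))
        // Write the rows; here the table is assumed to be created in advance.
        .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
            .to("my-project:analytics.events")
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));

    pipeline.run();
  }
}
```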

What is missing?

In the real world, errors may occur, and in most situations, we will be required to handle them.

In the above pipeline, errors can occur when we try to parse the event from Kafka into a JsonNode, during the transformation, and in the BigQuery insert phase.

Error Handling Plan

For each error, we will create a row in a different BigQuery table, containing more information, such as the original event from Kafka.

Once an error occurs, we can analyze the error record and get a full picture of what happened.

Then we can fix the pipeline code, reset/change the Kafka consumer group offset, and replay the events with the fixed code.

We can also fix the event itself (for example, for JSON parsing errors) and resend it to Kafka.

Handling transformation errors:

Let’s take a quick look at our transformation function.
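
As a sketch, such a transformation function might look like the following; the field names (user_id, price, event_time) are hypothetical:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;

// Parses the raw Kafka payload and builds a BigQuery row (field names are hypothetical).
public class JsonToTableRowFn extends DoFn<String, TableRow> {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    JsonNode event = MAPPER.readTree(c.element());  // throws on malformed JSON
    TableRow row = new TableRow()
        .set("user_id", event.get("user_id").asText())
        // Both parse calls throw NumberFormatException for values that cannot be converted.
        .set("price", Float.parseFloat(event.get("price").asText()))
        .set("event_time", Long.parseLong(event.get("event_time").asText()));
    c.output(row);
  }
}
```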

Yes, we may fail on parsing: we parse strings into Float/Long values, and this can fail on data that can’t be converted.

We need to exclude failed data from the main function output, send it down a different path in the pipeline, and then save it to an error table in BigQuery.

How? Let’s use tags.

When we output an element at the end of the ParDo function, we can output it with a tag. Then we can get all the elements tagged with a specific name and process them separately.

Here we will use two tags: the MAIN tag, which holds all the successfully processed records, and another one, DEADLETTER_OUT, which contains all the errors with some context.

Note that the main tag must have the same type as the OUTPUT type of the ParDo function itself; all other tags can be of any type.

Now our ParDo function will look like this, with the tag addition.
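
A sketch of the same function with the tag handling added; both tags are typed as TableRow here, and the dead-letter rows are already shaped for the error table:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;

public class JsonToTableRowFn extends DoFn<String, TableRow> {

  // The main tag must match the DoFn output type (TableRow); additional tags may be any type.
  // The anonymous subclass {} preserves the generic type for Beam's coder inference.
  public static final TupleTag<TableRow> MAIN_OUT = new TupleTag<TableRow>() {};
  public static final TupleTag<TableRow> DEADLETTER_OUT = new TupleTag<TableRow>() {};

  private static final ObjectMapper MAPPER = new ObjectMapper();

  @ProcessElement
  public void processElement(ProcessContext c) {
    String rawEvent = c.element();
    try {
      JsonNode event = MAPPER.readTree(rawEvent);
      TableRow row = new TableRow()
          .set("user_id", event.get("user_id").asText())
          .set("price", Float.parseFloat(event.get("price").asText()))
          .set("event_time", Long.parseLong(event.get("event_time").asText()));
      c.output(row);  // goes to MAIN_OUT implicitly
    } catch (Exception e) {
      // Keep the error and the original Kafka payload so the record can be analyzed or replayed.
      c.output(DEADLETTER_OUT, new TableRow()
          .set("error", e.getMessage())
          .set("raw_event", rawEvent));
    }
  }
}
```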

And how can we process elements by tag? Let’s change the pipeline and make the split: MAIN elements go to the BigQuery table, and DEADLETTER_OUT elements go to the error table.
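
A sketch of that split, reusing the tags declared on the DoFn above. It continues the first sketch (and additionally needs org.apache.beam.sdk.values.PCollectionTuple and TupleTagList); the table names are placeholders, and both tables are assumed to exist already:

```java
// jsonEvents: the PCollection<String> of raw Kafka payloads from the first sketch.
PCollectionTuple results = jsonEvents.apply("TransformToTableRow",
    ParDo.of(new JsonToTableRowFn())
        .withOutputTags(JsonToTableRowFn.MAIN_OUT,
                        TupleTagList.of(JsonToTableRowFn.DEADLETTER_OUT)));

// Successfully transformed rows go to the main table.
results.get(JsonToTableRowFn.MAIN_OUT)
    .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
        .to("my-project:analytics.events")
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));

// Failed transformations go to a dedicated error table.
results.get(JsonToTableRowFn.DEADLETTER_OUT)
    .apply("WriteTransformErrors", BigQueryIO.writeTableRows()
        .to("my-project:analytics.events_transform_errors")
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));
```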

Handle BigQuery insert errors

In order to handle errors during BigQuery insertion, we will have to use the BigQueryIO API.

Let’s zoom in on the write phase and change it a bit.
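
A sketch of that change, replacing the main-table write from the split above: streaming inserts plus withExtendedErrorInfo() make the failed rows and their errors available via getFailedInsertsWithErr(). The error-table name and schema are assumptions.

```java
// results: the PCollectionTuple produced by the split above.
WriteResult writeResult = results.get(JsonToTableRowFn.MAIN_OUT)
    .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
        .to("my-project:analytics.events")
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withExtendedErrorInfo()  // required for getFailedInsertsWithErr()
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

// Schema for the error table, so CREATE_IF_NEEDED can create it on demand.
TableSchema insertErrorSchema = new TableSchema().setFields(Arrays.asList(
    new TableFieldSchema().setName("error").setType("STRING"),
    new TableFieldSchema().setName("failed_row").setType("STRING")));

// Turn each failed insert into an error row and write it to the error table.
writeResult.getFailedInsertsWithErr()
    .apply("WrapInsertError", MapElements
        .into(TypeDescriptor.of(TableRow.class))
        .via((BigQueryInsertError err) -> new TableRow()
            .set("error", err.getError().toString())
            .set("failed_row", err.getRow().toString())))
    .apply("WriteInsertErrors", BigQueryIO.writeTableRows()
        .to("my-project:analytics.events_insert_errors")
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withSchema(insertErrorSchema)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));
```

Note that the failed-insert collections are only populated for the streaming-inserts write method, and getFailedInsertsWithErr() requires withExtendedErrorInfo() to be set.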

In the above snippet, we get the failed TableRows, together with their errors, back from BigQueryIO. Now we can transform each of them into another TableRow and write it to an error table. In this case, we let the job create the table when needed.