An automated way to handle failures in a streaming data pipeline

How to replay failed elements in an ingestion data pipeline using a rules engine.

Rémy Larroye
8 min read · Sep 16, 2022

Streaming data pipelines are the foundation of big data. As a data engineer, monitoring and repairing them can be time-consuming. Having a data replay and error alerting system is a real time-saver and increases the quality of your data.

Image by Gala Amarando from Pixabay

Our prerequisites

Our goal is to put this mechanism on all our data pipelines. This directly implies a number of constraints, and therefore technical choices:

  • Reusable across all our pipelines without touching their code → we use a rules engine
  • The system must have a low cost when there are no errors and no data to replay → we use pay-as-you-go services, or stop them when they are not needed
  • It must be highly scalable to handle our largest use cases → we only use GCP services that scale to petabytes, preferably serverless services
  • It must be fast and cheap to deploy → we use automated deployment and IaC with Terraform

The working principle

In our example, the goal is to collect data from sensors installed on servers, in order to monitor them and do predictive maintenance.
The data arriving in the ingestion pipeline looks like this:

{
  "datetime": "2022-01-01",
  "machineID": 1,
  "pressure": 113.07793,
  "vibration": 45.08766
}

The data is then enriched with the fields model and age before being stored in our data warehouse. The enriched data looks like this:

{
  "datetime": "2022-01-01",
  "machineID": 1,
  "pressure": 113.07793,
  "vibration": 45.08766,
  "model": "model3",
  "age": 18
}

The animation below shows the working principle of our data alert and replay system.

High-level architecture at work

It starts with the collection of elements containing errors

The first part is to collect and store in BigQuery the elements that caused an error in the ingestion pipeline (in blue in the architecture above), so that we can analyze and replay them. Errors can occur either in Pub/Sub or in Dataflow.

Errors in Pub/Sub

The error that can occur in Pub/Sub is the non-acknowledgment of a message. When this happens, the message is published to a dead-letter topic. The message is then stored in BigQuery by a Dataflow job that is started when there are elements in the topic and stopped when there are none left, to minimize cost.
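
As a rough illustration, here is a minimal sketch of how a subscription with a dead-letter policy can be created with the google-cloud-pubsub Python client. The project, topic and subscription names are placeholders; in our case this resource is actually provisioned with Terraform.

from google.cloud import pubsub_v1

# Illustrative names only; adapt to your project.
project_id = "my-project"
subscriber = pubsub_v1.SubscriberClient()

subscription_path = subscriber.subscription_path(project_id, "sensor-ingestion-sub")
topic_path = f"projects/{project_id}/topics/sensor-ingestion"
dead_letter_topic = f"projects/{project_id}/topics/sensor-ingestion-deadletter"

with subscriber:
    subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            # After 5 failed delivery attempts, Pub/Sub forwards the message
            # to the dead-letter topic instead of redelivering it forever.
            "dead_letter_policy": {
                "dead_letter_topic": dead_letter_topic,
                "max_delivery_attempts": 5,
            },
        }
    )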

BigQuery preview of some elements that failed in Pub/Sub

The schema of the table that stores the elements that failed in Pub/Sub is as follows (a sketch of the table definition is shown after the list):

  • currentPayload: The input element on which the error happened.
  • attributes: The attributes of the message, as a JSON string.
  • numberReplay: The number of times the message has already been replayed.
  • timestamp: Timestamp at which the error happened.
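
For illustration, the table could be created like this with the BigQuery Python client; the project and dataset names are assumptions, and the hourly partitioning on timestamp matters for the clean-up after replay discussed later in the article.

from google.cloud import bigquery

client = bigquery.Client()

# Hypothetical table id; adapt to your dataset naming.
table_id = "my-project.replay.pubsub_deadletter_messages"

schema = [
    bigquery.SchemaField("currentPayload", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("attributes", "STRING"),   # message attributes as a JSON string
    bigquery.SchemaField("numberReplay", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
]

table = bigquery.Table(table_id, schema=schema)
# Hourly partitioning keeps replays and deletions of replayed rows cheap.
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.HOUR, field="timestamp"
)
client.create_table(table, exists_ok=True)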

Errors in Dataflow

The errors occurring in Dataflow are mainly due to two causes:

  • The input does not respect the expected data schema. For example, a required field is missing or some fields do not have the correct type/format.
  • An error occurs during the transformation and enrichment of the data. The cause may be a bug in the pipeline code, or a malfunction of one of the services (API, database, Cloud Function, …) on which the pipeline depends. In our example, the pipeline enriches the input data with the model and age fields by querying a Cloud SQL table.

Such errors are raised at code level and would normally cause the Dataflow pipeline to fail. To avoid this, the input elements of the pipeline are wrapped into FailsafeElement before any processing. Thanks to this mechanism, elements that fail a transformation and raise an error are caught. This prevents the Dataflow pipeline from failing and also recovers the information needed to analyze the error: the error message, the operation where it happened, and the original element.
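
FailsafeElement comes from the Google-provided Dataflow templates (Java). To illustrate the underlying dead-letter pattern, here is a minimal Apache Beam (Python) sketch; the enrichment lookup and the names used are simplified assumptions, not our production code.

import json
import traceback

import apache_beam as beam
from apache_beam import pvalue


def lookup_machine_metadata(machine_id):
    # Hypothetical stand-in for the Cloud SQL query that returns the
    # "model" and "age" fields for a given machine.
    return {"model": "model3", "age": 18}


class EnrichWithFailsafe(beam.DoFn):
    """Enriches an element; failures go to a tagged side output instead of
    crashing the pipeline (the same idea as FailsafeElement)."""

    FAILURES = "failed_elements"

    def process(self, element):
        try:
            record = json.loads(element)
            record.update(lookup_machine_metadata(record["machineID"]))
            yield record
        except Exception as exc:  # bad schema, lookup failure, ...
            yield pvalue.TaggedOutput(self.FAILURES, {
                "currentPayload": element,
                "operationName": "EnrichWithFailsafe",
                "pipelineName": "sensor-ingestion",
                "errorMessage": str(exc),
                "errorCause": type(exc).__name__,
                "stacktrace": traceback.format_exc(),
                "numberReplay": 0,
            })


# In the pipeline, the two outputs are then routed separately:
#   outputs = messages | beam.ParDo(EnrichWithFailsafe()).with_outputs(
#       EnrichWithFailsafe.FAILURES, main="enriched")
#   outputs.enriched                        -> the data warehouse table
#   outputs[EnrichWithFailsafe.FAILURES]    -> the BigQuery error table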

BigQuery preview of some elements that failed in Dataflow

The schema of the table that stores elements with errors in Dataflow is:

  • currentPayload: Input element on which the error happened.
  • operationName: Name of the operation which produced the error.
  • pipelineName: Name of the pipeline which produced the error.
  • errorMessage: Message of the error which happened.
  • errorCause: Cause of the error which happened.
  • numberReplay: Number of times the message has already been replayed.
  • stacktrace: Stacktrace of the error which happened.

Some elements have errors: how do we alert maintainers?

For this you only need to write rules. By default we use two rules:

Examples of rules that we use by default

Our default rules are designed to handle all the errors that occur in the ingestion pipeline, and they run very frequently so that errors are reported quickly.
The first one handles elements that failed in Pub/Sub. Rather than alerting a maintainer directly, we choose to replay the element once before sending a notification, since the error could be temporary.
The second rule deals with errors occurring in Dataflow. In this case the error comes from the code and is not temporary, so we alert the maintainer directly to analyze what is happening.

Our rules have the following structure:

  • name: Name of the rule.
  • description: Description, for informative purpose.
  • cron: CRON expression to determine the frequency of execution of a given rule.
  • source: Whether the elements come from an error in Pub/Sub or in Dataflow.
  • replay_query: Query that selects data to replay.
  • retry_before_alert: Number of replays the system must do before sending an alert.
  • email: Email to which the alert will be sent.

Take the example of the diagram above, "High-level architecture at work". Following an update, messages arrive in the ingestion pipeline with an unexpected field "temperature". The mechanism notifies us by email that there have been errors in the pipeline, and the error message (attribute "errorMessage") is "Unexpected field name found: temperature".

Fix the pipeline, then replay failed elements

We deploy a new version of the Dataflow pipeline that takes the "temperature" field into account. Then we add a new rule to replay only the elements that failed because of the temperature field:

{
  "name": "Fix unexpected field temperature",
  "cron": "0 4 25 8 *",  # to be executed only once, on a specific date
  "source": "Dataflow",
  "replay_query": "SELECT * FROM `{table_id}` WHERE timestamp < TIMESTAMP('{limit_timestamp}') AND errorMessage = 'Unexpected field name found: temperature'",
  "retry_before_alert": 0,
  "email": "maintainer@email.com"
}

And so the data is replayed in the ingestion pipeline and quickly becomes available in the data warehouse for the teams that need it.

Now, the architecture

In the high-level architecture I have hidden a number of elements to make the principle easier to understand, such as the mechanism for deploying a new version of the Dataflow job and the collection of messages from the Pub/Sub dead-letter topics into BigQuery.

Here is the actual architecture implemented:

The complete architecture of the solution

How does it work?

  • BigQuery “Rules of replay”: Stores the replay and error rules.
  • Cloud Scheduler: Starts the system. It calls the cloud function on a regular basis, defined by a CRON.
  • Cloud Function: Reads the rules from the BigQuery table “Rules of replay”. From these rules, it queries the failed elements in BigQuery to detect whether any elements violate a rule and alerts the maintainer. If there are elements to replay, it triggers a DAG run in Composer (a simplified sketch of this logic follows the list).
  • Cloud Composer: Launches the data replay DAG, which runs a Dataflow job to replay the selected elements in the ingestion pipeline and then deletes the already replayed data from BigQuery.
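
To make the flow concrete, here is a heavily simplified Python sketch of what the Cloud Function does. The table ids, the helper functions and the exact alert/replay conditions are assumptions for illustration, not the actual implementation.

from datetime import datetime, timezone
from google.cloud import bigquery

bq = bigquery.Client()
RULES_TABLE = "my-project.replay.rules_of_replay"  # assumed table id

ERROR_TABLES = {  # assumed mapping from a rule's "source" to its error table
    "PubSub": "my-project.replay.pubsub_deadletter_messages",
    "Dataflow": "my-project.replay.dataflow_failed_elements",
}


def send_alert(email, rule_name, count):
    # Placeholder: in reality this sends an email notification.
    print(f"ALERT to {email}: rule '{rule_name}' matched {count} failed elements")


def trigger_replay_dag(rule, query):
    # Placeholder: in reality this calls the Composer (Airflow) API to start
    # the replay DAG, passing it the query that selects the elements to replay.
    print(f"Triggering replay DAG for rule '{rule['name']}'")


def evaluate_rules(request=None):
    """Entry point called on a schedule by Cloud Scheduler."""
    for rule in bq.query(f"SELECT * FROM `{RULES_TABLE}`").result():
        query = rule["replay_query"].format(
            table_id=ERROR_TABLES[rule["source"]],
            limit_timestamp=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
        )
        rows = list(bq.query(query).result())
        if not rows:
            continue
        # Elements already replayed retry_before_alert times: notify the maintainer.
        exhausted = [r for r in rows if r["numberReplay"] >= rule["retry_before_alert"]]
        if exhausted:
            send_alert(rule["email"], rule["name"], len(exhausted))
        # Remaining elements are handed to Composer, which replays them through
        # the ingestion pipeline and deletes the replayed rows from BigQuery.
        if len(exhausted) < len(rows):
            trigger_replay_dag(rule, query)
    return "ok"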

Why is it so complex to collect messages from the Pub/Sub dead-letter topics and write them into the BigQuery table pubsub_dealetter_messages?

Current architecture

Let us consider simpler architectures:

The first one uses a single Dataflow job reading from Pub/Sub and writing directly to BigQuery. The second one uses the new feature that streams data from Pub/Sub directly to BigQuery.

In both cases, the data is written to BigQuery with streaming inserts.

Architecture of streaming insert in BigQuery

Recently inserted rows are retained in BigQuery's streaming buffer. They can be read immediately after insertion, but to update or delete them you have to wait until they are converted to columnar storage, which takes at most the partitioning time. In other words, recent rows of the BigQuery table cannot be updated or deleted.

These considerations matter because, after an element has been replayed from BigQuery, it must be deleted. Fixing your pipeline to handle elements that failed because of an applicative error in Dataflow might take time, but you will probably replay dead-letter elements from Pub/Sub before the partition time of the error table has elapsed (say you replay them every ten minutes).

In this case, because the error table has a one-hour partition time, you risk replaying the same elements several times (if the first replays end up failing a second time in Pub/Sub). So I chose to write the elements to GCS, thus having a bounded source for my Dataflow job, and to load them into BigQuery in batch.
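
For illustration, a batch load from GCS with the BigQuery Python client looks like this (bucket layout and table name are placeholders). Because it is a load job rather than streaming inserts, the rows never sit in the streaming buffer and can be deleted as soon as they have been replayed.

from google.cloud import bigquery

client = bigquery.Client()

# Load the dead-letter messages written to GCS as newline-delimited JSON
# into the error table with a batch load job instead of streaming inserts.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
load_job = client.load_table_from_uri(
    "gs://my-deadletter-bucket/pubsub/*.json",          # assumed bucket layout
    "my-project.replay.pubsub_deadletter_messages",     # assumed table id
    job_config=job_config,
)
load_job.result()  # wait for the batch load to finish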

Is the complexity of the architecture a problem?

Absolutely not! What matters to us is being able to deploy our system easily and quickly on a large number of pipelines, and making it easy for users to create new rules, see where errors occurred, and understand them so that they can fix them quickly and ensure the best quality of service for their data pipelines.

If you are interested in how to build robust data pipelines with the ability to catch and replay failed elements without service disruption, or if you want more details on a topic mentioned here, let me know in the comments! If you are an enterprise focusing on your Cloud transformation, feel free to contact Orange Business Service, and I will be happy to help you!

If you found this article helpful, please give it a clap and share it. For more content like this, don’t forget to subscribe. I publish new articles every 3 weeks on Medium.

Credits :

All icons come from Flaticon: https://www.flaticon.com/free-icons

  • New icons created by Pixel perfect — Flaticon
  • Search icons created by Dimitry Miroliubov — Flaticon
  • Mail icons created by Freepik — Flaticon
  • Email icons created by Freepik — Flaticon
  • Paper icons created by Freepik — Flaticon


Rémy Larroye

Data engineer, DevOps and MLOps enthusiast at Orange Business