Dead letter queue for errors with Beam, Asgarde, Dataflow and alerting in real time
The goal of this article is to show a use case with a Beam pipeline containing a dead letter queue for errors, applied with Asgarde.
For more details about the Asgarde library, you can check this article :
1. Explanation of the use case presented in this article
A dead letter queue is a way of catching errors in data pipelines without interrupting the job. This technique is interesting because errors are not lost and can be written to the most suitable storage service, depending on the error handling strategy :
- Errors can be sent to a PubSub topic to be reloaded
- Errors can be stored in a BigQuery table to allow developer teams or partners to analyse errors and display them in a dashboard
- Errors can be written to a Cloud Storage file to be reloaded later in a batch
In this article, storage in a BigQuery table was chosen.
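To make the idea concrete before looking at Beam, here is a minimal sketch of the dead letter pattern in plain Python. The Failure class, run_step and parse_score are hypothetical names used only for illustration; they are not part of Beam or Asgarde.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, List, Tuple


@dataclass
class Failure:
    """Hypothetical failure record: the step, the input and the exception."""
    pipeline_step: str
    input_element: str
    exception: Exception


def run_step(step_name: str,
             transform: Callable,
             elements: Iterable) -> Tuple[List, List[Failure]]:
    """Apply transform to each element and route exceptions to a failure list."""
    outputs, failures = [], []
    for element in elements:
        try:
            outputs.append(transform(element))
        except Exception as exc:
            # The job keeps running: the bad record goes to the dead letter list.
            failures.append(Failure(step_name, str(element), exc))
    return outputs, failures


def parse_score(raw: str) -> int:
    """Hypothetical transform that fails on non-numeric input."""
    return int(raw)


good, dead_letters = run_step("Parse score", parse_score, ["10", "abc", "7"])
```

The good outputs continue through the pipeline, while the dead letters keep the step name, the input element and the exception, which is the information later written to the failure sinks.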
The job contains 2 sinks :
- Good outputs written in a BigQuery table
- Failures written in a BigQuery table and also in Cloud Logging
Failures will be exploited in the following way :
- An email alert is sent in real time when failures are written in Cloud Logging
- A Datastudio dashboard will display the failures for a date range
Here you can see the use case diagram of this article :
2. Dead letter queue with Beam native
In this section, we will show how to apply a dead letter queue and error handling with native Beam.
2.1 Example with Beam Java
To apply a dead letter queue and error handling, Beam suggests handling side outputs with :
- TupleTags in a DoFn class
- Built-in components such as MapElements, with methods such as exceptionsVia, example :
public Result<PCollection<TeamStats>, Failure> expand(PCollection<TeamStatsRaw> input) {
    Result<PCollection<TeamStatsRaw>, Failure> res1 = input.apply("Validate fields", MapElements
            .into(of(TeamStatsRaw.class))
            .via(TeamStatsRaw::validateFields)
            .exceptionsInto(of(Failure.class))
            .exceptionsVia(exElt -> Failure.from("Validate fields", exElt)));
    PCollection<TeamStatsRaw> output1 = res1.output();
    PCollection<Failure> failure1 = res1.failures();
    Result<PCollection<TeamStats>, Failure> res2 = output1.apply("Compute team stats", MapElements
            .into(of(TeamStats.class))
            .via(TeamStats::computeTeamStats)
            .exceptionsInto(of(Failure.class))
            .exceptionsVia(exElt -> Failure.from("Compute team stats", exElt)));
    PCollection<TeamStats> output2 = res2.output();
    PCollection<Failure> failure2 = res2.failures();
    // Assumption: the second constructor argument is the slogans side input.
    final TransformToTeamStatsWithSloganFn toStatsWithSloganFn = new TransformToTeamStatsWithSloganFn(
            "Add team slogan",
            slogansSideInput);
    final PCollectionTuple res3 = output2.apply("Add team slogan",
            ParDo.of(toStatsWithSloganFn)
                    .withOutputTags(toStatsWithSloganFn.getOutputTag(), TupleTagList.of(toStatsWithSloganFn.getFailuresTag()))
                    .withSideInput("slogans", slogansSideInput));
    PCollection<TeamStats> output3 = res3.get(toStatsWithSloganFn.getOutputTag());
    PCollection<Failure> failure3 = res3.get(toStatsWithSloganFn.getFailuresTag());
    // Merge the failures of the three steps into a single PCollection.
    PCollection<Failure> allFailures = PCollectionList
            .of(failure1).and(failure2).and(failure3)
            .apply(Flatten.pCollections());
    return Result.of(output3, allFailures);
}
The problem with this approach is that the code is verbose and we have to repeat technical code for error handling at every step.
Link to Github repository :
2.2 Example with Beam Python
In the Python SDK, the principle is the same: we have to use tagged outputs (with_outputs) on a DoFn class, example :
def expand(self, inputs: PCollection[TeamStatsRaw]) -> \
        Tuple[PCollection[TeamStats], PCollection[Failure]]:
    outputs_map1, failures_map1 = (
        inputs | VALIDATE_FIELDS >> ParDo(TeamStatsRawFieldsValidationFn(VALIDATE_FIELDS))
        .with_outputs(FAILURES, main='outputs')
    )
    outputs_map2, failures_map2 = (
        outputs_map1 | COMPUTE_TEAM_STATS >> ParDo(TeamStatsMapperFn(COMPUTE_TEAM_STATS))
        .with_outputs(FAILURES, main='outputs')
    )
    final_outputs, failures_map3 = (
        outputs_map2 | ADD_SLOGAN_TEAM_STATS >>
        ParDo(...)  # the DoFn and its side input are not shown in the source
        .with_outputs(FAILURES, main='outputs')
    )
    # Merge the failures of the three steps into a single PCollection.
    result_all_failures = (
        (failures_map1, failures_map2, failures_map3)
        | 'All Failures PCollections' >> beam.Flatten()
    )
    return final_outputs, result_all_failures
As for the Java part, the problem is the same and we have to repeat the technical code for error handling.
Link to Github repository :
3. Dead letter queue with Asgarde library
In this section, we will show how to apply a dead letter queue and error handling with Beam and Asgarde.
3.1 Example with Asgarde Java
Asgarde proposes a CollectionComposer class to simplify error handling, with less code and more expressive code :
public Result<PCollection<TeamStats>, Failure> expand(PCollection<TeamStatsRaw> input) {
    return CollectionComposer.of(input)
            .apply("Validate fields", MapElements.into(of(TeamStatsRaw.class)).via(TeamStatsRaw::validateFields))
            .apply("Compute team stats", MapElementFn
                    .into(of(TeamStats.class))
                    .via(TeamStats::computeTeamStats)
                    .withStartBundleAction(() -> System.out.print("####################Start bundle compute stats")))
            .apply("Add team slogan", MapProcessContextFn
                            .from(TeamStats.class)
                            .into(of(TeamStats.class))
                            .via(c -> addSloganToStats(c, slogansSideInput))
                            .withSetupAction(() -> System.out.print("####################Start add slogan")),
                    Collections.singletonList(slogansSideInput))
            .getResult();
}
Link to Github repository :
3.2 Example with Asgarde Kotlin
Asgarde also proposes extensions for Kotlin, with a more functional programming style :
override fun expand(input: PCollection<TeamStatsRaw>): Result<PCollection<TeamStats>, Failure> {
    return CollectionComposer.of(input)
        .map("Validate fields") { it.validateFields() }
        // Assumption: the extension called here is mapFn; its name is not shown in the source.
        .mapFn(
            name = "Compute team stats",
            startBundleAction = { print("####################Start bundle compute stats") },
            transform = { TeamStats.computeTeamStats(it) })
        .mapFnWithContext<TeamStats, TeamStats>(
            name = "Add team slogan",
            setupAction = { print("####################Start add slogan") },
            sideInputs = listOf(slogansSideInput),
            transform = { addSloganToStats(it, slogansSideInput) }
        )
        .result
}
Link to Github repository :
3.3 Example with Asgarde Python
Asgarde Python, as in the Java library, proposes a CollectionComposer class to simplify error handling :
def expand(self, inputs: PCollection[TeamStatsRaw]) -> \
        Tuple[PCollection[TeamStats], PCollection[Failure]]:
    result = (CollectionComposer.of(inputs)
              .map("Validate raw fields", lambda t_raw: t_raw.validate_fields())
              .map("Compute team stats", TeamStats.compute_team_stats)
              .map("Add slogan to team stats",
                   setup_action=lambda: '######### Start add slogan to stats actions'))
    return result.outputs, result.failures
Link to Github repository :
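To illustrate why a composer removes the repeated error handling code, here is a toy model in plain Python that works on lists instead of PCollections. It is not Asgarde's implementation: ToyComposer and its methods are hypothetical and only mimic the idea of chaining steps while failures accumulate behind the scenes.

```python
from typing import Callable, List, Tuple


class ToyComposer:
    """Toy stand-in for a composer: chains steps and accumulates failures."""

    def __init__(self, elements: List, failures: List[Tuple[str, object, Exception]]):
        self.outputs = elements
        self.failures = failures

    @staticmethod
    def of(elements: List) -> "ToyComposer":
        return ToyComposer(list(elements), [])

    def map(self, name: str, fn: Callable) -> "ToyComposer":
        """Apply fn to the good outputs; keep previous failures and add new ones."""
        outputs, failures = [], list(self.failures)
        for element in self.outputs:
            try:
                outputs.append(fn(element))
            except Exception as exc:
                failures.append((name, element, exc))
        return ToyComposer(outputs, failures)


result = (ToyComposer.of(["1", "x", "0", "4"])
          .map("Parse", int)
          .map("Invert", lambda n: 1 / n))
```

Each call to map only receives the good outputs of the previous step, and the caller reads result.outputs and result.failures once at the end instead of collecting and flattening failures after every step.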
4. IO of Beam pipeline for failures
In this example, the pipeline outputs are :
- Good outputs written in a BigQuery table
- Failures caught with Asgarde, written in a BigQuery table for analytics and display in a dashboard
- Failures written in Cloud Logging with a specific log containing the input element and the stack trace
The schema of the BigQuery
table is :
To be cost effective, we added a partition per DAY on the dwhCreationDate field.
We also added clustering on the featureName and pipelineStep fields to increase query performance when users filter on them in a WHERE clause.
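As an illustration, the partitioning and clustering choices can be expressed as a table options dictionary. This is a sketch: the keys follow the BigQuery table resource (timePartitioning and clustering), and failure_table_parameters is a hypothetical helper, not code from the article's repository.

```python
import json

# Column names taken from the failure table described in this article.
PARTITION_FIELD = "dwhCreationDate"
CLUSTERING_FIELDS = ["featureName", "pipelineStep"]


def failure_table_parameters() -> dict:
    """Build BigQuery table options: daily partitioning plus clustering."""
    return {
        "timePartitioning": {"type": "DAY", "field": PARTITION_FIELD},
        "clustering": {"fields": CLUSTERING_FIELDS},
    }


print(json.dumps(failure_table_parameters(), indent=2))
```

In the Beam Python SDK, a dictionary of this shape can be passed to WriteToBigQuery through its additional_bq_parameters argument when the sink is allowed to create the table. A query that filters on dwhCreationDate then only scans the matching daily partitions.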
The JSON schema :
[
  {
    "name": "featureName",
    "type": "STRING",
    "mode": "NULLABLE",
    "description": "Feature name concerned by error"
  },
  {
    "name": "jobName",
    "type": "STRING",
    "mode": "NULLABLE",
    "description": "Job name concerned by error"
  },
  {
    "name": "pipelineStep",
    "type": "STRING",
    "mode": "NULLABLE",
    "description": "Pipeline step concerned by error"
  },
  {
    "name": "inputElement",
    "type": "STRING",
    "mode": "NULLABLE",
    "description": "Input element when error occurred"
  },
  {
    "name": "exceptionType",
    "type": "STRING",
    "mode": "NULLABLE",
    "description": "Exception type of error"
  },
  {
    "name": "stackTrace",
    "type": "STRING",
    "mode": "NULLABLE",
    "description": "Stack trace of error"
  },
  {
    "name": "componentType",
    "type": "STRING",
    "mode": "NULLABLE",
    "description": "GCP Component type (Dataflow, Composer....)"
  },
  {
    "name": "dwhCreationDate",
    "type": "TIMESTAMP",
    "mode": "NULLABLE",
    "description": "Creation date of error"
  }
]
4.1 Code of BigQuery IO in Beam Java
public PTransform<PCollection<Failure>, ? extends POutput> write() {
    return BigQueryIO.<Failure>write()
            .to(failureConf.getOutputDataset() + "." + failureConf.getOutputTable())
            .withFormatFunction(failure -> toFailureTableRow(failureConf, failure));
}

private static TableRow toFailureTableRow(FailureConf failureConf, Failure failure) {
    // Assumption: the value was lost in the source; the current instant is used here.
    val creationDate = Instant.now().toString();
    return new TableRow()
            .set(FEATURE_NAME.getValue(), failureConf.getFeatureName())
            .set(PIPELINE_STEP.getValue(), failure.getPipelineStep())
            .set(JOB_NAME.getValue(), failureConf.getJobName())
            .set(INPUT_ELEMENT.getValue(), failure.getInputElement())
            .set(EXCEPTION_TYPE.getValue(), failure.getException().getClass().getSimpleName())
            .set(STACK_TRACE.getValue(), ExceptionUtils.getStackTrace(failure.getException()))
            .set(DWH_CREATION_DATE.getValue(), creationDate);
}
Link to Github repository :
4.2 Code of Cloud Logging IO in Beam Java
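public PDone expand(PCollection<Failure> input) {
    input.apply(STEP_MAP_FAILURE_JSON_STRING, new LogTransform());
    return PDone.in(input.getPipeline());
}

static class LogTransform extends PTransform<PCollection<Failure>, PCollection<String>> {
    public PCollection<String> expand(PCollection<Failure> input) {
        return input
                // Assumption: this mapping step and its name are not shown in the source.
                .apply("Map to failure log info", MapElements.into(strings()).via((Failure failure) -> toFailureLogInfo(failure)))
                .apply(STEP_LOG_FAILURE, MapElements.into(strings()).via(this::logFailure));
    }

    private String logFailure(String failureAsString) {
        // Assumption: the logging call was lost in the source; an error-level log is written here.
        LOGGER.error(failureAsString);
        return failureAsString;
    }
}

private static String toFailureLogInfo(Failure failure) {
    val inputElementInfo = "InputElement : " + failure.getInputElement();
    val stackTraceInfo = "StackTrace : " + ExceptionUtils.getStackTrace(failure.getException());
    return inputElementInfo + "\n" + stackTraceInfo;
}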
The link to the Github repository :
4.3 Code of BigQuery IO in Beam Python
def expand(self, inputs_failures: PCollection[Failure]):
    return (inputs_failures
            | 'Map to failure table fields' >> beam.Map(self.to_failure_table_fields)
            # The sink arguments (table, dispositions) are not shown in the source.
            | "Sink failures to Bigquery" >> beam.io.WriteToBigQuery(...))

def to_failure_table_fields(self, failure: Failure):
    return {
        FailureTableFields.FEATURE_NAME.value: self.pipeline_conf.failure_feature_name,
        FailureTableFields.JOB_NAME.value: self.pipeline_conf.job_type,
        FailureTableFields.PIPELINE_STEP.value: failure.pipeline_step,
        FailureTableFields.INPUT_ELEMENT.value: failure.input_element,
        FailureTableFields.EXCEPTION_TYPE.value: type(failure.exception).__name__,
        FailureTableFields.STACK_TRACE.value: get_failure_error(failure),
        FailureTableFields.COMPONENT_TYPE.value: 'DATAFLOW',
        FailureTableFields.DWH_CREATION_DATE.value: datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
    }
The link to the Github repository :
4.4 Code of Cloud Logging IO in Beam Python
import logging

def expand(self, inputs_failures: PCollection[Failure]):
    return (inputs_failures
            | 'Map to failure log info' >> beam.Map(self.to_failure_log_info)
            | "Logs Failure to cloud logging" >> beam.Map(self.log_failure))

def to_failure_log_info(self, failure: Failure):
    input_element_info = f'InputElement : {failure.input_element}'
    stack_trace_info = f'StackTrace : {get_failure_error(failure)}'
    return f'{input_element_info} \n {stack_trace_info}'

def log_failure(self, failure_log_info: str):
    # Assumption: the logging call was lost in the source; an error-level log is written here.
    logging.error(failure_log_info)
    return failure_log_info
The link to the Github repository :
5. Create an alerting policy in Google Cloud on Beam failures
After writing failures in Cloud Logging, we want to send an email alert in real time. Here are the steps in Google Cloud to achieve this :
- Create a log-based metric that filters the failures pattern written by the Dataflow job
- Create a notification channel; email was chosen for this example
- Create an alerting policy based on the log-based metric and the notification channel created previously
5.1 Create a log-based metric
We will first create a log-based metric on Dataflow job errors.
Some explanations about the log format in Cloud Logging
An entry in Cloud Logging is represented by the LogEntry object :
Here you can see the JSON representation of the LogEntry object :
{
  "logName": string,
  "resource": {
    object (MonitoredResource)
  },
  "timestamp": string,
  "receiveTimestamp": string,
  "severity": enum (LogSeverity),
  "insertId": string,
  "httpRequest": {
    object (HttpRequest)
  },
  "labels": {
    string: string,
  },
  "metadata": {
    object (MonitoredResourceMetadata)
  },
  "operation": {
    object (LogEntryOperation)
  },
  "trace": string,
  "spanId": string,
  "traceSampled": boolean,
  "sourceLocation": {
    object (LogEntrySourceLocation)
  },
  "split": {
    object (LogSplit)
  },
  // Union field payload can be only one of the following:
  "protoPayload": {
    "@type": string,
    field1: ...,
  },
  "textPayload": string,
  "jsonPayload": {
    object
  }
  // End of list of possible types for union field payload.
}
We can add filters on the LogEntry fields.
Creation of the log-based metric
In the Google Cloud Console, we access the log-based metric menu and first fill in the metric type, metric name and description.
We chose the counter metric type because every time Dataflow jobs write errors in the logs with a specific pattern, we want to increment this metric.
The metric name in this example is dataflow_jobs_intercepted_errors
Then we fill in the filter and label.
The format used in the filter is the one usually used in Cloud Logging and the Logs Explorer in Google Cloud.
One criterion targets only Dataflow jobs and services.
Another specifies that the filter concerns only errors in the logs.
Our Dataflow jobs write errors in Cloud Logging with a specific pattern containing the InputElement and StackTrace keywords, which is why the filter adds a criterion based on jsonPayload.message :
jsonPayload.message =~ InputElement.*
- The =~ operator searches with a regular expression
- The .* means any character, repeated any number of times
- This criterion searches for messages containing InputElement
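The behaviour of this pattern can be checked locally with a small sketch. Python's re module stands in for the regular expression engine used by Cloud Logging, which is close enough for such a simple pattern; the sample messages and is_intercepted_failure are hypothetical.

```python
import re

# Same pattern as in the filter; searched anywhere in the message.
FAILURE_PATTERN = re.compile(r"InputElement.*")


def is_intercepted_failure(message: str) -> bool:
    """Return True when the log message contains the InputElement keyword."""
    return FAILURE_PATTERN.search(message) is not None


# Hypothetical log messages: the first follows the format written by the job.
messages = [
    "InputElement : {'teamName': 'PSG'} \n StackTrace : ValueError",
    "Worker pool started",
]
matches = [is_intercepted_failure(message) for message in messages]
```

Only the first message is selected, so regular worker logs do not increment the metric.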
Labels allow log-based metrics to contain multiple time series.
In this case, we add the label resource.labels.job_id because we want to count matching logs per Dataflow job and not for all jobs together.
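The effect of the job label can be sketched as a counter grouped by job id. The log entries below are hypothetical dictionaries shaped like a LogEntry, and the check on the ERROR severity is an assumption about the filter; this only mimics what the counter metric does on the Google Cloud side.

```python
from collections import Counter
from typing import Dict, Iterable


def count_failures_per_job(entries: Iterable[dict]) -> Dict[str, int]:
    """Count matching error logs, one counter (time series) per job id."""
    counts: Counter = Counter()
    for entry in entries:
        is_error = entry.get("severity") == "ERROR"
        message = entry.get("jsonPayload", {}).get("message", "")
        if is_error and "InputElement" in message:
            counts[entry["resource"]["labels"]["job_id"]] += 1
    return dict(counts)


def make_entry(job_id: str, severity: str, message: str) -> dict:
    """Build a hypothetical log entry with only the fields used here."""
    return {
        "severity": severity,
        "resource": {"type": "dataflow_step", "labels": {"job_id": job_id}},
        "jsonPayload": {"message": message},
    }


entries = [
    make_entry("job-a", "ERROR", "InputElement : 1 \n StackTrace : ..."),
    make_entry("job-a", "ERROR", "InputElement : 2 \n StackTrace : ..."),
    make_entry("job-b", "ERROR", "InputElement : 3 \n StackTrace : ..."),
    make_entry("job-b", "INFO", "Worker pool started"),
]
failures_per_job = count_failures_per_job(entries)
```

Without the label there would be a single count of 3 for all jobs; with it, each job gets its own series and can trigger its own alert.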
5.2 Create a notification channel
In the Notification channels menu, we create an email channel.
5.3 Create the alerting policy based on log-based metric
In this section, we will create the alerting policy based on the log-based metric created previously.
First, in the New Condition menu, we type Dataflow in the metric search field, then select log-based metrics and the metric created previously, called dataflow_jobs_intercepted_errors
In the Configure trigger menu, we select Threshold as the condition type and set the threshold value to 0.9
This means that as soon as there is one error for a Dataflow job, an alert is fired (1 is greater than 0.9).
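The threshold rule can be sketched as a simple comparison per job. This is a simplification: a real alerting policy evaluates the metric over time windows, which is ignored here, and jobs_to_alert is a hypothetical helper.

```python
from typing import Dict, List

THRESHOLD = 0.9  # value set in the Configure trigger menu


def jobs_to_alert(failures_per_job: Dict[str, int],
                  threshold: float = THRESHOLD) -> List[str]:
    """Return the jobs whose failure count is above the threshold."""
    return sorted(job for job, count in failures_per_job.items()
                  if count > threshold)


alerts = jobs_to_alert({"job-a": 1, "job-b": 0, "job-c": 3})
```

A job with no failure stays below 0.9 and is ignored, while a single failure is enough to fire an alert for that job.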
The last menu to configure is Alert detail, where we set the notification channel created previously, which is an email address.
For each alert, an email will be sent to the specified address.
It’s worth noting that we can specify multiple notification channels (several emails, Slack…).
6. Alerting email example
Here you can see an example email.
Two links are proposed in the email :
- VIEW INCIDENT links to the current incident in Google Cloud
- VIEW LOGS links to the error logs in Cloud Logging
Example of error logs for the Dataflow job in Cloud Logging
You can see :
- The Cloud Logging Logs Explorer page
- The filter in the Query menu
- On the left side, the log fields : RESOURCE TYPE, SEVERITY, JOB_NAME, REGION…
- The query result containing the error logs, which corresponds to the InputElement and StackTrace pattern written by the Beam/Dataflow job
7. Display failures in a Datastudio dashboard
In this section, we will create a Datastudio dashboard responsible for displaying failures.
This dashboard proposes the following filters :
- Date range on dwhCreationDate
- Dropdown list on
field - Dropdown list on
field - Advanced filter on
(equals, contains..) - Advanced filter on
The rest of the page contains :
- The result failures data in a table
- Pie chart in percentage per
- Pie chart in percentage per
- Bar chart with a record count per
I will give a detailed explanation of this dashboard in a separate article.
I share resources that could be helpful for some components used in this article :
Asgarde Java :
Asgarde Python :
Talk at Beam Summit on Asgarde :
If you like the Asgarde project, support us with a Github star ✩