Dead letter queue for errors with Beam, Asgarde, Dataflow and alerting in real time

Mazlum Tosun
Google Cloud - Community
11 min read · Dec 19, 2022

The goal of this article is to show a use case: a Beam pipeline containing a dead letter queue for errors, implemented with the Asgarde library.

For more details about the Asgarde library, you can check this article:

1. Explanation of the use case presented in this article

A dead letter queue is a way of catching errors in data pipelines without interrupting the job. This technique is interesting because errors are not lost and can be sunk to the storage service that best fits the error handling strategy:

  • Errors can be sent to a PubSub topic to be reloaded
  • Errors can be stored in a BigQuery table, allowing developer teams or partners to analyse them and display them in a dashboard
  • Errors can be written to a Cloud Storage file to be reloaded later in batch mode

In this article, storage in a BigQuery table was chosen.

The Beam job contains two sinks:

  • Good outputs written in a BigQuery table
  • Failures written in a BigQuery table and also in Cloud Logging

Failures will be used in the following ways:

  • An email alert is sent in real time when failures are written in Cloud Logging
  • A Datastudio dashboard will display the failures for a date range

Here is the use case diagram of this article:

2. Dead letter queue with Beam native

In this section, we show how to apply a dead letter queue and error handling with native Beam.

2.1 Example with Beam Java

To apply a dead letter queue and error handling, Beam suggests handling side outputs with:

  • TupleTags in a DoFn class
  • The built-in components MapElements and FlatMapElements with the exceptionsInto and exceptionsVia methods, for example:
@Override
public Result<PCollection<TeamStats>, Failure> expand(PCollection<TeamStatsRaw> input) {
    Result<PCollection<TeamStatsRaw>, Failure> res1 = input.apply("Validate fields", MapElements
            .into(of(TeamStatsRaw.class))
            .via(TeamStatsRaw::validateFields)
            .exceptionsInto(of(Failure.class))
            .exceptionsVia(exElt -> Failure.from("Validate fields", exElt)));

    PCollection<TeamStatsRaw> output1 = res1.output();
    PCollection<Failure> failure1 = res1.failures();

    Result<PCollection<TeamStats>, Failure> res2 = output1.apply("Compute team stats", MapElements
            .into(of(TeamStats.class))
            .via(TeamStats::computeTeamStats)
            .exceptionsInto(of(Failure.class))
            .exceptionsVia(exElt -> Failure.from("Compute team stats", exElt)));

    PCollection<TeamStats> output2 = res2.output();
    PCollection<Failure> failure2 = res2.failures();

    final TransformToTeamStatsWithSloganFn toStatsWithSloganFn = new TransformToTeamStatsWithSloganFn(
            "Add team slogan",
            slogansSideInput
    );
    final PCollectionTuple res3 = output2.apply(name,
            ParDo.of(toStatsWithSloganFn)
                    .withOutputTags(toStatsWithSloganFn.getOutputTag(), TupleTagList.of(toStatsWithSloganFn.getFailuresTag()))
                    .withSideInput("slogans", slogansSideInput));

    PCollection<TeamStats> output3 = res3.get(toStatsWithSloganFn.getOutputTag());
    PCollection<Failure> failure3 = res3.get(toStatsWithSloganFn.getFailuresTag());

    PCollection<Failure> allFailures = PCollectionList
            .of(failure1)
            .and(failure2)
            .and(failure3)
            .apply(Flatten.pCollections());

    return Result.of(output3, allFailures);
}
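For reference, here is a minimal sketch of the TupleTags pattern inside a DoFn (the class name and the Failure factory are illustrative; the real TransformToTeamStatsWithSloganFn lives in the repository and also handles the slogans side input):

public class TransformWithFailuresFn extends DoFn<TeamStats, TeamStats> {

    // One tag for the good outputs, one tag for the failures.
    private final TupleTag<TeamStats> outputTag = new TupleTag<TeamStats>() {};
    private final TupleTag<Failure> failuresTag = new TupleTag<Failure>() {};

    @ProcessElement
    public void processElement(ProcessContext ctx) {
        try {
            // Business logic here, e.g. adding the slogan from a side input.
            ctx.output(outputTag, ctx.element());
        } catch (Throwable throwable) {
            // The Failure construction is illustrative; the repository exposes similar factories.
            ctx.output(failuresTag, Failure.from("Add team slogan", throwable));
        }
    }

    public TupleTag<TeamStats> getOutputTag() {
        return outputTag;
    }

    public TupleTag<Failure> getFailuresTag() {
        return failuresTag;
    }
}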

The problem with this approach is that the code is verbose and we have to repeat technical error-handling code at every step.

Link to the GitHub repository:

2.2 Example with Beam Python

In the Python SDK, the principle is the same: we have to use tagged outputs with a DoFn class, for example:

def expand(self, inputs: PCollection[TeamStatsRaw]) -> \
        Tuple[PCollection[TeamStats], PCollection[Failure]]:
    outputs_map1, failures_map1 = (
            inputs | VALIDATE_FIELDS >> ParDo(TeamStatsRawFieldsValidationFn(VALIDATE_FIELDS))
            .with_outputs(FAILURES, main='outputs')
    )

    outputs_map2, failures_map2 = (
            outputs_map1 | COMPUTE_TEAM_STATS >> ParDo(TeamStatsMapperFn(COMPUTE_TEAM_STATS))
            .with_outputs(FAILURES, main='outputs')
    )

    final_outputs, failures_map3 = (
            outputs_map2 | ADD_SLOGAN_TEAM_STATS >>
            ParDo(
                TeamStatsWithSloganFn(ADD_SLOGAN_TEAM_STATS),
                AsSingleton(self.slogans_side_inputs)
            )
            .with_outputs(FAILURES, main='outputs')
    )

    result_all_failures = (
            (failures_map1, failures_map2, failures_map3)
            | 'All Failures PCollections' >> beam.Flatten()
    )

    return final_outputs, result_all_failures

As with the Java version, the problem is the same: we have to repeat the technical error-handling code.

Link to the GitHub repository:

3. Dead letter queue with Asgarde library

In this section, we show how to apply a dead letter queue and error handling with Beam and the Asgarde library.

3.1 Example with Asgarde Java

Asgarde Java provides a CollectionComposer class that simplifies error handling with less, and more expressive, code:

@Override
public Result<PCollection<TeamStats>, Failure> expand(PCollection<TeamStatsRaw> input) {
    return CollectionComposer.of(input)
            .apply("Validate fields", MapElements.into(of(TeamStatsRaw.class)).via(TeamStatsRaw::validateFields))
            .apply("Compute team stats", MapElementFn
                    .into(of(TeamStats.class))
                    .via(TeamStats::computeTeamStats)
                    .withStartBundleAction(() -> LOGGER.info("####################Start bundle compute stats")))
            .apply("Add team slogan", MapProcessContextFn
                    .from(TeamStats.class)
                    .into(of(TeamStats.class))
                    .via(c -> addSloganToStats(c, slogansSideInput))
                    .withSetupAction(() -> LOGGER.info("####################Start add slogan")),
                    Collections.singletonList(slogansSideInput))
            .getResult();
}
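The CollectionComposer accumulates the failures of all the steps for us. As an illustration, the Result returned by getResult() can then be unwrapped and routed to the sinks described in section 4 (the transform variable names here are assumptions):

// Sketch: unwrap the Result and route good outputs and failures to their sinks.
Result<PCollection<TeamStats>, Failure> result =
        input.apply("Team stats transform", teamStatsTransform);

result.output().apply("Write stats to BigQuery", teamStatsBigQueryWrite);
result.failures().apply("Write failures to BigQuery", failureBigQueryWrite);
result.failures().apply("Log failures", failureCloudLoggingWrite);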

Link to the GitHub repository:

3.2 Example with Asgarde Kotlin

Asgarde also provides Kotlin extensions with a more functional programming style:

override fun expand(input: PCollection<TeamStatsRaw>): Result<PCollection<TeamStats>, Failure> {
    return CollectionComposer.of(input)
        .map("Validate fields") { it.validateFields() }
        .mapFn(
            name = "Compute team stats",
            startBundleAction = { LOGGER.info("####################Start bundle compute stats") },
            transform = { TeamStats.computeTeamStats(it) })
        .mapFnWithContext<TeamStats, TeamStats>(
            name = "Add team slogan",
            setupAction = { LOGGER.info("####################Start add slogan") },
            sideInputs = listOf(slogansSideInput),
            transform = { addSloganToStats(it, slogansSideInput) }
        )
        .result
}

Link to the GitHub repository:

3.3 Example with Asgarde Python

Asgarde Python, like the Java library, provides a CollectionComposer class to simplify error handling:

def expand(self, inputs: PCollection[TeamStatsRaw]) -> \
        Tuple[PCollection[TeamStats], PCollection[Failure]]:
    result = (CollectionComposer.of(inputs)
              .map("Validate raw fields", lambda t_raw: t_raw.validate_fields())
              .map("Compute team stats", TeamStats.compute_team_stats)
              .map("Add slogan to team stats",
                   self.add_slogan_to_stats,
                   slogans=AsSingleton(self.slogans_side_inputs),
                   setup_action=lambda: '######### Start add slogan to stats actions')
              )

    return result.outputs, result.failures

Link to the GitHub repository:

4. IO of Beam pipeline for failures

In this example, the pipeline outputs are:

  • Good outputs written in a BigQuery table
  • Failures caught with Beam/Asgarde, written in a BigQuery table for analytics and display in a dashboard
  • Failures written in Cloud Logging with a specific log pattern containing the input element and the stack trace

The schema of the BigQuery job_failure table is:

To be cost effective, we added a DAY partition on the dwhCreationDate field.

We also added clustering on the featureName, jobName, componentType and pipelineStep fields to increase query performance when users filter on them with a WHERE clause.
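The table is created beforehand in this article (the pipeline below uses CREATE_NEVER), but as a sketch, the same partitioning and clustering could be declared directly in BigQueryIO if Beam were responsible for creating the table (the dataset and table names are illustrative):

import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.TimePartitioning;

import java.util.Arrays;

// Sketch only: with CREATE_IF_NEEDED, a schema must also be supplied via withSchema(...).
BigQueryIO.Write<Failure> write = BigQueryIO.<Failure>write()
        .to("monitoring.job_failure")  // illustrative dataset.table
        .withTimePartitioning(new TimePartitioning().setType("DAY").setField("dwhCreationDate"))
        .withClustering(new Clustering().setFields(
                Arrays.asList("featureName", "jobName", "componentType", "pipelineStep")))
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);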

The JSON schema:

[
  {
    "name": "featureName",
    "type": "STRING",
    "mode": "NULLABLE",
    "description": "Feature name concerned by error"
  },
  {
    "name": "jobName",
    "type": "STRING",
    "mode": "NULLABLE",
    "description": "Job name concerned by error"
  },
  {
    "name": "pipelineStep",
    "type": "STRING",
    "mode": "NULLABLE",
    "description": "Pipeline step concerned by error"
  },
  {
    "name": "inputElement",
    "type": "STRING",
    "mode": "NULLABLE",
    "description": "Input element when error occurred"
  },
  {
    "name": "exceptionType",
    "type": "STRING",
    "mode": "NULLABLE",
    "description": "Exception type of error"
  },
  {
    "name": "stackTrace",
    "type": "STRING",
    "mode": "NULLABLE",
    "description": "Stack trace of error"
  },
  {
    "name": "componentType",
    "type": "STRING",
    "mode": "NULLABLE",
    "description": "GCP Component type (Dataflow, Composer....)"
  },
  {
    "name": "dwhCreationDate",
    "type": "TIMESTAMP",
    "mode": "NULLABLE",
    "description": "Creation date of error"
  }
]

4.1 Code of BigQuery IO in Beam Java

public PTransform<PCollection<Failure>, ? extends POutput> write() {
    return BigQueryIO.<Failure>write()
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .to(failureConf.getOutputDataset() + "." + failureConf.getOutputTable())
            .withFormatFunction(failure -> toFailureTableRow(failureConf, failure))
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
}

private static TableRow toFailureTableRow(FailureConf failureConf, Failure failure) {
    val creationDate = Instant.now().toString();

    return new TableRow()
            .set(FEATURE_NAME.getValue(), failureConf.getFeatureName())
            .set(PIPELINE_STEP.getValue(), failure.getPipelineStep())
            .set(JOB_NAME.getValue(), failureConf.getJobName())
            .set(INPUT_ELEMENT.getValue(), failure.getInputElement())
            .set(EXCEPTION_TYPE.getValue(), failure.getException().getClass().getSimpleName())
            .set(STACK_TRACE.getValue(), ExceptionUtils.getStackTrace(failure.getException()))
            .set(COMPONENT_TYPE.getValue(), COMPONENT_TYPE_VALUE)
            .set(DWH_CREATION_DATE.getValue(), creationDate);
}
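As an illustration, this transform is then applied to the failures PCollection returned by Asgarde (the variable names here are assumptions):

// Hypothetical wiring: sink all the caught failures to BigQuery.
allFailures.apply("Write failures to BigQuery", failureBigQueryWriteTransform.write());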

Link to the GitHub repository:

4.2 Code of Cloud Logging IO in Beam Java

public PDone expand(PCollection<Failure> input) {
    input.apply(STEP_MAP_FAILURE_JSON_STRING, new LogTransform());
    return PDone.in(input.getPipeline());
}

static class LogTransform extends PTransform<PCollection<Failure>, PCollection<String>> {
    public PCollection<String> expand(PCollection<Failure> input) {
        return input
                .apply(STEP_MAP_FAILURE_JSON_STRING, MapElements
                        .into(strings())
                        .via(FailureCloudLoggingWriteTransform::toFailureLogInfo))
                .apply(STEP_LOG_FAILURE, MapElements.into(strings()).via(this::logFailure));
    }

    private String logFailure(String failureAsString) {
        LOGGER.error(failureAsString);
        return failureAsString;
    }
}

private static String toFailureLogInfo(Failure failure) {
    val inputElementInfo = "InputElement : " + failure.getInputElement();
    val stackTraceInfo = "StackTrace : " + ExceptionUtils.getStackTrace(failure.getException());

    return inputElementInfo + "\n" + stackTraceInfo;
}

Link to the GitHub repository:

4.3 Code of BigQuery IO in Beam Python

def expand(self, inputs_failures: PCollection[Failure]):
    return (inputs_failures
            | 'Map to failure table fields' >> beam.Map(self.to_failure_table_fields)
            | "Sink failures to Bigquery" >> beam.io.WriteToBigQuery(
                    project=self.pipeline_conf.project_id,
                    dataset=self.pipeline_conf.failure_output_dataset,
                    table=self.pipeline_conf.failure_output_table,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
            )

def to_failure_table_fields(self, failure: Failure):
    return {
        FailureTableFields.FEATURE_NAME.value: self.pipeline_conf.failure_feature_name,
        FailureTableFields.JOB_NAME.value: self.pipeline_conf.job_type,
        FailureTableFields.PIPELINE_STEP.value: failure.pipeline_step,
        FailureTableFields.INPUT_ELEMENT.value: failure.input_element,
        FailureTableFields.EXCEPTION_TYPE.value: type(failure.exception).__name__,
        FailureTableFields.STACK_TRACE.value: get_failure_error(failure),
        FailureTableFields.COMPONENT_TYPE.value: 'DATAFLOW',
        FailureTableFields.DWH_CREATION_DATE.value: datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
    }

Link to the GitHub repository:

4.4 Code of Cloud Logging IO in Beam Python

def expand(self, inputs_failures: PCollection[Failure]):
    return (inputs_failures
            | 'Map to failure log info' >> beam.Map(self.to_failure_log_info)
            | "Logs Failure to cloud logging" >> beam.Map(self.log_failure))

def to_failure_log_info(self, failure: Failure):
    input_element_info = f'InputElement : {failure.input_element}'
    stack_trace_info = f'StackTrace : {get_failure_error(failure)}'

    return f'{input_element_info} \n {stack_trace_info}'

def log_failure(self, failure_log_info: str):
    logging.error(failure_log_info)

    return failure_log_info
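Note that neither the Java nor the Python sink uses a dedicated Cloud Logging client: on Dataflow, everything the workers write through the standard logger is automatically forwarded to Cloud Logging, which is what makes the log-based alerting in the next section possible.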

Link to the GitHub repository:

5. Create an alerting policy in Google Cloud on Beam failures

After writing failures in Cloud Logging, we want to send an email alert in real time. Here are the steps in Google Cloud to achieve this:

  • Create a log-based metric that filters on the failure pattern written by the Dataflow job
  • Create a notification channel; email was chosen for this example
  • Create an alerting policy based on the log-based metric and the notification channel created previously

5.1 Create a log-based metric

We will first create a log-based metric on Dataflow job errors.

Some explanations about the log format in Cloud Logging

An entry in Cloud Logging is represented by the LogEntry object:

Here is the JSON representation of the LogEntry object:

{
  "logName": string,
  "resource": {
    object (MonitoredResource)
  },
  "timestamp": string,
  "receiveTimestamp": string,
  "severity": enum (LogSeverity),
  "insertId": string,
  "httpRequest": {
    object (HttpRequest)
  },
  "labels": {
    string: string,
    ...
  },
  "metadata": {
    object (MonitoredResourceMetadata)
  },
  "operation": {
    object (LogEntryOperation)
  },
  "trace": string,
  "spanId": string,
  "traceSampled": boolean,
  "sourceLocation": {
    object (LogEntrySourceLocation)
  },
  "split": {
    object (LogSplit)
  },
  // Union field payload can be only one of the following:
  "protoPayload": {
    "@type": string,
    field1: ...,
    ...
  },
  "textPayload": string,
  "jsonPayload": {
    object
  }
  // End of list of possible types for union field payload.
}

We can add filters on the LogEntry fields.

Creation of the Log-based Metric

In the Google Cloud console, we access the log-based metrics menu and first fill in the metric type, metric name and description:

Metric type

We chose the Counter metric type because every time a Dataflow job writes an error to the logs with the specific pattern, we want this metric to be incremented.

The metric name in this example is dataflow_jobs_intercepted_errors.

Filter

Then we fill in the filter and label:

The format used in the filter is the standard Cloud Logging query language, the same one used in the Logs Explorer in Google Cloud.

resource.type=dataflow_step targets only Dataflow jobs and services.

severity=ERROR restricts the filter to errors in the logs.

Our Dataflow jobs write errors in Cloud Logging with a specific pattern containing the InputElement and StackTrace keywords; that's why the filter adds a criterion based on jsonPayload.message:

jsonPayload.message =~ InputElement.*

  • The =~ is the regular-expression match operator
  • The .* matches any sequence of characters
  • This criterion matches the log messages containing InputElement
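Putting the pieces together, the full filter should look like this (reconstructed from the criteria above; the exact filter is the one entered in the console):

resource.type=dataflow_step
severity=ERROR
jsonPayload.message=~"InputElement.*"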

Labels

Labels allow log-based metrics to contain multiple time series.

In this case, we add the resource.label.job_id label because we want to apply the filter per Dataflow job and not across all jobs together.

5.2 Create a notification channel

In the Notification channels menu, we create an email channel.

5.3 Create the alerting policy based on the log-based metric

In this section, we will create the alerting policy based on the log-based metric created previously.

First, in the New condition menu, we type Dataflow in the metric search field, then select Log-based metrics and the metric created previously, dataflow_jobs_intercepted_errors.

In the Configure trigger menu, we select Threshold as the condition type, with a threshold value of 0.9.

This means that as soon as a Dataflow job logs a single error, an alert is fired (1 is greater than 0.9).

The last menu to configure is Alert details, where we set the notification channel created previously, i.e. the email address.

For each alert, an email will be sent to the specified address.

It’s worth noting that we can specify multiple notification channels (several emails, Slack…).

6. Alerting email example

Here you can see an example email.

Two links are provided in the email:

  • VIEW INCIDENT links to the current incident in Google Cloud
  • VIEW LOGS links to the error logs in Cloud Logging

Example of error logs for the Dataflow job in Cloud Logging:

You can see:

  • The Cloud Logging Logs Explorer page
  • The filter in the Query menu
  • On the left side, the log fields: RESOURCE TYPE, SEVERITY, JOB_NAME, REGION
  • The query results containing the error logs; they correspond to the InputElement and StackTrace pattern specified by the Beam/Dataflow job

7. Display failures in a Datastudio dashboard

In this section, we will create a Datastudio dashboard responsible for displaying the failures.

This dashboard offers the following filters:

  • A date range on dwhCreationDate
  • A dropdown list on the featureName field
  • A dropdown list on the jobName field
  • An advanced filter on InputElement (equals, contains…)
  • An advanced filter on StackTrace

The rest of the page contains:

  • The resulting failure data in a table
  • A pie chart with the percentage per exceptionType
  • A pie chart with the percentage per jobName
  • A bar chart with the record count per dwhCreationDate and jobName

I will give a deeper explanation of this dashboard in a separate article.

Here are some resources that may be helpful for the components used in this article:

Asgarde Java:

Asgarde Python:

Talk at Beam Summit on Asgarde:

If you like the Asgarde project, support us with a GitHub star ✩

If you like my articles and want to see my posts, follow me on:

- Medium
- Twitter
- LinkedIn
