Dead letter queue for errors with Beam, Asgarde, Dataflow and alerting in real time
The goal of this article is to show a use case with a Beam pipeline containing a dead letter queue for errors, applied with the Asgarde library.
For more details about the Asgarde library, you can check this article:
1. Explanation of the use case presented in this article
A dead letter queue is a way of catching errors in data pipelines without interrupting the job. This technique is interesting because errors are not lost and can be sunk to the storage service that best fits the error handling strategy:
- Errors can be sent to a PubSub topic to be reloaded
- Errors can be stored in a BigQuery table so that developer teams or partners can analyse the errors and display them in a dashboard
- Errors can be written to a Cloud Storage file to be reloaded later in batch mode
In this article, storage in a BigQuery table was chosen.
The Beam job contains two sinks:
- Good outputs written to a BigQuery table
- Failures written to a BigQuery table and also to Cloud Logging
Failures will be exploited in the following way:
- An email alert is sent in real time when failures are written to Cloud Logging
- A Data Studio dashboard will display the failures for a date range
Here you can see the use case diagram for this article:
2. Dead letter queue with native Beam
In this section, we will show how to apply a dead letter queue and error handling with native Beam.
2.1 Example with Beam Java
To apply a dead letter queue and error handling, Beam suggests handling side outputs with:
- TupleTags in a DoFn class
- Built-in components MapElements and FlatMapElements with the exceptionsInto and exceptionsVia methods, for example:
@Override
public Result<PCollection<TeamStats>, Failure> expand(PCollection<TeamStatsRaw> input) {
Result<PCollection<TeamStatsRaw>, Failure> res1 = input.apply("Validate fields", MapElements
.into(of(TeamStatsRaw.class))
.via(TeamStatsRaw::validateFields)
.exceptionsInto(of(Failure.class))
.exceptionsVia(exElt -> Failure.from("Validate fields", exElt)));
PCollection<TeamStatsRaw> output1 = res1.output();
PCollection<Failure> failure1 = res1.failures();
Result<PCollection<TeamStats>, Failure> res2 = output1.apply("Compute team stats", MapElements
.into(of(TeamStats.class))
.via(TeamStats::computeTeamStats)
.exceptionsInto(of(Failure.class))
.exceptionsVia(exElt -> Failure.from("Compute team stats", exElt)));
PCollection<TeamStats> output2 = res2.output();
PCollection<Failure> failure2 = res2.failures();
final TransformToTeamStatsWithSloganFn toStatsWithSloganFn = new TransformToTeamStatsWithSloganFn(
"Add team slogan",
slogansSideInput
);
final PCollectionTuple res3 = output2.apply(name,
ParDo.of(toStatsWithSloganFn)
.withOutputTags(toStatsWithSloganFn.getOutputTag(), TupleTagList.of(toStatsWithSloganFn.getFailuresTag()))
.withSideInput("slogans", slogansSideInput));
PCollection<TeamStats> output3 = res3.get(toStatsWithSloganFn.getOutputTag());
PCollection<Failure> failure3 = res3.get(toStatsWithSloganFn.getFailuresTag());
PCollection<Failure> allFailures = PCollectionList
.of(failure1)
.and(failure2)
.and(failure3)
.apply(Flatten.pCollections());
return Result.of(output3, allFailures);
}
The problem with this approach is that the code is verbose and we have to repeat technical error-handling code.
Link to the GitHub repository:
2.2 Example with Beam Python
In the Python SDK, the principle is the same: we use tagged outputs (the equivalent of TupleTag) in a DoFn class, for example:
def expand(self, inputs: PCollection[TeamStatsRaw]) -> \
Tuple[PCollection[TeamStats], PCollection[Failure]]:
# When.
outputs_map1, failures_map1 = (
inputs | VALIDATE_FIELDS >> ParDo(TeamStatsRawFieldsValidationFn(VALIDATE_FIELDS))
.with_outputs(FAILURES, main='outputs')
)
outputs_map2, failures_map2 = (
outputs_map1 | COMPUTE_TEAM_STATS >> ParDo(TeamStatsMapperFn(COMPUTE_TEAM_STATS))
.with_outputs(FAILURES, main='outputs')
)
final_outputs, failures_map3 = (
outputs_map2 | ADD_SLOGAN_TEAM_STATS >>
ParDo(
TeamStatsWithSloganFn(ADD_SLOGAN_TEAM_STATS),
AsSingleton(self.slogans_side_inputs)
)
.with_outputs(FAILURES, main='outputs')
)
result_all_failures = (
(failures_map1, failures_map2, failures_map3)
| 'All Failures PCollections' >> beam.Flatten()
)
return final_outputs, result_all_failures
As for the Java part, the problem is the same: we have to repeat the technical error-handling code.
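For reference, here is a minimal sketch (not necessarily the repository's actual implementation) of what a DoFn such as TeamStatsRawFieldsValidationFn could look like: valid elements go to the main output and a Failure object goes to a tagged output. The Failure fields follow those used later in section 4; the exact constructor is an assumption:
from dataclasses import dataclass

import apache_beam as beam
from apache_beam import pvalue

FAILURES = 'failures'


@dataclass
class Failure:
    # Simplified stand-in for the article's Failure model; the real class lives in the repository.
    pipeline_step: str
    input_element: str
    exception: Exception


class TeamStatsRawFieldsValidationFn(beam.DoFn):
    """Validates a raw element and routes errors to the FAILURES tagged output."""

    def __init__(self, step_name: str):
        super().__init__()
        self.step_name = step_name

    def process(self, element):
        try:
            # Main output: the validated element (validate_fields comes from the article's model).
            yield element.validate_fields()
        except Exception as err:
            # Side output: retrieved by the caller via with_outputs(FAILURES, main='outputs').
            yield pvalue.TaggedOutput(
                FAILURES,
                Failure(pipeline_step=self.step_name,
                        input_element=str(element),
                        exception=err))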
Link to the GitHub repository:
3. Dead letter queue with Asgarde library
In this section, we will show how to apply a dead letter queue and error handling with Beam and the Asgarde library.
3.1 Example with Asgarde Java
Asgarde Java proposes a CollectionComposer class to simplify error handling with less and more expressive code:
@Override
public Result<PCollection<TeamStats>, Failure> expand(PCollection<TeamStatsRaw> input) {
return CollectionComposer.of(input)
.apply("Validate fields", MapElements.into(of(TeamStatsRaw.class)).via(TeamStatsRaw::validateFields))
.apply("Compute team stats", MapElementFn
.into(of(TeamStats.class))
.via(TeamStats::computeTeamStats)
.withStartBundleAction(() -> LOGGER.info("####################Start bundle compute stats")))
.apply("Add team slogan", MapProcessContextFn
.from(TeamStats.class)
.into(of(TeamStats.class))
.via(c -> addSloganToStats(c, slogansSideInput))
.withSetupAction(() -> LOGGER.info("####################Start add slogan")),
Collections.singletonList(slogansSideInput))
.getResult();
}
Link to the GitHub repository:
3.2 Example with Asgarde Kotlin
Asgarde also proposes extensions for Kotlin with a more functional programming style:
override fun expand(input: PCollection<TeamStatsRaw>): Result<PCollection<TeamStats>, Failure> {
return CollectionComposer.of(input)
.map("Validate fields") { it.validateFields() }
.mapFn(
name = "Compute team stats",
startBundleAction = { LOGGER.info("####################Start bundle compute stats") },
transform = { TeamStats.computeTeamStats(it) })
.mapFnWithContext<TeamStats, TeamStats>(
name = "Add team slogan",
setupAction = { LOGGER.info("####################Start add slogan") },
sideInputs = listOf(slogansSideInput),
transform = { addSloganToStats(it, slogansSideInput) }
)
.result
}
Link to the GitHub repository:
3.3 Example with Asgarde Python
Asgarde Python, like the Java library, proposes a CollectionComposer class to simplify error handling:
def expand(self, inputs: PCollection[TeamStatsRaw]) -> \
Tuple[PCollection[TeamStats], PCollection[Failure]]:
result = (CollectionComposer.of(inputs)
.map("Validate raw fields", lambda t_raw: t_raw.validate_fields())
.map("Compute team stats", TeamStats.compute_team_stats)
.map("Add slogan to team stats",
self.add_slogan_to_stats,
slogans=AsSingleton(self.slogans_side_inputs),
setup_action=lambda: '######### Start add slogan to stats actions')
)
return result.outputs, result.failures
Link to the GitHub repository:
4. IO of Beam pipeline for failures
In this example, the pipeline outputs are (a wiring sketch follows this list):
- Good outputs written to a BigQuery table
- Failures caught with Beam/Asgarde, written to a BigQuery table for analytics and exposure in a dashboard
- Failures written to Cloud Logging with a specific log pattern containing the input element and the stack trace
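To make the flow concrete, here is a minimal sketch of how the two PCollections could be wired to these sinks. The transform names are hypothetical placeholders for the composite of section 3.3 and the IO transforms of sections 4.3 and 4.4, not their exact names:
import apache_beam as beam

# Hypothetical wiring: ReadTeamStatsRaw, TeamStatsTransform, TeamStatsBigQueryWrite,
# FailureBigQueryWrite and FailureCloudLoggingWrite are placeholders for the
# transforms described in this article.
def run(pipeline_options, pipeline_conf, slogans_side_inputs):
    with beam.Pipeline(options=pipeline_options) as p:
        raw_stats = p | 'Read raw team stats' >> ReadTeamStatsRaw(pipeline_conf)

        # The Asgarde-based composite returns the pair (outputs, failures), as in section 3.3.
        team_stats, failures = raw_stats | 'Team stats transform' >> TeamStatsTransform(slogans_side_inputs)

        # Sink 1: good outputs to a BigQuery table.
        team_stats | 'Write team stats to BigQuery' >> TeamStatsBigQueryWrite(pipeline_conf)

        # Sink 2: failures to BigQuery (dashboard) and to Cloud Logging (real-time alerting).
        failures | 'Write failures to BigQuery' >> FailureBigQueryWrite(pipeline_conf)
        failures | 'Log failures to Cloud Logging' >> FailureCloudLoggingWrite()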
The schema of the BigQuery job_failure table is:
To be cost-effective, we added a partition per DAY on the dwhCreationDate field.
We also added clustering on the featureName, jobName, componentType and pipelineStep fields to increase query performance when users filter on them with a WHERE clause.
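The table could also be created programmatically; here is a minimal sketch assuming the google-cloud-bigquery Python client, a hypothetical project and dataset, and a job_failure.json file containing the JSON schema shown just below:
from google.cloud import bigquery

client = bigquery.Client(project='my-project')  # hypothetical project id

# Load the JSON schema shown below from a file (hypothetical path).
schema = client.schema_from_json('schema/job_failure.json')

table = bigquery.Table('my-project.monitoring.job_failure', schema=schema)  # hypothetical dataset

# Partition per DAY on dwhCreationDate to be cost-effective.
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field='dwhCreationDate',
)

# Clustering to speed up WHERE clauses on these fields.
table.clustering_fields = ['featureName', 'jobName', 'componentType', 'pipelineStep']

client.create_table(table)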
The JSON schema:
[
{
"name": "featureName",
"type": "STRING",
"mode": "NULLABLE",
"description": "Feature name concerned by error"
},
{
"name": "jobName",
"type": "STRING",
"mode": "NULLABLE",
"description": "Job name concerned by error"
},
{
"name": "pipelineStep",
"type": "STRING",
"mode": "NULLABLE",
"description": "Pipeline step concerned by error"
},
{
"name": "inputElement",
"type": "STRING",
"mode": "NULLABLE",
"description": "Input element when error occurred"
},
{
"name": "exceptionType",
"type": "STRING",
"mode": "NULLABLE",
"description": "Exception type of error"
},
{
"name": "stackTrace",
"type": "STRING",
"mode": "NULLABLE",
"description": "Stack trace of error"
},
{
"name": "componentType",
"type": "STRING",
"mode": "NULLABLE",
"description": "GCP Component type (Dataflow, Composer....)"
},
{
"name": "dwhCreationDate",
"type": "TIMESTAMP",
"mode": "NULLABLE",
"description": "Creation date of error"
}
]
4.1 Code of BigQuery IO in Beam Java
public PTransform<PCollection<Failure>, ? extends POutput> write() {
return BigQueryIO.<Failure>write()
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.to(failureConf.getOutputDataset() + "." + failureConf.getOutputTable())
.withFormatFunction(failure -> toFailureTableRow(failureConf, failure))
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
}
private static TableRow toFailureTableRow(FailureConf failureConf, Failure failure) {
val creationDate = Instant.now().toString();
return new TableRow()
.set(FEATURE_NAME.getValue(), failureConf.getFeatureName())
.set(PIPELINE_STEP.getValue(), failure.getPipelineStep())
.set(JOB_NAME.getValue(), failureConf.getJobName())
.set(INPUT_ELEMENT.getValue(), failure.getInputElement())
.set(EXCEPTION_TYPE.getValue(), failure.getException().getClass().getSimpleName())
.set(STACK_TRACE.getValue(), ExceptionUtils.getStackTrace(failure.getException()))
.set(COMPONENT_TYPE.getValue(), COMPONENT_TYPE_VALUE)
.set(DWH_CREATION_DATE.getValue(), creationDate);
}
Link to the GitHub repository:
4.2 Code of Cloud Logging IO in Beam Java
public PDone expand(PCollection<Failure> input) {
input.apply(STEP_MAP_FAILURE_JSON_STRING, new LogTransform());
return PDone.in(input.getPipeline());
}
static class LogTransform extends PTransform<PCollection<Failure>, PCollection<String>> {
public PCollection<String> expand(PCollection<Failure> input) {
return input
.apply(STEP_MAP_FAILURE_JSON_STRING, MapElements
.into(strings())
.via(FailureCloudLoggingWriteTransform::toFailureLogInfo))
.apply(STEP_LOG_FAILURE, MapElements.into(strings()).via(this::logFailure));
}
private String logFailure(String failureAsString) {
LOGGER.error(failureAsString);
return failureAsString;
}
}
private static String toFailureLogInfo(Failure failure) {
val inputElementInfo = "InputElement : " + failure.getInputElement();
val stackTraceInfo = "StackTrace : " + ExceptionUtils.getStackTrace(failure.getException());
return inputElementInfo + "\n" + stackTraceInfo;
}
Link to the GitHub repository:
4.3 Code of BigQuery IO in Beam Python
def expand(self, inputs_failures: PCollection[Failure]):
return (inputs_failures
| 'Map to failure table fields' >> beam.Map(self.to_failure_table_fields)
| "Sink failures to Bigquery" >> beam.io.WriteToBigQuery(
project=self.pipeline_conf.project_id,
dataset=self.pipeline_conf.failure_output_dataset,
table=self.pipeline_conf.failure_output_table,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
def to_failure_table_fields(self, failure: Failure):
return {
FailureTableFields.FEATURE_NAME.value: self.pipeline_conf.failure_feature_name,
FailureTableFields.JOB_NAME.value: self.pipeline_conf.job_type,
FailureTableFields.PIPELINE_STEP.value: failure.pipeline_step,
FailureTableFields.INPUT_ELEMENT.value: failure.input_element,
FailureTableFields.EXCEPTION_TYPE.value: type(failure.exception).__name__,
FailureTableFields.STACK_TRACE.value: get_failure_error(failure),
FailureTableFields.COMPONENT_TYPE.value: 'DATAFLOW',
FailureTableFields.DWH_CREATION_DATE.value: datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
}
Link to the GitHub repository:
4.4 Code of Cloud Logging IO in Beam Python
def expand(self, inputs_failures: PCollection[Failure]):
return (inputs_failures
| 'Map to failure log info' >> beam.Map(self.to_failure_log_info)
| "Logs Failure to cloud logging" >> beam.Map(self.log_failure))
def to_failure_log_info(self, failure: Failure):
input_element_info = f'InputElement : {failure.input_element}'
stack_trace_info = f'StackTrace : {get_failure_error(failure)}'
return f'{input_element_info} \n {stack_trace_info}'
def log_failure(self, failure_log_info: str):
logging.error(failure_log_info)
return failure_log_info
Link to the GitHub repository:
5. Create an alerting policy in Google Cloud on Beam failures
After writing failures to Cloud Logging, we want to send an email alert in real time. Here are the steps in Google Cloud to achieve this:
- Create a log-based metric that filters the failure pattern written by the Dataflow job
- Create a notification channel; email was chosen for this example
- Create an alerting policy based on the log-based metric and the notification channel created previously
5.1 Create a log-based metric
We will first create a log-based metric on Dataflow job errors.
Some explanations about the log format in Cloud Logging
An entry in Cloud Logging is represented by the LogEntry object. Here is its JSON representation:
{
"logName": string,
"resource": {
object (MonitoredResource)
},
"timestamp": string,
"receiveTimestamp": string,
"severity": enum (LogSeverity),
"insertId": string,
"httpRequest": {
object (HttpRequest)
},
"labels": {
string: string,
...
},
"metadata": {
object (MonitoredResourceMetadata)
},
"operation": {
object (LogEntryOperation)
},
"trace": string,
"spanId": string,
"traceSampled": boolean,
"sourceLocation": {
object (LogEntrySourceLocation)
},
"split": {
object (LogSplit)
},
// Union field payload can be only one of the following:
"protoPayload": {
"@type": string,
field1: ...,
...
},
"textPayload": string,
"jsonPayload": {
object
}
// End of list of possible types for union field payload.
}
We can add filters on the LogEntry fields.
Creation of the Log-based Metric
In the Google Cloud console, we access the log-based metrics menu and first fill in the metric type, metric name and description:
Metric type
We chose the counter metric type because every time Dataflow jobs write errors to the logs with a specific pattern, we want to increment this metric.
The metric name in this example is dataflow_jobs_intercepted_errors
Filter
Then we fill in the filter and labels:
The format used in the filter is the one usually used in Cloud Logging and the Logs Explorer in Google Cloud.
resource.type=dataflow_step allows targeting only Dataflow jobs and services.
severity=ERROR specifies that the filter concerns only errors in the logs.
Our Dataflow jobs write errors to Cloud Logging with a specific pattern containing the InputElement and StackTrace keywords; that's why the filter adds a criterion based on jsonPayload.message:
jsonPayload.message =~ InputElement.*
- The =~ operator means the field matches the regular expression
- The .* means any sequence of characters
- This criterion allows matching the pattern containing InputElement
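The same metric can also be created programmatically; here is a minimal sketch assuming the google-cloud-logging Python client (the project id and description are placeholders):
from google.cloud import logging

client = logging.Client(project='my-project')  # hypothetical project id

# Same filter as in the console: Dataflow errors matching the failure pattern.
log_filter = (
    'resource.type="dataflow_step" '
    'AND severity=ERROR '
    'AND jsonPayload.message=~"InputElement.*"'
)

metric = client.metric(
    'dataflow_jobs_intercepted_errors',
    filter_=log_filter,
    description='Errors intercepted by Dataflow jobs',
)

if not metric.exists():
    metric.create()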
Labels
Labels allow log-based metrics to contain multiple time series.
In this case, we add a label based on resource.label.job_id because we want to apply the filter on logs per Dataflow job and not for all jobs together.
5.2 Create a notification channel
In the Notification channels menu, we create an email channel.
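This step can also be scripted; here is a minimal sketch assuming the google-cloud-monitoring Python client (project id and email address are placeholders, and the field names follow the public client samples):
from google.cloud import monitoring_v3

client = monitoring_v3.NotificationChannelServiceClient()

project_name = 'projects/my-project'  # hypothetical project id

channel = monitoring_v3.NotificationChannel(
    type_='email',
    display_name='Dataflow failures alerting email',
    labels={'email_address': 'team-data@example.com'},  # hypothetical address
)

created_channel = client.create_notification_channel(
    name=project_name,
    notification_channel=channel,
)
# The returned name (projects/.../notificationChannels/...) is referenced by the alerting policy.
print(created_channel.name)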
5.3 Create the alerting policy based on log-based metric
In this section, we will create the alerting policy based on the log-based metric created previously.
First, in the New condition menu, we type Dataflow in the metric search field, then select log-based metrics and the metric created previously, called dataflow_jobs_intercepted_errors.
In the Configure trigger menu, we select Threshold as the condition type and set the threshold value to 0.9.
This means that if there is at least one error for a Dataflow job, an alert will be fired (1 is greater than 0.9).
The last menu to configure is Alert details, where we set the notification channel created previously, which is an email address.
For each alert, an email will be sent to the specified address.
It’s worth noting that we can specify multiple notification channels (several emails, Slack…).
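For teams preferring to script this step, here is a minimal sketch assuming the google-cloud-monitoring client; the project id, channel name, threshold filter and aggregation are assumptions to adapt to your project (the console configures an equivalent alignment automatically):
from google.cloud import monitoring_v3
from google.protobuf import duration_pb2

client = monitoring_v3.AlertPolicyServiceClient()
project_name = 'projects/my-project'  # hypothetical project id

# Condition on the log-based metric created in section 5.1.
# "logging.googleapis.com/user/" is the standard prefix for user-defined log-based metrics.
condition = monitoring_v3.AlertPolicy.Condition(
    display_name='Dataflow intercepted errors above threshold',
    condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
        filter='metric.type="logging.googleapis.com/user/dataflow_jobs_intercepted_errors"',
        comparison=monitoring_v3.ComparisonType.COMPARISON_GT,
        threshold_value=0.9,
        duration=duration_pb2.Duration(seconds=0),
        aggregations=[
            monitoring_v3.Aggregation(
                alignment_period=duration_pb2.Duration(seconds=300),
                per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_SUM,  # assumed aligner
            )
        ],
    ),
)

policy = monitoring_v3.AlertPolicy(
    display_name='Dataflow jobs intercepted errors',
    combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
    conditions=[condition],
    # Full resource name returned when creating the channel in section 5.2 (placeholder).
    notification_channels=['projects/my-project/notificationChannels/CHANNEL_ID'],
)

client.create_alert_policy(name=project_name, alert_policy=policy)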
6. Alerting email example
Here you can see an example email.
Two links are proposed in the email:
- VIEW INCIDENT links to the current incident in Google Cloud
- VIEW LOGS links to the error logs in Cloud Logging
Example of error logs for the Dataflow job in Cloud Logging:
You can see:
- The Cloud Logging Logs Explorer page
- The filter in the Query menu
- On the left side, the log fields: RESOURCE TYPE, SEVERITY, JOB_NAME, REGION…
- The query result containing the error logs; it corresponds to the InputElement and StackTrace pattern specified by the Beam/Dataflow job
7. Display failures in a Data Studio dashboard
In this section, we will create a Data Studio dashboard responsible for displaying the failures.
This dashboard proposes the following filters:
- Date range on dwhCreationDate
- Dropdown list on the featureName field
- Dropdown list on the jobName field
- Advanced filter on inputElement (equals, contains…)
- Advanced filter on stackTrace
The rest of the page contains:
- The resulting failures data in a table
- A pie chart with the percentage per exceptionType
- A pie chart with the percentage per jobName
- A bar chart with a record count per dwhCreationDate and jobName
I will give a deeper explanation of this dashboard in a separate article.
Here are some resources that could be helpful for the components used in this article:
Asgarde Java:
Asgarde Python:
Talk at Beam Summit on Asgarde:
If you like the Asgarde project, support us with a GitHub star ✩
If you like my articles and want to see my posts, follow me on :