Dead letter queue for errors with Beam, Asgarde, Dataflow and alerting in real time
The goal of this article is to show a use case with a Beam pipeline containing a dead letter queue for errors, implemented with the Asgarde library.
For more details about the Asgarde library, you can check this article:
1. Explanation of the use case presented in this article
A dead letter queue is a way of catching errors in data pipelines without interrupting the job. This technique is useful because errors are not lost and can be written to the most suitable storage service, depending on the error handling strategy:
- Errors can be sent to a PubSub topic to be reloaded
- Errors can be stored in a BigQuery table so that developer teams or partners can analyse them and display them in a dashboard
- Errors can be written to a Cloud Storage file to be reloaded later in batch mode
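Stripped of Beam, the pattern boils down to running each step inside a try/except and routing the failing element, together with its exception, to a separate output instead of crashing the job. The pure-Python sketch below only illustrates that idea; the `Failure` record and the `apply_step` helper are hypothetical names, not Beam or Asgarde APIs.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List, Tuple


@dataclass
class Failure:
    """Hypothetical error record: the step name, the offending input and the exception."""
    pipeline_step: str
    input_element: str
    exception: Exception


def apply_step(name: str,
               fn: Callable[[Any], Any],
               elements: Iterable[Any]) -> Tuple[List[Any], List[Failure]]:
    """Apply fn to each element: successes go to outputs, exceptions to the dead letter list."""
    outputs: List[Any] = []
    failures: List[Failure] = []
    for element in elements:
        try:
            outputs.append(fn(element))
        except Exception as exc:  # the job keeps running, the error is kept as data
            failures.append(Failure(name, str(element), exc))
    return outputs, failures


# Usage: "ten" cannot be converted to int, so it lands in the failures list.
good, bad = apply_step("Parse score", int, ["10", "ten", "30"])
print(good)  # [10, 30]
print(bad[0].pipeline_step, bad[0].input_element, type(bad[0].exception).__name__)  # Parse score ten ValueError
```

In a Beam pipeline, the failures list plays the role of a failure PCollection, which is then written to one of the storages listed above.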
In this article, storage in a BigQuery table was chosen.
The Beam job contains 2 sinks:
- Good outputs written to a BigQuery table
- Failures written to a BigQuery table and also to Cloud Logging
Failures are then used in the following way:
- An email alert is sent in real time when failures are written to Cloud Logging
- A Data Studio dashboard displays the failures for a date range
Here you can see the use case diagram of this article:
2. Dead letter queue with native Beam
In this section, we will show how to apply a dead letter queue and error handling with native Beam.
2.1 Example with Beam Java
To apply a dead letter queue and error handling, Beam suggests handling side outputs with:
- TupleTags in a DoFn class
- The built-in MapElements and FlatMapElements components, with their exceptionsInto and exceptionsVia methods. Example:
@Override
public Result<PCollection<TeamStats>, Failure> expand(PCollection<TeamStatsRaw> input) {
    // Step 1: validate fields; exceptions are caught and mapped to Failure objects.
    Result<PCollection<TeamStatsRaw>, Failure> res1 = input.apply("Validate fields", MapElements
            .into(of(TeamStatsRaw.class))
            .via(TeamStatsRaw::validateFields)
            .exceptionsInto(of(Failure.class))
            .exceptionsVia(exElt -> Failure.from("Validate fields", exElt)));
    PCollection<TeamStatsRaw> output1 = res1.output();
    PCollection<Failure> failure1 = res1.failures();
    // Step 2: compute team stats, repeating the same error handling code.
    Result<PCollection<TeamStats>, Failure> res2 = output1.apply("Compute team stats", MapElements
            .into(of(TeamStats.class))
            .via(TeamStats::computeTeamStats)
            .exceptionsInto(of(Failure.class))
            .exceptionsVia(exElt -> Failure.from("Compute team stats", exElt)));
    PCollection<TeamStats> output2 = res2.output();
    PCollection<Failure> failure2 = res2.failures();
    // Step 3: add the team slogan in a DoFn that emits failures to a dedicated TupleTag.
    final TransformToTeamStatsWithSloganFn toStatsWithSloganFn = new TransformToTeamStatsWithSloganFn(
            "Add team slogan",
            slogansSideInput
    );
    final PCollectionTuple res3 = output2.apply("Add team slogan",
            ParDo.of(toStatsWithSloganFn)
                    .withOutputTags(toStatsWithSloganFn.getOutputTag(), TupleTagList.of(toStatsWithSloganFn.getFailuresTag()))
                    .withSideInput("slogans", slogansSideInput));
    PCollection<TeamStats> output3 = res3.get(toStatsWithSloganFn.getOutputTag());
    PCollection<Failure> failure3 = res3.get(toStatsWithSloganFn.getFailuresTag());
    // Merge the failures of all steps into a single dead letter PCollection.
    PCollection<Failure> allFailures = PCollectionList
            .of(failure1)
            .and(failure2)
            .and(failure3)
            .apply(Flatten.pCollections());
    return Result.of(output3, allFailures);
}
The problem with this approach is that the code is verbose and the technical error handling code has to be repeated for every step.
Link to the GitHub repository:
2.2 Example with Beam Python
In the Python SDK, the principle is the same: we have to use tagged outputs (TaggedOutput) in a DoFn class and declare them with with_outputs. Example:
def expand(self, inputs: PCollection[TeamStatsRaw]) -> \
        Tuple[PCollection[TeamStats], PCollection[Failure]]:
    # Each step emits good outputs on the main tag and failures on the FAILURES tag.
    outputs_map1, failures_map1 = (
        inputs | VALIDATE_FIELDS >> ParDo(TeamStatsRawFieldsValidationFn(VALIDATE_FIELDS))
        .with_outputs(FAILURES, main='outputs')
    )
    outputs_map2, failures_map2 = (
        outputs_map1 | COMPUTE_TEAM_STATS >> ParDo(TeamStatsMapperFn(COMPUTE_TEAM_STATS))
        .with_outputs(FAILURES, main='outputs')
    )
    final_outputs, failures_map3 = (
        outputs_map2 | ADD_SLOGAN_TEAM_STATS >>
        ParDo(
            TeamStatsWithSloganFn(ADD_SLOGAN_TEAM_STATS),
            AsSingleton(self.slogans_side_inputs)
        )
        .with_outputs(FAILURES, main='outputs')
    )
    # Merge the failures of all steps into a single dead letter PCollection.
    result_all_failures = (
        (failures_map1, failures_map2, failures_map3)
        | 'All Failures PCollections' >> beam.Flatten()
    )
    return final_outputs, result_all_failures
As with the Java version, the problem is the same: we have to repeat the technical code for error handling.
Link to the GitHub repository:
3. Dead letter queue with Asgarde library
In this section, we will show how to apply a dead letter queue and error handling with Beam and the Asgarde library.
3.1 Example with Asgarde Java
Asgarde Java provides a CollectionComposer class that simplifies error handling with less code and more expressive code:
@Override
public Result<PCollection<TeamStats>, Failure> expand(PCollection<TeamStatsRaw> input) {
    return CollectionComposer.of(input)
            .apply("Validate fields", MapElements.into(of(TeamStatsRaw.class)).via(TeamStatsRaw::validateFields))
            .apply("Compute team stats", MapElementFn
                    .into(of(TeamStats.class))
                    .via(TeamStats::computeTeamStats)
                    .withStartBundleAction(() -> LOGGER.info("####################Start bundle compute stats")))
            .apply("Add team slogan", MapProcessContextFn
                    .from(TeamStats.class)
                    .into(of(TeamStats.class))
                    .via(c -> addSloganToStats(c, slogansSideInput))
                    .withSetupAction(() -> LOGGER.info("####################Start add slogan")),
                    Collections.singletonList(slogansSideInput))
            .getResult();
}
Link to the GitHub repository:
3.2 Example with Asgarde Kotlin
Asgarde also provides Kotlin extensions with a more functional programming style:
override fun expand(input: PCollection<TeamStatsRaw>): Result<PCollection<TeamStats>, Failure> {
    return CollectionComposer.of(input)
        .map("Validate fields") { it.validateFields() }
        .mapFn(
            name = "Compute team stats",
            startBundleAction = { LOGGER.info("####################Start bundle compute stats") },
            transform = { TeamStats.computeTeamStats(it) })
        .mapFnWithContext<TeamStats, TeamStats>(
            name = "Add team slogan",
            setupAction = { LOGGER.info("####################Start add slogan") },
            sideInputs = listOf(slogansSideInput),
            transform = { addSloganToStats(it, slogansSideInput) }
        )
        .result
}
Link to the GitHub repository:
3.3 Example with Asgarde Python
Asgarde Python, like the Java library, provides a CollectionComposer class to simplify error handling:
def expand(self, inputs: PCollection[TeamStatsRaw]) -> \
        Tuple[PCollection[TeamStats], PCollection[Failure]]:
    result = (CollectionComposer.of(inputs)
              .map("Validate raw fields", lambda t_raw: t_raw.validate_fields())
              .map("Compute team stats", TeamStats.compute_team_stats)
              .map("Add slogan to team stats",
                   self.add_slogan_to_stats,
                   slogans=AsSingleton(self.slogans_side_inputs),
                   # Log the message; a lambda that only returns a string would do nothing.
                   setup_action=lambda: logging.info('######### Start add slogan to stats actions'))
              )
    return result.outputs, result.failures
Link to the GitHub repository:
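The benefit of a composer is that the try/except and the failure accumulation live in one place, while each step only provides its business function. The following pure-Python sketch is a deliberately simplified, hypothetical `MiniComposer`: it mimics the chaining style shown above, but it is not how Asgarde is implemented.

```python
from typing import Any, Callable, List, Tuple


class MiniComposer:
    """Hypothetical, simplified composer (not Asgarde's implementation).

    Each map() call applies one step to the current outputs, keeps the successes
    for the next step and accumulates failures as (step name, input, exception).
    """

    def __init__(self, elements: List[Any]) -> None:
        self.outputs: List[Any] = list(elements)
        self.failures: List[Tuple[str, Any, Exception]] = []

    @classmethod
    def of(cls, elements: List[Any]) -> "MiniComposer":
        return cls(elements)

    def map(self, name: str, fn: Callable[[Any], Any]) -> "MiniComposer":
        next_outputs: List[Any] = []
        for element in self.outputs:
            try:
                next_outputs.append(fn(element))
            except Exception as exc:
                # The error handling is written once here, instead of in every step.
                self.failures.append((name, element, exc))
        self.outputs = next_outputs
        return self


result = (MiniComposer.of(["4", "x", "0", "2"])
          .map("Parse", int)
          .map("Invert", lambda n: 1 / n))
print(result.outputs)  # [0.25, 0.5]
print([(step, element) for step, element, _ in result.failures])  # [('Parse', 'x'), ('Invert', 0)]
```

A Beam composer additionally has to flatten the failure PCollections of every step, which is the boilerplate visible in the native Beam examples of section 2.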
4. Beam pipeline IOs for failures
In this example, the pipeline outputs are:
- Good outputs written to a BigQuery table
- Failures caught with Beam/Asgarde, written to a BigQuery table for analytics and display in a dashboard
- Failures written to Cloud Logging with a specific log pattern containing the input element and the stack trace
The schema of the BigQuery job_failure table is:
To be cost-effective, we added a DAY partition on the dwhCreationDate field.
We also added clustering on the featureName, jobName, componentType and pipelineStep fields to improve query performance when users filter on them in a WHERE clause.
The JSON schema:
[
{
"name": "featureName",
"type": "STRING",
"mode": "NULLABLE",
"description": "Feature name concerned by error"
},
{
"name": "jobName",
"type": "STRING",
"mode": "NULLABLE",
"description": "Job name concerned by error"
},
{
"name": "pipelineStep",
"type": "STRING",
"mode": "NULLABLE",
"description": "Pipeline step concerned by error"
},
{
"name": "inputElement",
"type": "STRING",
"mode": "NULLABLE",
"description": "Input element when error occurred"
},
{
"name": "exceptionType",
"type": "STRING",
"mode": "NULLABLE",
"description": "Exception type of error"
},
{
"name": "stackTrace",
"type": "STRING",
"mode": "NULLABLE",
"description": "Stack trace of error"
},
{
"name": "componentType",
"type": "STRING",
"mode": "NULLABLE",
"description": "GCP Component type (Dataflow, Composer....)"
},
{
"name": "dwhCreationDate",
"type": "TIMESTAMP",
"mode": "NULLABLE",
"description": "Creation date of error"
}
]
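The partitioning and clustering described above can also be expressed as BigQuery DDL. As a sketch, the hypothetical helper below builds a `CREATE TABLE` statement from the JSON schema fields (descriptions and modes omitted); `my_dataset.job_failure` is a placeholder table name. `PARTITION BY DATE(...)` gives daily partitions on a TIMESTAMP column, and BigQuery accepts at most four clustering columns, which is exactly the number used here.

```python
import json
from typing import Dict, List

# Fields of the job_failure JSON schema above (descriptions and modes omitted).
SCHEMA = json.loads("""[
  {"name": "featureName", "type": "STRING"},
  {"name": "jobName", "type": "STRING"},
  {"name": "pipelineStep", "type": "STRING"},
  {"name": "inputElement", "type": "STRING"},
  {"name": "exceptionType", "type": "STRING"},
  {"name": "stackTrace", "type": "STRING"},
  {"name": "componentType", "type": "STRING"},
  {"name": "dwhCreationDate", "type": "TIMESTAMP"}
]""")


def build_failure_table_ddl(table: str, fields: List[Dict[str, str]],
                            partition_field: str, cluster_fields: List[str]) -> str:
    """Build a CREATE TABLE statement with daily partitioning and clustering."""
    types = {field["name"]: field["type"] for field in fields}
    if types.get(partition_field) != "TIMESTAMP":
        raise ValueError("this sketch only handles a TIMESTAMP partition column")
    if len(cluster_fields) > 4:
        raise ValueError("BigQuery allows at most 4 clustering columns")
    columns = ",\n  ".join(f"{field['name']} {field['type']}" for field in fields)
    return (f"CREATE TABLE `{table}` (\n  {columns}\n)\n"
            f"PARTITION BY DATE({partition_field})\n"
            f"CLUSTER BY {', '.join(cluster_fields)}")


ddl = build_failure_table_ddl(
    "my_dataset.job_failure",  # placeholder dataset and table name
    SCHEMA,
    partition_field="dwhCreationDate",
    cluster_fields=["featureName", "jobName", "componentType", "pipelineStep"],
)
print(ddl)
```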
4.1 Code of BigQuery IO in Beam Java
public PTransform<PCollection<Failure>, ? extends POutput> write() {
    return BigQueryIO.<Failure>write()
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .to(failureConf.getOutputDataset() + "." + failureConf.getOutputTable())
            .withFormatFunction(failure -> toFailureTableRow(failureConf, failure))
            // The table (with its partitioning and clustering) must already exist.
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
}
private static TableRow toFailureTableRow(FailureConf failureConf, Failure failure) {
    // Creation date of the failure, written to the dwhCreationDate partitioning column.
    val creationDate = Instant.now().toString();
    return new TableRow()
            .set(FEATURE_NAME.getValue(), failureConf.getFeatureName())
            .set(PIPELINE_STEP.getValue(), failure.getPipelineStep())
            .set(JOB_NAME.getValue(), failureConf.getJobName())
            .set(INPUT_ELEMENT.getValue(), failure.getInputElement())
            .set(EXCEPTION_TYPE.getValue(), failure.getException().getClass().getSimpleName())
            .set(STACK_TRACE.getValue(), ExceptionUtils.getStackTrace(failure.getException()))
            .set(COMPONENT_TYPE.getValue(), COMPONENT_TYPE_VALUE)
            .set(DWH_CREATION_DATE.getValue(), creationDate);
}
Link to the GitHub repository:
4.2 Code of Cloud Logging IO in Beam Java
public PDone expand(PCollection<Failure> input) {
    input.apply(STEP_MAP_FAILURE_JSON_STRING, new LogTransform());
    return PDone.in(input.getPipeline());
}
static class LogTransform extends PTransform<PCollection<Failure>, PCollection<String>> {
    public PCollection<String> expand(PCollection<Failure> input) {
        return input
                .apply(STEP_MAP_FAILURE_JSON_STRING, MapElements
                        .into(strings())
                        .via(FailureCloudLoggingWriteTransform::toFailureLogInfo))
                .apply(STEP_LOG_FAILURE, MapElements.into(strings()).via(this::logFailure));
    }
    private String logFailure(String failureAsString) {
        // Logged at ERROR level so that the log-based metric filter (severity=ERROR) can match it.
        LOGGER.error(failureAsString);
        return failureAsString;
    }
}
private static String toFailureLogInfo(Failure failure) {
    val inputElementInfo = "InputElement : " + failure.getInputElement();
    val stackTraceInfo = "StackTrace : " + ExceptionUtils.getStackTrace(failure.getException());
    return inputElementInfo + "\n" + stackTraceInfo;
}
Link to the GitHub repository:
4.3 Code of BigQuery IO in Beam Python
def expand(self, inputs_failures: PCollection[Failure]):
    return (inputs_failures
            | 'Map to failure table fields' >> beam.Map(self.to_failure_table_fields)
            | "Sink failures to Bigquery" >> beam.io.WriteToBigQuery(
                project=self.pipeline_conf.project_id,
                dataset=self.pipeline_conf.failure_output_dataset,
                table=self.pipeline_conf.failure_output_table,
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
            )
def to_failure_table_fields(self, failure: Failure):
    return {
        FailureTableFields.FEATURE_NAME.value: self.pipeline_conf.failure_feature_name,
        FailureTableFields.JOB_NAME.value: self.pipeline_conf.job_type,
        FailureTableFields.PIPELINE_STEP.value: failure.pipeline_step,
        FailureTableFields.INPUT_ELEMENT.value: failure.input_element,
        FailureTableFields.EXCEPTION_TYPE.value: type(failure.exception).__name__,
        FailureTableFields.STACK_TRACE.value: get_failure_error(failure),
        FailureTableFields.COMPONENT_TYPE.value: 'DATAFLOW',
        FailureTableFields.DWH_CREATION_DATE.value: datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
    }
Link to the GitHub repository:
4.4 Code of Cloud Logging IO in Beam Python
def expand(self, inputs_failures: PCollection[Failure]):
    return (inputs_failures
            | 'Map to failure log info' >> beam.Map(self.to_failure_log_info)
            | "Logs Failure to cloud logging" >> beam.Map(self.log_failure))
def to_failure_log_info(self, failure: Failure):
    input_element_info = f'InputElement : {failure.input_element}'
    stack_trace_info = f'StackTrace : {get_failure_error(failure)}'
    return f'{input_element_info} \n {stack_trace_info}'
def log_failure(self, failure_log_info: str):
    logging.error(failure_log_info)
    return failure_log_info
Link to the GitHub repository:
5. Create an alerting policy in Google Cloud on Beam failures
After writing failures to Cloud Logging, we want to send an email alert in real time. Here are the steps in Google Cloud to achieve this:
- Create a log-based metric that filters the failure pattern written by the Dataflow job
- Create a notification channel (email was chosen for this example)
- Create an alerting policy based on the log-based metric and the notification channel created previously
5.1 Create a log-based metric
We will first create a log-based metric on Dataflow job errors.
Some explanations about the log format in Cloud Logging
An entry in Cloud Logging is represented by the LogEntry object. Here you can see the JSON representation of a LogEntry object:
{
"logName": string,
"resource": {
object (MonitoredResource)
},
"timestamp": string,
"receiveTimestamp": string,
"severity": enum (LogSeverity),
"insertId": string,
"httpRequest": {
object (HttpRequest)
},
"labels": {
string: string,
...
},
"metadata": {
object (MonitoredResourceMetadata)
},
"operation": {
object (LogEntryOperation)
},
"trace": string,
"spanId": string,
"traceSampled": boolean,
"sourceLocation": {
object (LogEntrySourceLocation)
},
"split": {
object (LogSplit)
},
// Union field payload can be only one of the following:
"protoPayload": {
"@type": string,
field1: ...,
...
},
"textPayload": string,
"jsonPayload": {
object
}
// End of list of possible types for union field payload.
}
We can add filters on the LogEntry
fields.
Creation of the Log-based Metric
In the Google Cloud Console, we open the log-based metrics menu and first fill in the metric type, metric name and description:
Metric type
We chose the Counter metric type because every time a Dataflow job writes an error to the logs with the specific pattern, we want to increment this metric.
The metric name
in this example is dataflow_jobs_intercepted_errors
Filter
Then we fill in the filter and labels:
The filter uses the same query syntax as Cloud Logging and the Logs Explorer in Google Cloud.
resource.type=dataflow_step targets only Dataflow jobs and services.
severity=ERROR restricts the filter to error logs.
Our Dataflow jobs write errors to Cloud Logging with a specific pattern containing the InputElement and StackTrace keywords, which is why the filter adds a criterion on jsonPayload.message:
jsonPayload.message =~ "InputElement.*"
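- =~ is the regular expression operator: the field matches if it contains a substring matching the expression
- .* means any sequence of characters
- This criterion therefore selects the messages containing InputElement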
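To check the pattern locally, the sketch below rebuilds a message with the same shape as the one produced by `to_failure_log_info` in section 4.4 and tests it with Python's `re.search`. It assumes that `=~` performs an unanchored search (a match anywhere in the field is enough); Cloud Logging uses RE2 syntax, which behaves like Python's `re` for this simple pattern. The sample input element and stack trace are made up.

```python
import re


# Same shape as the message built by to_failure_log_info in section 4.4.
def to_failure_log_info(input_element: str, stack_trace: str) -> str:
    return f"InputElement : {input_element} \n StackTrace : {stack_trace}"


# re.search is unanchored, like the assumed behaviour of =~ in the Cloud Logging filter.
FAILURE_PATTERN = re.compile(r"InputElement.*")

failure_message = to_failure_log_info('{"teamName": "PSG"}', "ValueError: invalid team slogan")
other_message = "Worker pool started"

print(FAILURE_PATTERN.search(failure_message) is not None)  # True
print(FAILURE_PATTERN.search(other_message) is not None)    # False
```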
Labels
Labels allow log-based metrics to contain multiple time series.
In this case, we add the label resource.labels.job_id because we want to apply the filter to the logs of each Dataflow job separately, not to all jobs together.
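Putting the filter and the label together, a counter log-based metric behaves roughly like a per-job count of matching entries. The sketch below models that locally on simplified LogEntry-like dicts; it only illustrates the semantics, not how Cloud Monitoring computes the metric, and the job ids and messages are made up.

```python
import re
from collections import Counter
from typing import Dict, List

FAILURE_PATTERN = re.compile(r"InputElement.*")


def log_entry(job_id: str, severity: str, message: str) -> Dict:
    """Simplified LogEntry with only the fields used by the filter and the label."""
    return {
        "resource": {"type": "dataflow_step", "labels": {"job_id": job_id}},
        "severity": severity,
        "jsonPayload": {"message": message},
    }


def matches_filter(entry: Dict) -> bool:
    """resource.type=dataflow_step AND severity=ERROR AND jsonPayload.message =~ "InputElement.*"."""
    return (entry["resource"]["type"] == "dataflow_step"
            and entry["severity"] == "ERROR"
            and FAILURE_PATTERN.search(entry["jsonPayload"]["message"]) is not None)


def count_per_job(entries: List[Dict]) -> Counter:
    """Counter metric labelled by resource.labels.job_id: one count per Dataflow job."""
    return Counter(e["resource"]["labels"]["job_id"] for e in entries if matches_filter(e))


entries = [
    log_entry("job-a", "ERROR", "InputElement : team1 \n StackTrace : KeyError"),
    log_entry("job-a", "ERROR", "InputElement : team2 \n StackTrace : ValueError"),
    log_entry("job-b", "ERROR", "InputElement : team3 \n StackTrace : KeyError"),
    log_entry("job-b", "INFO", "InputElement : team4 processed"),  # wrong severity
    log_entry("job-c", "ERROR", "Worker lost connection"),         # no failure pattern
]
print(count_per_job(entries))  # Counter({'job-a': 2, 'job-b': 1})
```

Without the job_id label, the three matching entries would be merged into a single count, and an alert could not tell which job failed.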
5.2 Create a notification channel
In the Notification channels menu, we create an email channel.
5.3 Create the alerting policy based on log-based metric
In this section, we create the alerting policy based on the log-based metric created previously.
First, in the New Condition menu, we type Dataflow in the metric search field, then select Log-based metrics and the metric created previously, called dataflow_jobs_intercepted_errors.
In the Configure trigger menu, we select Threshold as the condition type, with a threshold value of 0.9.
This means that as soon as a Dataflow job has one error, an alert is fired (1 is greater than 0.9).
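The trigger rule reduces to a simple comparison: error counts are integers, so any count above 0.9 means at least one intercepted error. The sketch below applies that rule per job on hypothetical counts; it ignores the alignment period and retest window that Cloud Monitoring also applies to the condition.

```python
from typing import Dict, List

THRESHOLD = 0.9


def jobs_to_alert(errors_per_job: Dict[str, int], threshold: float = THRESHOLD) -> List[str]:
    """Jobs whose error count over the window is strictly above the threshold."""
    return sorted(job for job, count in errors_per_job.items() if count > threshold)


print(jobs_to_alert({"job-a": 0, "job-b": 1, "job-c": 3}))  # ['job-b', 'job-c']
```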
The last menu to configure is Alert details, where we set the notification channel created previously (an email address).
For each alert, an email is sent to the specified address.
Note that multiple notification channels can be specified (several emails, Slack…).
6. Alerting email example
Here you can see an example email.
Two links are provided in the email:
- VIEW INCIDENT links to the current incident in Google Cloud
- VIEW LOGS links to the error logs in Cloud Logging
Example of error logs for the Dataflow job in Cloud Logging:
You can see:
- The Cloud Logging Logs Explorer page
- The filter in the Query menu
- On the left side, the log fields: RESOURCE TYPE, SEVERITY, JOB_NAME, REGION…
- The query result containing the error logs, which match the InputElement and StackTrace pattern written by the Beam/Dataflow job
7. Display failures in a Data Studio dashboard
In this section, we will create a Data Studio dashboard responsible for displaying failures.
This dashboard provides the following filters:
- Date range on dwhCreationDate
- Dropdown list on the featureName field
- Dropdown list on the jobName field
- Advanced filter on inputElement (equals, contains…)
- Advanced filter on stackTrace
The rest of the page contains:
- The failure data in a table
- A pie chart with the percentage per exceptionType
- A pie chart with the percentage per jobName
- A bar chart with the record count per dwhCreationDate and jobName
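These charts are plain aggregations over the job_failure table. As a hypothetical illustration of what they compute, the sketch below derives the percentage per exceptionType and the record count per day and jobName from a few made-up rows, using the date format written by the Python BigQuery sink in section 4.3.

```python
from collections import Counter
from typing import Dict, List

# Made-up rows with a subset of the job_failure columns.
rows = [
    {"jobName": "team_stats", "exceptionType": "ValueError", "dwhCreationDate": "2022-06-01 10:00:00"},
    {"jobName": "team_stats", "exceptionType": "ValueError", "dwhCreationDate": "2022-06-01 11:30:00"},
    {"jobName": "team_slogans", "exceptionType": "KeyError", "dwhCreationDate": "2022-06-01 12:00:00"},
    {"jobName": "team_stats", "exceptionType": "ValueError", "dwhCreationDate": "2022-06-02 09:15:00"},
]


def percentage_per(rows: List[Dict[str, str]], field: str) -> Dict[str, float]:
    """Pie chart data: share of failures (in %) for each value of the field."""
    counts = Counter(row[field] for row in rows)
    total = sum(counts.values())
    return {value: round(100 * n / total, 1) for value, n in counts.items()}


def count_per_day_and_job(rows: List[Dict[str, str]]) -> Counter:
    """Bar chart data: number of failures per (day, jobName)."""
    return Counter((row["dwhCreationDate"][:10], row["jobName"]) for row in rows)


print(percentage_per(rows, "exceptionType"))  # {'ValueError': 75.0, 'KeyError': 25.0}
print(count_per_day_and_job(rows))
```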
I will give a detailed explanation of this dashboard in a separate article.
Here are some resources that could be helpful for the components used in this article:
Asgarde Java:
Asgarde Python:
Talk at Beam Summit on Asgarde:
If you like the Asgarde project, support us with a GitHub star ✩
If you like my articles and want to see my posts, follow me on: