DLQ With Spark Streaming and Avro

Yigal Kassel
Riskified Tech

--

Imagine you have a Spark stream that reads from a Kafka topic.

The topic is populated with data in Avro format originating from multiple producers. Each producer populates a subset of the fields, while the rest are usually null.

The producers are managed by different development teams, and a schema registry is used to enable schema evolution.

The schema registry has an important role here: it makes sure the producer and the consumer agree on the structure of the data.

On the producer side, the schema is fetched and the data is serialized into a compact binary format according to it.

On the consumer side, the schema registry is used to deserialize the binary data back into a readable structure.
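
For illustration, here is a minimal producer-side sketch using Confluent’s KafkaAvroSerializer; the topic name, schema, and addresses are hypothetical. The serializer registers the schema with the registry and writes the value in the compact Avro wire format:

```scala
import java.util.Properties
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")            // assumption: local broker
props.put("key.serializer",
  "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer",
  "io.confluent.kafka.serializers.KafkaAvroSerializer")      // serializes through the registry
props.put("schema.registry.url", "http://localhost:8081")    // assumption: local registry

// A toy schema; real producers publish their own, registry-managed schemas.
val schema = new Schema.Parser().parse(
  """{"type":"record","name":"Event","fields":[{"name":"id","type":"long"}]}""")
val record = new GenericRecordBuilder(schema).set("id", 1L).build()

val producer = new KafkaProducer[String, AnyRef](props)
producer.send(new ProducerRecord[String, AnyRef]("events", "key-1", record))
producer.close()
```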

Trying to send data with an incompatible schema — where the trouble starts

Everything is going well, until at some point one of the producers accidentally publishes data that introduces a breaking schema change, i.e. data that doesn’t fit the agreed-upon schema.

The stream reads a batch of data from the topic. Every batch contains several rows, and the data is in Avro format, so the stream has to parse it before it can manipulate it.

But since the schema version of the new data isn’t compatible with the expected schema, parsing the batch fails. And since batches are processed sequentially, the stream stops processing new data.

The thing with Spark is that you only declare what manipulation you want to perform on the data; it actually gets executed lazily, further down the road.

So if the operation fails, you don’t have control over the flow.

And this means the application will simply crash, since it didn’t manage to perform the operation it declared it would do.

This is problematic because it can lead to data loss and delay the downstream services that rely on this data.

A quick intro to our datalake architecture

Now that I have your attention, we’ve established there is a problem. But what would a “happy flow” look like?

So we have a platform with several components:

  • Kafka — the source of the data. The data arrives in Avro format (Avro is a data serialization format that is well suited for data streaming scenarios, because it has a compact binary representation and supports schema evolution).
  • Spark streams — read the data from Kafka and write it into the datalake.
  • Datalake on top of S3 — the data destination, where we store data permanently for research and analytics.
  • Schema registry — keeps all the Avro schemas and supports schema evolution. The Spark streams use it to know what schema of the data to expect.
  • Streaming management service — keeps all the stream configurations. We use its API to control the Spark streams.

A happy flow would look something like this.

A development team decides to create a new entity inside one of its products.
They model it, make sure its schema is represented in Avro, and publish it to the schema registry.
They create a Kafka topic and start publishing new messages to it.
Next, the team uses the Streaming management service to create the Spark stream.
The stream starts and reads the schema from the schema registry, based on the configuration it got from the Streaming management service.
Based on the schema, the stream reads the messages from the topic in Kafka.

It then parses the messages from Avro format into Spark rows using a library called ABRiS (more on this a bit later).

Our Datalake architecture in a nutshell

Zoom-in on the problem

In order to read data from Kafka in Avro format and parse it in a Spark stream, so Spark can manipulate it, we are using the ABRiS library.

ABRiS is a library that allows you to use Apache Avro data with Apache Spark. ABRiS provides a bridge between Avro and Spark by allowing you to read and write Avro data from Spark DataFrames and Datasets.

This can be useful if you want to use Avro data with Spark, but don’t want to deal with the complexity of directly interacting with the Avro API.

ABRiS provides a function, from_avro, that can be called inside Spark. from_avro accepts a Spark column containing Avro binary data and some configuration, and it returns a struct column with all the Avro data parsed.

Below is a snippet that shows how a Spark application (streaming or batch) can use ABRiS to parse Avro data from the “value” column.
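
This is a minimal sketch based on the ABRiS configuration API; the topic name, broker, and registry addresses are placeholders:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

val spark = SparkSession.builder().appName("avro-stream").getOrCreate()

// Tell ABRiS which reader schema to fetch from the schema registry.
val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("my_topic")                     // placeholder topic
  .usingSchemaRegistry("http://schema-registry:8081")   // placeholder registry URL

// Read the raw Kafka records; the Avro payload sits in the binary "value" column.
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")     // placeholder broker
  .option("subscribe", "my_topic")
  .load()

// Parse the binary Avro payload into a typed struct column.
val parsedDf = kafkaDf.select(from_avro(col("value"), abrisConfig).as("data"))
```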

Now let’s see an example:

Here is an Avro schema with two fields: one named id with type long, and another named time, also of type long but with a logical type of timestamp.
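
A schema along those lines might look like this (the record name and the exact timestamp logical type are assumptions for illustration):

```json
{
  "type": "record",
  "name": "Event",
  "fields": [
    { "name": "id",   "type": "long" },
    { "name": "time", "type": { "type": "long", "logicalType": "timestamp-millis" } }
  ]
}
```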

This is an entry that complies with the schema:
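
For example, a hypothetical record like the following (the values are made up for illustration) passes schema validation, since both fields hold valid longs:

```json
{ "id": 1, "time": 9223372036854775807 }
```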

As you can see the record has a valid long value in the time column.

The problem is that it isn’t actually a valid timestamp. Since logically we are dealing with a timestamp value, converting such a large long into Spark’s internal timestamp representation causes an overflow when ABRiS tries to parse the value.

And this problem is hard to handle because the parsing error occurs deep inside a different thread, the one in charge of executing the Spark DAG.

So the main thread can’t catch the exception, and the application crashes.

Ok, we have a problem, so now what?

Let’s start by talking generally, and then we’ll get a bit more specific.
What do we want from the stream?
We want the stream to overcome situations where it gets malformed data.
We expect it to try parsing the Avro data and, when it fails, to notify the main thread instead of just crashing, and then continue processing the next row of data.
How would the parser thread notify the main thread that it has failed?

Sentinel value — A special value used to indicate the end of a sequence or to mark the absence of a value in a data structure.

Sounds so trivial, doesn’t it?
Luckily for us, we were not the only ABRiS users with this kind of problem. While we were trying to solve it, the maintainers were working on releasing an exception handling mechanism.

Using this exception handler, we can have the from_avro function always return values that fit the Spark execution plan.
The exception handler returns a sentinel value, and the main Spark thread uses it to determine whether the data was parsed correctly or whether the original Avro data should be sent to the DLQ.

Let’s be specific — talk to me in code

Since ABRiS released the DeserializationExceptionHandler class, we could extend it and override the handle method, which is called whenever a deserialization error occurs (e.g. a large long number that leads to an overflow).

We build a new record with an id of -1 and return it back to the Spark DAG.
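
A sketch of such a handler could look like the following. The exact handle signature and import paths are based on recent ABRiS versions and may differ slightly in yours; the field names match the id/time example schema above:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecordBuilder
import za.co.absa.abris.avro.errors.DeserializationExceptionHandler
import za.co.absa.abris.avro.sql.AbrisAvroDeserializer

// Instead of letting the parsing error propagate and crash the stream,
// return a sentinel record (id = -1) that still fits the expected schema.
class SentinelExceptionHandler extends DeserializationExceptionHandler {
  override def handle(exception: Throwable,
                      deserializer: AbrisAvroDeserializer,
                      readerSchema: Schema): Any = {
    val sentinelRecord = new GenericRecordBuilder(readerSchema)
      .set("id", -1L)   // the sentinel value the main thread will look for
      .set("time", 0L)  // any valid placeholder value for the other fields
      .build()
    deserializer.deserialize(sentinelRecord)
  }
}
```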

We then add it to the AbrisConfig we use:
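
Assuming the withExceptionHandler hook on the ABRiS reader config (same placeholder names as in the earlier snippet), wiring it in could look like this:

```scala
val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("my_topic")                      // placeholder topic
  .usingSchemaRegistry("http://schema-registry:8081")    // placeholder registry URL
  .withExceptionHandler(new SentinelExceptionHandler())  // plug in our handler
```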

And here comes the updated way we handle the stream.
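
Here is a simplified sketch of that flow, reusing kafkaDf and abrisConfig from the earlier snippets; the DLQ topic, broker, checkpoint path, and console output are placeholders for the example:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro

// Keep the original binary payload next to the parsed struct, so the raw
// Avro bytes can be forwarded to the DLQ untouched when parsing fails.
val parsed = kafkaDf.select(
  col("value"),
  from_avro(col("value"), abrisConfig).as("data"))

// Split each micro-batch into corrupted rows (sentinel id) and valid rows.
val processBatch: (DataFrame, Long) => Unit = (batch, _) => {
  // Rows with the sentinel id failed Avro parsing: send the original bytes to the DLQ.
  batch.filter(col("data.id") === -1L)
    .select("value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  // placeholder broker
    .option("topic", "my_topic_dlq")                   // placeholder DLQ topic
    .save()

  // The rest parsed correctly: continue processing (here we just print them).
  batch.filter(col("data.id") =!= -1L)
    .select("data.*")
    .show(truncate = false)
}

parsed.writeStream
  .foreachBatch(processBatch)
  .option("checkpointLocation", "/tmp/checkpoints/my_stream")  // placeholder path
  .start()
  .awaitTermination()
```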

We filter the rows with an id of -1, which we know is the sentinel value the DeserializationExceptionHandler we built has put there, and send those rows to our DLQ in Kafka. Then the code gathers all the valid rows and continues processing them (here just logging them to the console, purely for the example).

Thoughts about production

This is a great example of a basic implementation of a Spark structured stream that won’t crash when a record doesn’t exactly fit the schema it expects.

This is nice, but a production system should consider a few more things: performance, data quality, and serving a wide range of use cases.

The first thing to consider is performance.

Examining every row after parsing, to check whether the parsing succeeded or failed, is costly in terms of performance.

We had to ask ourselves: is it worth it? Do we face this problem that often?

We came to the conclusion that, for us, it makes more sense to wrap this flow in a flag and turn it on only once we observe a problem in the flow.

Since our team already has a very reliable service that manages all the stream configurations, we just had to add this flag to the configuration and we were ready to go (you can read more about this service in this great article by my teammate Or).

Now, let’s continue.
Let’s say we have some kind of problem and the DLQ flag I mentioned is turned on. What percentage of the records is corrupted?
Do we want to keep consuming data from a topic when all of its data is corrupted?
Maybe we want to stop consuming data when over 60% of it is corrupted?
It mainly depends on the upstream services and whether they have a temporary glitch or a more permanent bug.

Anyway, to give ourselves more control over the streams, we also made this ratio of corrupted entries in a batch configurable in our stream management service.
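
As a rough illustration of that idea (the threshold, the counting logic, and the failure behavior here are simplified assumptions, not our exact implementation):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical knob; in practice it would come from the stream management service.
val maxCorruptedRatio = 0.6

def checkCorruptedRatio(batch: DataFrame): Unit = {
  val total = batch.count()
  if (total > 0) {
    val corrupted = batch.filter(col("data.id") === -1L).count()
    val ratio = corrupted.toDouble / total
    // Failing the batch stops the stream so the upstream issue can be investigated.
    require(ratio <= maxCorruptedRatio,
      s"$corrupted of $total records failed Avro parsing; stopping the stream")
  }
}
```

A check like this could run at the top of the processBatch function shown earlier, before the rows are split between the DLQ and the regular processing path.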

Another thing is having a generic solution for all the schemas. Right now we have a solution that is tailor-made for each schema we want to use, but a more generic solution is in the making.

This is going to be a handler that, for any given schema, generates a valid record that includes the sentinel value, and so enables the DLQ mechanism for that specific stream.

It will make use of a special metadata column that we encourage development teams to use in their streams, so they can get some extra metadata on their stream.

And we will use it to guarantee that we can store a sentinel value when malformed rows arrive in the stream.

Stay tuned for our next updates on the subject.


I work as a Big Data Engineer at Riskified, and am passionate about learning new things.