Evolve your data model in Flink’s state using Avro

Niels Denissen
inganalytics.com/inganalytics
8 min read · Jun 14, 2018

TL;DR

How do you update your data model in Apache Flink? If you use Flink for long-running jobs that need schema changes in between, you’ll sooner or later find yourself wanting to update the data model you’re using to write data to state. As of now this is not supported by Flink out of the box. This blog post provides a solution to this problem using Apache Avro to serialize and deserialize data.

We’ll show you how to extend your own managed state serializer, how to ensure your data supports evolution using Avro, and how you can then update your Flink job with all your data migrated successfully.

What is the problem we’re trying to solve?

Say your new Japanese employee just started, but comes to you saying he doesn’t understand the documentation. This was all written in Chinese by his predecessor, which made sense at the time. What do you do?

This is an analogy for our Flink state issue. Whenever we update our old job (i.e. the Chinese employee leaves) we create a savepoint (i.e. the documentation he wrote). The new job (i.e. the Japanese employee) will start from this savepoint. Since all our data was serialized by our first job, in Chinese, our new Japanese job is unable to decode it. The problem is that there is no supported way to migrate state in Flink yet.

Since we’re receiving our data from a Kafka bus, we could replay from that. This, however, is time consuming, can mess up stream joins on historic data during the replay and, most importantly, is limited to Kafka’s retention period. So is there a better way to solve this?

Flink + Avro: not for the faint of heart?

Apache Avro jumps to mind when thinking about schema migration. You could see Avro as the language interpreter that can translate Japanese to Chinese by converting it to and from a third, very efficient language, let’s call it “byte code” ;). You can tell Avro which language (or schema) the data was written with and which one it has to be read with.

However, this Stack Overflow post describes the usage of Avro for Flink serialization as ‘not for the faint of heart’. Although it doesn’t specify why, these are typical questions you may like to see answered when starting this endeavour:

  • Where can I find an example of Flink and Avro working together?
  • What are common pitfalls when you insert a custom serializer in Flink?
  • What are good practices when using Avro?

How Avro works exactly we’ll leave to the clear documentation of Avro itself. In this post you will get a solid understanding of how you can combine the two to evolve the data stored in your Flink cluster without losing any state. We’ll do so by answering each of the questions above.

Where can I find an example of Flink and Avro working together?

We’ve made the following GitHub repository, outlining an example of how to use Avro as a custom data serializer for Flink. The repo shows how we’ve created a custom serializer for Flink with Avro, defined the Avro schemas and case classes, and used all of this in a simple process function. It also contains test cases proving that the schema evolution works. In this section we’ll walk through the process step by step, linking to the code base where we can.

Let’s first look at a brief overview of how the custom serializer works. We’ve extended Flink’s custom serializer, adding code to support encoding/decoding Avro data. How this works is shown in the figure below:

Data serialization for first job
  1. Receiving data in a process function.
    Data received by a Flink process function is processed as case classes in Scala. That’s what we use to define our logic throughout the application. Here’s such a process function.
  2. Reading/writing data to Flink state.
    We define state in Flink with Avro generic records. Upon reading/writing data in our process functions, we transform those records to and from case classes. This transformation is defined per case class.
  3. Serializing/Deserializing data in Flink’s state to disk upon check/savepointing.
    The generic record from Avro will be written to disk here. This is done by extending Flink’s serializer and defining some custom logic on how to read/write Avro data.
    Generic records contain both data and the Avro schema of that data. Such a schema looks like this. Along with the data we write a signature of the Avro schema. This signature is needed to decipher the data later on, since we need to know how the data was written to be able to read it.
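
To make step 2 concrete, here is a minimal sketch of a per-case-class transformation. It is not the code from the repository: the `Customer` case class, its fields and the `schemaV1` name are hypothetical, and it assumes the Avro Java library is on the classpath.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

// Hypothetical domain type; the real case classes live in the linked repository.
final case class Customer(id: Long, name: String)

object Customer {
  // Hypothetical first version of the Avro schema for Customer.
  val schemaV1: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "Customer", "namespace": "com.example",
      | "fields": [
      |   {"name": "id",   "type": "long"},
      |   {"name": "name", "type": "string"}
      | ]}""".stripMargin)

  // Case class -> generic record, used just before writing to state.
  def toRecord(c: Customer): GenericRecord = {
    val record = new GenericData.Record(schemaV1)
    record.put("id", Long.box(c.id))
    record.put("name", c.name)
    record
  }

  // Generic record -> case class, used right after reading from state.
  def fromRecord(r: GenericRecord): Customer =
    Customer(
      id = r.get("id").asInstanceOf[Long],
      // Avro may hand back a Utf8 instead of a String, so go through toString.
      name = r.get("name").toString)
}
```

The process function only ever sees `Customer`, while the state only ever sees `GenericRecord`; the two conversion methods are the single place where the schema and the case class meet.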

Now that we’ve seen how data is saved to disk, let’s see how we update our job. The diagram below shows how this works:

Data migration through second Job
  1. Load data from disk
    The first thing that happens is that the data is read from disk into Flink’s state. The data is read with the original schema it was written with.
  2. Now, during the execution of Job 2, two scenarios can happen:
    a. Data is never used by Job 2
    If data from state is never used in Job 2, it will remain in Flink’s state in its original form, so with schema 1.
    b. Data is used by Job 2
    When data is used, Job 2 transforms it to a case class. This requires your data to support evolution, which we’ll dive into in the following section on Avro. After the transformation, it is written back to Flink’s state as a new generic record with the new schema, as you can see by following path 2b in the diagram.

Upon writing back to disk in case of a check/savepoint, all data will be written with its own appropriate Avro schema. Therefore it’s important to always ensure you adhere to Avro’s rules on updating your schema so that all past versions are compatible with your latest schema.
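
One of those rules is that a field you add needs a default value, so that records written before the field existed can still be read. The sketch below illustrates this with Avro’s writer/reader schema resolution. The schemas, the `country` field and the `EvolutionSketch` object are hypothetical and only serve as an illustration; the Avro Java library is assumed to be on the classpath.

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object EvolutionSketch {
  // Hypothetical schema the first job wrote its state with.
  val v1: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "Customer", "namespace": "com.example",
      | "fields": [
      |   {"name": "id", "type": "long"}
      | ]}""".stripMargin)

  // Hypothetical schema of the second job: one extra field, WITH a default.
  val v2: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "Customer", "namespace": "com.example",
      | "fields": [
      |   {"name": "id",      "type": "long"},
      |   {"name": "country", "type": "string", "default": "unknown"}
      | ]}""".stripMargin)

  // Encode a record to Avro binary using the schema the record itself carries.
  def encode(record: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](record.getSchema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Decode bytes written with `writer` into a record shaped like `reader`.
  def decode(bytes: Array[Byte], writer: Schema, reader: Schema): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    new GenericDatumReader[GenericRecord](writer, reader).read(null, decoder)
  }

  def main(args: Array[String]): Unit = {
    val old = new GenericData.Record(v1)
    old.put("id", Long.box(42L))
    // The old bytes are resolved against v2; "country" is filled from its default.
    val migrated = decode(encode(old), v1, v2)
    println(migrated)
  }
}
```

Without the default on `country`, Avro has no value to fill in for old records and the read with `v2` as reader schema fails, which is exactly the situation the compatibility rules are meant to prevent.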

What are common pitfalls when you insert a custom serializer in Flink?

Picking the wrong serializer in Flink

There are multiple types of serialization in Flink. This is because Flink divides state into two categories: `operator state` and `keyed state`. Operator state is the state Flink defines itself when you use Flink’s default stream operations (e.g. calculating the mean over a window requires state to be kept). Keyed state, on the other hand, is defined explicitly by the user in self-defined operators. The serializer for operator state is defined on the environment, but the one for keyed state you can specify yourself when you create your self-defined function. In our case we needed the keyed state serializer, which you’ve seen in this example.

Specifying a serializer per case class

As your data evolves, your serializer evolves with it. Flink writes the serializer along with the data, which means that in order to read your state, your old serializer has to be present on the classpath of your application. This forces you to keep all past versions of your serializer in your codebase. A way around this is to use Avro’s Generic Records as the type for your serializer. Since this data type never changes even if you evolve your data, this ensures your serializer is always present on your classpath. It does require you to transform the generic record to/from a case class upon reading/writing your state.

Writing unused generic records with a new schema instead of their own

As we’ve seen in the example diagrams above, there’s a difference between using data from Flink’s state or leaving it untouched. Basically whenever we don’t use the data in state, it will remain in its original form with its old schema attached.
When we first wrote the Avro serialization for Flink, we accidentally wrote all data in state with the new Avro schema, leading to the following inconsistency:

Ill defined schema migration when not using data in state

You’ll notice the path where it goes wrong. Whenever state isn’t used and thus not updated, we should write this data back with its original schema, instead of the new one.

What are good practices when using Avro?

Writing Scala data structures to Avro’s Generic records

As you’ll be using a variety of different data structures in your case classes, you’ll also require a variety of Avro serialization types. The Avro schemas listed in the above example and their accompanying methods to transform from/to Generic Records already show the ones we’ve had to use. A few notes on the different types (which you’ll find in the examples):

  • Timestamps are written as longs with a metadata field listing them as timestamps.
  • Nested data types have to be defined in full the first time; any subsequent usage simply lists the namespace+name of that type.
  • Arrays have to be written by ingesting them into Avro’s arrays first.
  • A map with String keys will have its keys written to Avro as UTF-8 strings; see the helper function in this class for an example.
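
The notes above can be sketched in one small example. This is an illustration under assumptions, not the repository’s code: the `Order` and `Address` case classes, their fields and the `stringKeyedMap` helper are hypothetical names, and the Avro Java library is assumed to be on the classpath. `scala.collection.JavaConverters` is used because it is available on the Scala versions Flink supports; on Scala 2.13 it is deprecated in favour of `scala.jdk.CollectionConverters`.

```scala
import java.time.Instant
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import scala.collection.JavaConverters._

// Hypothetical case classes, for illustration only.
final case class Address(city: String)
final case class Order(createdAt: Instant, billing: Address, shipping: Address,
                       tags: Seq[String], counts: Map[String, Long])

object Order {
  val schema: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "Order", "namespace": "com.example",
      | "fields": [
      |   {"name": "createdAt", "type": {"type": "long", "logicalType": "timestamp-millis"}},
      |   {"name": "billing",   "type": {"type": "record", "name": "Address",
      |                                  "fields": [{"name": "city", "type": "string"}]}},
      |   {"name": "shipping",  "type": "com.example.Address"},
      |   {"name": "tags",      "type": {"type": "array", "items": "string"}},
      |   {"name": "counts",    "type": {"type": "map", "values": "long"}}
      | ]}""".stripMargin)

  private val addressSchema: Schema = schema.getField("billing").schema()
  private val tagsSchema: Schema = schema.getField("tags").schema()

  private def addressToRecord(a: Address): GenericRecord = {
    val r = new GenericData.Record(addressSchema)
    r.put("city", a.city)
    r
  }

  def toRecord(o: Order): GenericRecord = {
    val r = new GenericData.Record(schema)
    // Timestamp: a plain long, the logicalType in the schema is the metadata.
    r.put("createdAt", Long.box(o.createdAt.toEpochMilli))
    // Nested type: defined in full for "billing", referenced by name for "shipping".
    r.put("billing", addressToRecord(o.billing))
    r.put("shipping", addressToRecord(o.shipping))
    // Array: copy the Scala collection into an Avro array first.
    r.put("tags", new GenericData.Array[String](tagsSchema, o.tags.asJava))
    r.put("counts", o.counts.map { case (k, v) => k -> Long.box(v) }.asJava)
    r
  }

  // Hypothetical helper: map keys may come back as Avro Utf8, so normalise via toString.
  def stringKeyedMap(raw: Any): Map[String, Long] =
    raw.asInstanceOf[java.util.Map[CharSequence, java.lang.Long]].asScala
      .map { case (k, v) => k.toString -> v.longValue }.toMap

  def fromRecord(r: GenericRecord): Order = Order(
    createdAt = Instant.ofEpochMilli(r.get("createdAt").asInstanceOf[Long]),
    billing = Address(r.get("billing").asInstanceOf[GenericRecord].get("city").toString),
    shipping = Address(r.get("shipping").asInstanceOf[GenericRecord].get("city").toString),
    tags = r.get("tags").asInstanceOf[java.util.Collection[CharSequence]].asScala
      .map(_.toString).toList,
    counts = stringKeyedMap(r.get("counts")))
}
```

Going through `toString` for every string value and map key keeps `fromRecord` working both for records built in memory and for records decoded from Avro binary, where strings arrive as `Utf8`.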

Don’t write the entire Avro schema with your data to avoid too much overhead

Along with any Avro Generic Record you have to write the schema of the data. This is to ensure Avro knows how to read the data from disk. Often though, your schema will be as large or larger than the data you write, causing lots of overhead. Instead of writing the complete schema, you can also calculate a fingerprint of that schema and write that along with the data. The fingerprint functions as a sort of hash. Upon reading the fingerprint from disk, you can cross-reference it with a known map of fingerprints to Avro schemas and thus determine what schema was used to write the data. You’ll find an example of this in the Avro Converter class here.
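
As a rough sketch of this idea (not the Avro Converter class from the repository), the code below prefixes every record with a 64-bit fingerprint computed by Avro’s `SchemaNormalization.parsingFingerprint64` and looks the schema up again on read. The `FingerprintCodec` object, the two schemas and the `knownSchemas` map are hypothetical; the Avro Java library is assumed to be on the classpath.

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object FingerprintCodec {
  // Hypothetical schema versions, for illustration only.
  val v1: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "Customer", "namespace": "com.example",
      | "fields": [{"name": "id", "type": "long"}]}""".stripMargin)

  val v2: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "Customer", "namespace": "com.example",
      | "fields": [
      |   {"name": "id",      "type": "long"},
      |   {"name": "country", "type": "string", "default": "unknown"}
      | ]}""".stripMargin)

  // Every schema version the job has ever written with, keyed by fingerprint.
  val knownSchemas: Map[Long, Schema] =
    Seq(v1, v2).map(s => SchemaNormalization.parsingFingerprint64(s) -> s).toMap

  // Layout: 8 bytes of fingerprint, followed by the Avro binary encoding.
  def write(record: GenericRecord): Array[Byte] = {
    // Use the schema the record carries, so untouched records keep their old schema.
    val schema = record.getSchema
    val out = new ByteArrayOutputStream()
    out.write(ByteBuffer.allocate(8).putLong(SchemaNormalization.parsingFingerprint64(schema)).array())
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  def read(bytes: Array[Byte]): GenericRecord = {
    val fingerprint = ByteBuffer.wrap(bytes).getLong()
    val writerSchema = knownSchemas.getOrElse(fingerprint,
      throw new IllegalStateException(s"Unknown schema fingerprint: $fingerprint"))
    // Skip the 8-byte prefix and decode with the schema the data was written with.
    val decoder = DecoderFactory.get().binaryDecoder(bytes, 8, bytes.length - 8, null)
    new GenericDatumReader[GenericRecord](writerSchema).read(null, decoder)
  }
}
```

The overhead per record is a fixed 8 bytes regardless of how large the schema is. The price is that every schema version that may still occur in state has to stay registered in the map, otherwise old data can no longer be deciphered.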

We hope this blog will help you manage your job’s state in Flink in the future. If you have any feedback/questions/suggestions or just would like to discuss the solution we’ve implemented, please don’t hesitate to contact us!

By Niels Denissen
