Apache Beam schemas and Cloud Dataflow updates

Israel Herraiz
Google Cloud - Community
Feb 9, 2024

TL;DR: Remember to use @SchemaFieldNumber in Java pipelines that use Apache Beam schemas, so that the inferred schema is the same across job runs. If you are using Cloud Dataflow and Beam schemas, job updates may fail if you don't annotate the field numbers.

Schemas in Apache Beam bring many benefits and additional functionality, but they can also cause subtle issues. When you use schemas with a Java data class and rely on schema inference, the order of the fields in the generated schema is not always the same.

This is not a problem if you are running a single job. But in streaming jobs, if you want to update the job (for instance, using Cloud Dataflow streaming job updates), the new job may infer a different field order for the same schemas. So even if you don't change your data classes, the inferred schema may be different. When that happens, the job update may fail, because from the point of view of the running job the schemas have changed.

Updates in streaming are essential to ensure the continuity of processing in production pipelines. I know of customers that have been running the same pipeline for years, updating their code every once in a while.

How can you ensure that using schemas does not pose a risk when updating a job? By annotating every field with its field number, so the order of the fields in the inferred schema is always the same.

Let's see an example.

This is a POJO class, as shown in the examples in the Beam programming guide.
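A minimal sketch of such a class, assuming the Beam Java SDK is on the classpath. The name `TransactionPojo` and its two fields are illustrative assumptions, not necessarily the exact class the text refers to:

```java
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// Illustrative POJO (names are assumptions). Beam infers the schema from the
// public fields; nothing here pins the position of each field in that schema.
@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojo {
  public String bank;
  public double purchaseAmount;
}
```

Note that nothing in this class states which field comes first in the inferred schema.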

With that class, you risk that the order of the fields changes between runs, and that the updated job is incompatible with the previous one, due to decode errors.

How can we remove that risk? Add @SchemaFieldNumber annotations to each field, so they are always in the same order:
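As a sketch of what that could look like, again assuming the Beam Java SDK is on the classpath and using the illustrative `TransactionPojo` name and fields (assumptions, not taken from the text):

```java
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;

// Illustrative POJO (names are assumptions). Each field declares its position
// in the inferred schema, so the order no longer depends on inference.
@DefaultSchema(JavaFieldSchema.class)
public class TransactionPojo {
  @SchemaFieldNumber("0") // the annotation value is a string holding an integer
  public String bank;

  @SchemaFieldNumber("1")
  public double purchaseAmount;
}
```

With these annotations, `bank` should be field 0 and `purchaseAmount` field 1 in the inferred schema on every run.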

Remember that schema field numbers must be consecutive. You cannot have gaps, so I recommend always starting at 0 and increasing the numbers one by one. Also, remember that the Java annotation expects a string, but the string must contain an integer (it will be parsed as an integer when the schema is inferred).
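To make the numbering rule concrete, here is a small plain-Java sketch that checks a set of annotation values against it. `FieldNumberCheck` is a hypothetical helper written for illustration; it is not part of Beam and does not reproduce Beam's own validation code:

```java
import java.util.List;

// Hypothetical helper, for illustration only; not part of the Beam API.
final class FieldNumberCheck {
  private FieldNumberCheck() {}

  /** Returns true if the values parse to exactly 0, 1, ..., n-1, in any order. */
  static boolean isValid(List<String> annotationValues) {
    int n = annotationValues.size();
    boolean[] seen = new boolean[n];
    for (String value : annotationValues) {
      int number;
      try {
        number = Integer.parseInt(value);
      } catch (NumberFormatException e) {
        return false; // the string must contain an integer
      }
      if (number < 0 || number >= n || seen[number]) {
        return false; // out of range (which implies a gap) or a duplicate
      }
      seen[number] = true;
    }
    return true;
  }
}
```

For example, `FieldNumberCheck.isValid(List.of("0", "1", "2"))` is accepted, while `List.of("0", "2")` (a gap) and `List.of("0", "one")` (not an integer) are rejected.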
