Data replication across backend services with Kafka and Protobuf

Emmanuel Joubaud
JobTeaser Engineering
12 min read · Apr 14, 2023

The JobTeaser application contains many relatively independent modules that help universities provide career guidance to students: a job board, a career event management system, a career advice appointment management system…

When we decided to migrate our application’s backend from a monolith to a service-oriented architecture, we strove to keep each module as isolated as possible from the others, so that an incident in one would not take down the rest. If the career appointment system is down, students should still be able to browse and apply to job ads.

That isolation is achieved through what we’ve called our Silos architecture. The gist is we avoid synchronous API calls between backend services, and prefer asynchronous data integration between services.

There are a lot of different ways to implement asynchronous communication between services, and few companies share the details of theirs, so we had to figure out a lot on our own. Now that we’ve refined our system, we thought we’d share the details of our approach, based on simple data replication using Kafka and Protobuf.

How it works

When a write operation happens, data is changed first on the Source of Truth service, which then publishes it to a Kafka topic. Consumers can subscribe to that topic in order to maintain their own local copy in their database.

Source of Truth and consumers

Every table in our data model has an owner service, also called the Source of Truth (SoT) for that data, or the producer. For instance, the job board service may be the SoT for the job ad data.

The owner service has 3 main responsibilities:

  • it receives and validates all write requests for the data it owns (Create, Update, Delete)
  • it stores the data to its own database, i.e. the Source of Truth, the authoritative state of the data
  • if it’s data that other services might want to access, it publishes its latest state as a message into a Kafka topic

Consumer services that are interested in accessing a given table can then subscribe to the Kafka topic for that table, and receive a message for each state update, so they can keep a local copy up-to-date in their own database by replaying the changes.

Protobuf messages

Each message contains the whole new state of the object, serialized using the Google Protobuf serialization format.

Protobuf is a statically typed format. The producer and the consumer both need access to a shared schema definition that tells them how to serialize and deserialize the data.

In a given Kafka topic, every message is always serialized using the same Protobuf message type, so that all the objects have a predictable format.

That creates an API contract, effectively turning our Kafka topics into asynchronous APIs. We version those APIs by adding a version to each data replication topic name and each Protobuf message name. For instance, the jobad.v1.main topic only contains messages of Protobuf type jobads.v1.JobAd. Protobuf gracefully handles non-breaking changes like adding new fields, but if we need to make breaking changes to our API format, we have to create a new topic and a new message type.
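To make the convention concrete, here is a minimal Python sketch of how a consumer might tie each topic to exactly one message type. It is an illustration under assumptions, not our actual code: the `JobAdV1` dataclass stands in for a generated Protobuf class, its `title` and `location` fields are hypothetical, and the `TOPIC_TYPES` registry and both helper functions are invented for the example. Only the topic name and the message type name come from the text above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class JobAdV1:
    """Stand-in for the generated class of Protobuf type jobads.v1.JobAd."""
    id: str
    title: str = ""                  # hypothetical business field
    # A field added later in a non-breaking way: messages written before
    # it existed simply lack it, and readers fall back to the default.
    location: Optional[str] = None   # hypothetical business field


# One topic, one message type: the pair is the API contract.
TOPIC_TYPES = {
    "jobad.v1.main": JobAdV1,
}


def message_type_for(topic: str):
    """Return the single message type allowed on a topic."""
    try:
        return TOPIC_TYPES[topic]
    except KeyError:
        raise ValueError(f"no contract registered for topic {topic!r}")


def api_version(topic: str) -> str:
    """Extract the version from a '<name>.<version>.<suffix>' topic name."""
    _name, version, _suffix = topic.split(".")
    return version
```

A breaking change would then show up as a new entry in the registry (a new topic with a new message type) rather than as a modification of the existing one.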

Double-write and transactions

So whenever we make a database change to a replicated table, we want to make sure we also push a message to Kafka.

But we can’t push them both atomically. No transaction can span both the database and Kafka.

Instead, we need to pick our poison between two types of possible discrepancies, depending on whether we first commit the change to the database or to Kafka:

// Option 1: push to Kafka within the DB transaction
database.begin_transaction()
database.save(record)
kafka_topic.produce(record.to_proto()) // A failure here would roll back the transaction
// A failure here would leave the change in Kafka but not in the DB
database.commit_transaction()

// Option 2: push to Kafka after the DB transaction
database.begin_transaction()
database.save(record)
database.commit_transaction()
// A failure here would leave the change in the DB but not in Kafka
kafka_topic.produce(record.to_proto())

As you can see, we’re facing a conundrum:

  • With option 1, failures will be rare but more consequential when they happen: It takes the DB connection to drop precisely between the push to Kafka and the transaction commit to have a problem, but then you basically end up pushing false information into Kafka, data that doesn’t reflect the Source of Truth, and you have no easy way to recall it.
  • With option 2, failures are much more likely. All it takes is for Kafka to be down and your change will be stored in the DB but won’t make it to Kafka. But this is also easy to fix: you can just reproduce the missing message at a later time. You haven’t pushed any wrong information, just delayed its transmission.

We opted for option 2, because we care more about the accuracy of information than its timing.

Note that there are other options available, like the outbox pattern, two-phase commits or event sourcing (discussed in more detail below), but they introduce more complexity. So far this trade-off in favor of simplicity hasn’t proven a major source of trouble for us.
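As a rough illustration of option 2, the sketch below commits to the database first and only then pushes to Kafka. Everything in it is an assumption made for the example: SQLite stands in for the service database, `FakeTopic` is an in-memory stand-in rather than a real Kafka client, and the `job_ads` table and its columns are hypothetical.

```python
import sqlite3


class KafkaDown(Exception):
    """Raised by the stand-in producer to simulate a Kafka outage."""


class FakeTopic:
    """In-memory stand-in for a Kafka topic (not a real client)."""

    def __init__(self):
        self.messages = []
        self.available = True

    def produce(self, message: dict) -> None:
        if not self.available:
            raise KafkaDown("broker unreachable")
        self.messages.append(message)


def save_then_produce(db: sqlite3.Connection, topic: FakeTopic, record: dict) -> bool:
    """Option 2: commit to the DB first, then push to Kafka.

    Returns True if the message was produced, False if it still has to be
    reproduced later. The DB holds the authoritative state either way.
    """
    with db:  # commits on success, rolls back if the block raises
        db.execute(
            "INSERT OR REPLACE INTO job_ads (id, title) VALUES (?, ?)",
            (record["id"], record["title"]),
        )
    try:
        topic.produce(dict(record))
        return True
    except KafkaDown:
        # Nothing false was published; the change is only delayed.
        return False


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE job_ads (id TEXT PRIMARY KEY, title TEXT)")
topic = FakeTopic()

# Simulate a Kafka outage during a write.
topic.available = False
produced = save_then_produce(db, topic, {"id": "a1", "title": "Intern"})
```

After the simulated outage, the row is in the database and the topic is empty, which is exactly the discrepancy described for option 2: the message is missing but no wrong information has been published.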

Ordering and idempotency

On the consumer side, we basically have a loop that listens to all new messages for a given topic, deserializes the Protobuf message and upserts the resulting object into its own local database.

But one complication is that the order of messages is not guaranteed. Kafka gives you some ordering guarantees (messages are ordered within a partition), but it’s always possible to reconsume old messages.

When that happens, if you just upsert any incoming messages to the DB, you risk overwriting a more up-to-date version of your record with an older message.

In order to fix that issue, we add an integer field called seq on each table that we want to replicate. That number is incremented before each update of the record, and included in the message pushed to Kafka.

That way, when the consumer processes an incoming message, if it already holds a copy of the same object (identified by its id) with a higher seq than the incoming record, it knows that it already holds a more recent version of the object and can just skip the message.

That, combined with the fact that we always produce the full state of the record, gives us a very important property called idempotency: no matter in what order the update messages for a given record arrive, or how many times each one is delivered, the record will always end up in the same state in the consumer database.
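The seq check can be sketched in a few lines of Python. This is a simplified illustration, not our library code: a plain dict stands in for the consumer’s table, and the `status` field and function names are hypothetical. Note that a message with an equal seq is still applied, since the text above only skips messages when the stored copy has a strictly higher seq.

```python
from itertools import permutations


def apply_message(store: dict, message: dict) -> bool:
    """Upsert a full-state message unless a newer copy is already held.

    Returns True if the message was written, False if it was skipped.
    """
    current = store.get(message["id"])
    if current is not None and current["seq"] > message["seq"]:
        return False  # stale message: keep the more recent copy
    store[message["id"]] = dict(message)
    return True


def replay(messages) -> dict:
    """Apply messages in the given order to an empty consumer store."""
    store = {}
    for message in messages:
        apply_message(store, message)
    return store


updates = [
    {"id": "a1", "seq": 1, "status": "draft"},
    {"id": "a1", "seq": 2, "status": "published"},
    {"id": "a1", "seq": 3, "status": "archived"},
]

# Replay every possible delivery order, each followed by a full redelivery.
final_states = [replay(list(order) + list(order)) for order in permutations(updates)]
```

Every entry of `final_states` holds the seq 3 version of the record, whatever the delivery order and despite the duplicates.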

Reproducing historic data

This system works fine for propagating changes in real-time as they happen, but we haven’t touched the problem of propagating historic data.

  • What happens when you create a new service that needs its own local copy of the job ad objects? If you just plug it into the system, it will only get new updates about records, not the old existing ones.
  • What if you add a new field to the Source of Truth table, or start producing a new field that had values in the source database but wasn’t deemed useful to expose to the rest of the system before?
  • What if a bug in your producer led you to produce faulty data? How do you fix that?
  • What if some changes failed to be propagated due to a Kafka outage?

We address all of the above problems with a common tool, called a reproduce task: a script that iterates on a Source of Truth table, and reproduces each record into its corresponding Kafka topic.

It’s a crude but simple tool that lets us bootstrap new services, fix discrepancies and add new fields to our replication mechanism.

Note that republishing a big table can put a non-trivial amount of strain on the system. It can imply pushing millions of records that will then have to be processed by each consumer of the topic (possibly tens of services), eating up a lot of computing resources and likely introducing queue build-ups and delays into the replication process.

Thankfully, we rarely need a republish to impact all the consumers. When we’re bootstrapping a new service, or an existing service needs to consume a new field, we can usually get away with reproducing the data into a dedicated topic that only a single consumer service will listen to.
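A reproduce task can be as small as the following sketch. It makes several assumptions for the sake of illustration: SQLite stands in for the Source of Truth database, the `job_ads` table and its columns are hypothetical, `produce` is any callable that sends one record to a topic, and the dedicated topic name is invented. Reading in primary-key order, batch by batch, avoids loading the whole table in memory.

```python
import sqlite3
from typing import Callable, Iterator


def iter_records(db: sqlite3.Connection, batch_size: int = 500) -> Iterator[dict]:
    """Walk the Source of Truth table in primary-key order, batch by batch."""
    last_id = ""
    while True:
        rows = db.execute(
            "SELECT id, seq, title FROM job_ads WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, batch_size),
        ).fetchall()
        if not rows:
            return
        for id_, seq, title in rows:
            yield {"id": id_, "seq": seq, "title": title}
        last_id = rows[-1][0]  # resume after the last row of this batch


def reproduce(db: sqlite3.Connection, produce: Callable[[str, dict], None],
              topic: str, batch_size: int = 500) -> int:
    """Push the current state of every record into the given topic."""
    count = 0
    for record in iter_records(db, batch_size):
        produce(topic, record)
        count += 1
    return count


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE job_ads (id TEXT PRIMARY KEY, seq INTEGER, title TEXT)")
db.executemany(
    "INSERT INTO job_ads VALUES (?, ?, ?)",
    [(f"ad-{i:03d}", 1, f"Job {i}") for i in range(5)],
)

sent = []  # stand-in for a Kafka producer
count = reproduce(
    db,
    lambda topic, record: sent.append((topic, record)),
    topic="jobad.v1.bootstrap-newservice",  # hypothetical dedicated topic
    batch_size=2,
)
```

Because the records are reproduced with their current seq, consumers that already hold a newer version skip them, so rerunning the task is safe.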

Soft-Deletion

In this replication-oriented system we only ever push one type of payload: the full object’s state. There’s no metadata, no event type. It’s always treated by consumers in the same way: upsert (with seq check).

That means we have to pass information about the deletion as object state. We do so by adding a soft-delete flag, typically called delete_time, to each produced object.

Our design even forces us to implement soft-delete logic in both the producer and consumer services. If we hard-deleted records in the producer, we would lose the information that the record ever existed, and we couldn’t reproduce a deletion message that failed to propagate. If we hard-deleted records in the consumer, we would lose their seq, and reconsuming an old message could recreate a record that should remain deleted.

So we never actually delete replicated records, but instead store a deletion flag, and code the application to treat records with that flag as deleted, e.g. to ignore records with the soft-delete flag in DB queries. When we need to anonymise data for GDPR compliance, we do so by overwriting any sensitive fields in the data rather than by deleting the whole records.

Of course other strategies would be possible, like using the outbox pattern or dedicated deletion events, but each would introduce its own set of complexities into the replication mechanism. Our default approach may also cause data storage issues, for instance in cases where we’d need to handle huge volumes of short-lived records. But we can always resort to ad-hoc solutions in these special cases, like hard-deleting cold records after a while.
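The consumer-side half of that argument can be shown with a short Python sketch. It is illustrative only: a dict stands in for the consumer’s table, and the record fields other than id, seq and delete_time are hypothetical. The second half of the example shows what goes wrong when a consumer hard-deletes instead.

```python
from datetime import datetime, timezone


def apply_message(store: dict, message: dict) -> None:
    """Seq-checked upsert; a deletion is ordinary state with delete_time set."""
    current = store.get(message["id"])
    if current is not None and current["seq"] > message["seq"]:
        return  # we already hold a more recent version
    store[message["id"]] = dict(message)


def visible_job_ads(store: dict) -> list:
    """Application reads ignore soft-deleted records."""
    return [record for record in store.values() if record["delete_time"] is None]


created = {"id": "a1", "seq": 1, "title": "Intern", "delete_time": None}
deleted = {"id": "a1", "seq": 2, "title": "Intern",
           "delete_time": datetime(2023, 4, 14, tzinfo=timezone.utc)}

# Soft delete: the tombstone keeps its seq, so the old message is skipped.
store = {}
apply_message(store, created)
apply_message(store, deleted)
apply_message(store, created)  # old message reconsumed

# Hard delete: the consumer forgets the seq and the record comes back.
hard_store = {}
apply_message(hard_store, created)
del hard_store["a1"]                # hard delete when the deletion arrives
apply_message(hard_store, created)  # old message reconsumed
```

With the soft-deleted copy kept, `visible_job_ads(store)` stays empty; in the hard-delete variant the reconsumed message recreates the record.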

Keep it super simple

So our main use-case with asynchronous communication is replicating raw data.

Speed is a desirable attribute of this system. While it is built with eventual consistency in mind and can tolerate some delays, the longer the delays, the more the impact risks being felt by end users.

Even more important is robustness. In the event of an error, the consumer will keep retrying the messages until they succeed.

That’s why we strive to keep our consumers dead simple. They’re just dumb pipes focused on doing one thing fast and reliably: data replication.

They won’t perform any kind of validation. Validation should be performed by the Source of Truth before committing the change. Once the change is made to the Source of Truth and pushed to Kafka, a consumer attempting to reject it would just be committing a denial of reality.

For this reason, we also avoid database constraints as much as possible in consuming tables, besides basic type casting. No NOT NULL, no FOREIGN KEY (those don’t tend to play well with eventual consistency), ideally no size limits.

Any logic included in the consumer is a possible source of bugs, failures or delays and is actively avoided.
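To illustrate the retry behaviour mentioned above, here is a minimal sketch of a consumer step that retries a message until it succeeds. The exponential backoff, its parameters and the function names are assumptions made for the example, not a description of our actual libraries.

```python
import time
from typing import Callable


def consume_with_retry(message: dict, handler: Callable[[dict], None],
                       sleep: Callable[[float], None] = time.sleep,
                       base_delay: float = 0.5, max_delay: float = 30.0) -> int:
    """Call handler(message) until it succeeds; return the number of attempts."""
    attempt = 0
    while True:
        attempt += 1
        try:
            handler(message)
            return attempt
        except Exception:
            # Any failure is retried: the delay doubles each time, up to a cap.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(delay)


delays = []          # records the waits instead of actually sleeping
calls = {"n": 0}


def flaky_upsert(message: dict) -> None:
    """Hypothetical handler that fails twice, e.g. during a short DB outage."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("database temporarily unavailable")


attempts = consume_with_retry({"id": "a1", "seq": 1}, flaky_upsert, sleep=delays.append)
```

A loop like this only terminates if the handler can eventually succeed. A message that fails deterministically, for instance because it violates a constraint, would be retried forever, which is one more reason to keep validation and constraints out of the consumer.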

Alternatives

This is just one way to handle async communications in a distributed system. Here are some of the alternatives we’ve considered, and why we preferred our approach.

Why not CDC/Debezium?

There are generic tools, known as Change Data Capture (CDC) tools, that can listen to database changes and replicate them to other databases. Using them would save us from having to implement the producers and consumers, and the mapping to Protobuf, in each service’s application code.

Our main issue with those tools is that they always introduce tight coupling to the source database schema. If you change the source database schema, say remove a column, the tool will either break, or propagate the schema changes to the target databases, hence breaking their own internal logic that depends on that schema.

If your data team has ever used such tools to populate their data warehouse (as is fairly common), then you’re probably familiar with the kind of incident where a backend developer changing their application’s schema breaks your data team’s BI reports. The postmortems tend to lead to gatekeeping processes, cross-team coordination efforts that break the independence of your teams, or even schema freezes that hinder your ability to change your application.

This kind of problem is the very reason why the industry has adopted the concept of versioned API contracts.

When several applications depend on the same database, then the database schema becomes a de facto API contract.

Here, the conversion to Protobuf provides us with a layer of indirection between the database schema and the API contract used for replication. It also helps us enforce API versioning, which gives us a way forward to change our model: create a new API version, with a new Protobuf version and a new topic, and deprecate the old one.

Sadly, we haven’t found any off-the-shelf tool that can plug into the database, react to low-level database changes, map them to a high-level versioned “business” intermediary representation, and then replicate that versioned flow into the consumer databases. But we’ve built some internal libraries to help automate the consuming and the reproducing of objects in our services.

Why not Event-Sourcing?

Our replication-based system has some superficial resemblance with the Event Sourcing pattern. But there are also crucial differences:

  • In Event Sourcing, the event stream is the Source of Truth, rather than the database of the source service. You need to persist the whole chain of events to reconstruct your state. By contrast, we use Kafka as a temporary communication channel rather than a persistent storage mechanism.
  • The consumers need to understand the semantics of each type of event they’re interested in, in order to be able to reconstruct state from the events. Each event type reflects specific actions that happened to your system, like ArchivedJobAds or DeletedUser, and the consumers require specific logic to derive the state of an object by applying different types of events differently. This creates coupling between producer and consumers: if you introduce a new ArchivedJobAds event, you first need to update all the consumers to understand and process that new event, or they won’t be able to reconstruct an accurate representation of the JobAd object.
  • Order and exhaustiveness matter with Event Sourcing. Your system will end up in a different state if you apply a TimesFive and a PlusThree event in different orders, or skip one. Event Sourcing events also don’t typically contain the full state of the object they concern, but only the data relevant to the event’s application, kind of like a diff, so if you miss one, the changes it was carrying might be lost. By contrast, we produce the full state of the object in each of our change events, so it doesn’t matter if you miss an intermediary event or consume them out of order: you can still easily reconstruct the final state of the record.
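The last difference is easy to check with a toy example. The sketch below is purely illustrative: it models a multiply-by-five and a plus-three event on an integer, then compares that with full-state messages carrying a seq. The function names and the message shape are assumptions made for the example.

```python
from itertools import permutations


def fold_events(start: int, events) -> int:
    """Event Sourcing style: derive the state by applying each event in turn."""
    value = start
    for event in events:
        if event == "TimesFive":
            value *= 5
        elif event == "PlusThree":
            value += 3
        else:
            raise ValueError(f"unknown event type: {event}")
    return value


def fold_states(messages):
    """Replication style: keep the full-state message with the highest seq."""
    current = None
    for message in messages:
        if current is None or message["seq"] >= current["seq"]:
            current = message
    return current


events = ["TimesFive", "PlusThree"]
in_order = fold_events(2, events)            # (2 * 5) + 3
swapped = fold_events(2, reversed(events))   # (2 + 3) * 5

# The same two changes expressed as full states of the object.
states = [
    {"id": "x", "seq": 1, "value": 10},
    {"id": "x", "seq": 2, "value": 13},
]
final_values = [fold_states(order)["value"] for order in permutations(states)]
skipped_first = fold_states(states[1:])["value"]
```

Swapping the two events changes the result, whereas the full-state messages give the same final value in either order, and even when the first message is never received.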

For all these reasons, we feel that a lower-level replication mechanism gives us a much simpler and less coupled system that fits the majority of use cases well.

There are however cases when we do miss the semantics provided by event types. That’s typically when we want to trigger additional treatments, besides replication, in reaction to a specific event like a status change. For instance a service may react to a job ad becoming “published” by pushing it to outside sources.

We can address those needs through two different mechanisms:

  • Inferring the event from the state change, by comparing the new consumed state with the old existing state stored in the consumer’s data. For instance if we consume a job ad in the “published” state and see that we were already holding a “draft” version of that same job ad, we can treat that as a publication event. That adds unwelcome complexity and error risk to the consumers, so we tend to offload these “reactive” treatments to background jobs triggered by the consumers, so that they won’t cause errors or slow down the more critical replication mission of the consumer.
  • Ad-hoc semantic Domain Events, distinct from the replication messages, propagated via dedicated Kafka topics or by some other means like synchronous API calls. These are very similar to Event Sourcing events, except they’re not the Source of Truth and can be reserved to specific use-cases where they add value. Our use-cases for them include analytics tracking events, anonymization, CRM integration…
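The first mechanism, inferring an event from a state change, could look like the sketch below. It is a simplified illustration under assumptions: a dict stands in for the consumer’s table, a list stands in for the background job queue, and the `push_to_partners` job name is hypothetical. Replication happens first, and the reactive treatment is only enqueued, never run inline.

```python
from typing import Optional


def is_publication(old: Optional[dict], new: dict) -> bool:
    """A job ad we held as a draft has now been consumed as published."""
    return (old is not None
            and old["status"] == "draft"
            and new["status"] == "published")


def consume(store: dict, jobs: list, message: dict) -> None:
    """Replicate the message, then enqueue any reactive treatment."""
    old = store.get(message["id"])
    if old is not None and old["seq"] > message["seq"]:
        return  # stale message: nothing to replicate, nothing to infer
    store[message["id"]] = dict(message)  # replication comes first
    if is_publication(old, message):
        # Offload the reaction so it cannot fail or slow down replication.
        jobs.append(("push_to_partners", message["id"]))


store, jobs = {}, []
draft = {"id": "a1", "seq": 1, "status": "draft"}
published = {"id": "a1", "seq": 2, "status": "published"}

consume(store, jobs, draft)
consume(store, jobs, published)   # draft -> published: one job enqueued
consume(store, jobs, published)   # redelivery: no second job
consume(store, jobs, draft)       # stale message: skipped
```

One limitation of this approach is visible in the sketch: if the consumer never held the draft version, for example because it first sees the record already published, no event is inferred. How to treat that case is a design decision left out of this example.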

Summary

In summary, when we need to expose data across services, the Source of Truth service will produce a versioned Protobuf message containing the full new state of the object into a Kafka topic for each change of the object.

The message will include the following fields:

  • id to identify records within a stream, and find out which replication events apply to which object
  • seq, incremented at each update on a given object, to store the version of the object and ensure we don’t overwrite newer versions with older versions
  • delete_time to convey that an object has been deleted
  • all the fields of the object that we may want to expose, i.e. the latest full object state

Consumer services can subscribe to the topic to upsert the objects in their own local database, maintaining their own local copy of the data.

The consumer code is kept fast, simple and focused on replication, and any follow-up reactive processing is offloaded to background jobs or cron tasks.
