Communicating Data Changes Across Service Boundaries… Safely!
“Friends don’t let friends do dual writes.” — Gunnar Morling
Setting the Scene
Besides directly modifying their dedicated, isolated data stores, microservices might also need to communicate data changes across service boundaries. For instance, when commands and/or events are sent between any two services, it can be beneficial to introduce a decoupling component in order to avoid the drawbacks of numerous direct point-to-point connections. In modern data architectures we often find Apache Kafka as the distributed streaming platform at the heart of all data flows. This means we need a way to update data stores and additionally write events as messages to Kafka topics for other services to consume. While this seems pretty straightforward at first sight, it turns out to be non-trivial to implement from scratch in a reliable and consistent way:
- the naive approach to perform two separate write operations — one against the service-local data store and the other to Kafka topics — obviously only works for the happy path
- the infamous two-phase commit isn’t really an option here since we cannot simply employ XA transactions spanning arbitrary data stores and message brokers such as Apache Kafka
A somewhat “underestimated” approach to achieve this in a reliable and consistent way is the so-called “outbox pattern” (a.k.a. transactional outbox), one of several well-defined patterns in the context of microservice architectures. There is a detailed, extremely informative and very well written write-up about the outbox pattern by Gunnar Morling on the Debezium blog. It is a highly recommended read in case you want more background and a deeper dive into this topic. Furthermore, it also comes with a turn-key ready reference implementation based on a Java EE technology stack.
Based on a slightly simplified but otherwise strikingly similar example, this blog post discusses a POC implementation using a different technology stack for a demo microservice:
- Spring Boot and application events
- Spring Data with domain events
- MySQL instead of Postgres as the underlying RDBMS
The event-based communication is still built on top of Apache Kafka with Kafka Connect and Debezium.
Application Event Structure
Every event which needs to be written to the “outbox” has to exhibit certain properties. For that reason, there is a common interface called Outboxable:
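The interface itself is not reproduced in this excerpt; a minimal sketch, assuming getter-style methods for the aggregate id, aggregate type, event type and payload, could look like this:

```java
// Common contract for everything that can be written to the "outbox":
// which aggregate it belongs to, the concrete event type and a string payload.
public interface Outboxable {
    String getAggregateId();    // id of the affected aggregate, e.g. the order id
    String getAggregateType();  // e.g. "PurchaseOrder"
    String getType();           // concrete event type, e.g. "OrderUpsertedEvent"
    String getPayload();        // arbitrary string representation, e.g. JSON
}
```

Keeping the payload a plain `String` here is what makes the approach agnostic to the concrete serialization format, as discussed below.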
Database Event Structure
The order microservice’s data store is MySQL. The corresponding table structure to store any event of type Outboxable looks unsurprisingly as follows:
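The DDL is not included in this excerpt; a sketch of such a table, with column names mirroring the `Outboxable` properties (exact names and types are assumptions), might be:

```sql
CREATE TABLE outbox_event (
  id CHAR(36) NOT NULL,               -- event id, e.g. a UUID string
  aggregate_id VARCHAR(255) NOT NULL, -- id of the affected aggregate
  aggregate_type VARCHAR(255) NOT NULL,
  type VARCHAR(255) NOT NULL,         -- concrete event type
  payload TEXT NOT NULL,              -- any string representation of the event
  PRIMARY KEY (id)
);
```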
NOTE: Unlike the original reference implementation, the payload itself is expected to be given by ANY(!) string representation instead of explicitly tying it to e.g. JsonNode on the application layer or JSONB on the persistence layer. On the one hand, event payloads don’t necessarily need to be encoded as JSON strings and on the other hand, not all data stores would even provide a JSONB-compatible data type… so the chosen approach is applicable in a more generic way.
Every “outboxable” event that needs to get persisted in the database is converted to an @Entity OutboxEvent which reflects the structure shown above:
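The entity class is not shown in this excerpt; a sketch, assuming a static factory method for the conversion from `Outboxable` (name and field details are assumptions), might look like:

```java
import java.util.UUID;
import javax.persistence.Entity;
import javax.persistence.Id;

@Entity
public class OutboxEvent {

    @Id
    private String id;
    private String aggregateId;
    private String aggregateType;
    private String type;
    private String payload;

    protected OutboxEvent() {
        // required by JPA
    }

    // convert any "outboxable" event into a persistable outbox entry
    public static OutboxEvent from(Outboxable event) {
        OutboxEvent entity = new OutboxEvent();
        entity.id = UUID.randomUUID().toString();
        entity.aggregateId = event.getAggregateId();
        entity.aggregateType = event.getAggregateType();
        entity.type = event.getType();
        entity.payload = event.getPayload();
        return entity;
    }
}
```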
There is a dedicated Spring component OutboxListener which is responsible for reacting to the dispatching of any “outboxable” events. It uses the OutboxEventRepository for CRUD purposes in order to persist the actual OutboxEvent entities:
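The listener is not reproduced in this excerpt; a sketch, assuming a conversion factory `OutboxEvent.from(…)` and constructor injection of the repository, might be:

```java
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class OutboxListener {

    private final OutboxEventRepository repository;

    public OutboxListener(OutboxEventRepository repository) {
        this.repository = repository;
    }

    // fires for both Spring Data @DomainEvents publications and manual
    // ApplicationEventPublisher.publishEvent(...) calls, within the
    // surrounding transaction of the originating write
    @EventListener
    public void onExportedEvent(Outboxable event) {
        repository.save(OutboxEvent.from(event));
    }
}
```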
The implementation is of course agnostic to the origin of an “outboxable” event, so it doesn’t matter whether an event was published via the Spring Data @DomainEvents mechanism or manually triggered via the ApplicationEventPublisher.
Firing Outboxable Events
Since the Spring Boot example uses Spring Data we can employ the @DomainEvents mechanism for the PurchaseOrder entity. Doing so, every time we call the corresponding PurchaseOrderRepository’s save(…) method, Spring makes sure to publish any custom event we need to notify about the insertion or update of one such entity. This is in fact exactly what we want to happen in the context of the outbox pattern. It can be easily achieved by following a simple convention like in the snippet below:
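The snippet itself is missing from this excerpt; a sketch of the convention, with the method name and the `OrderUpsertedEvent.of(…)` factory being assumptions, might look like:

```java
import java.util.Collection;
import java.util.Collections;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import org.springframework.data.domain.DomainEvents;

@Entity
public class PurchaseOrder {

    @Id
    @GeneratedValue
    private Long id;

    // ... order meta data and order line details ...

    // called by Spring Data every time this aggregate is saved
    // via the repository's save(...) method
    @DomainEvents
    Collection<Outboxable> outboxEvents() {
        return Collections.singletonList(OrderUpsertedEvent.of(this));
    }
}
```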
By using the @DomainEvents annotation Spring Data will call this method and publish any events contained in its Collection<Outboxable> return value. The code above only uses one “outboxable” OrderUpsertedEvent reflecting the current state of the entity itself:
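The event class is not shown in this excerpt; a sketch, assuming Jackson is used for payload serialization as described below (field and method names are assumptions), might be:

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OrderUpsertedEvent implements Outboxable {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final String aggregateId;
    private final String payload;

    private OrderUpsertedEvent(PurchaseOrder order) {
        this.aggregateId = String.valueOf(order.getId());
        try {
            // serialize the full current aggregate state as the event payload
            this.payload = MAPPER.writeValueAsString(order);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("payload serialization failed", e);
        }
    }

    public static OrderUpsertedEvent of(PurchaseOrder order) {
        return new OrderUpsertedEvent(order);
    }

    @Override public String getAggregateId() { return aggregateId; }
    @Override public String getAggregateType() { return "PurchaseOrder"; }
    @Override public String getType() { return getClass().getSimpleName(); }
    @Override public String getPayload() { return payload; }
}
```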
This demo application employs Jackson and serializes the event payload as a JSON string, but in general any string serialization would do, for instance leveraging Base64 to support the encoding of binary data. The name OrderUpsertedEvent is used on purpose here since this event type will actually be published under the following two conditions: a) every time a new purchase order entity is inserted into the database and b) every time an existing purchase order entity is updated. In the @Service OrderService’s placeOrder(…) method there is no evidence of this event since it is implicitly handled by Spring Data in the background.
It’s also important to highlight that all the persistence-related actions happen within one and the same transactional scope. This guarantees ACID properties, so that the insert/update of the complete aggregate (order metadata and order line details) and the corresponding “outboxable” OrderUpsertedEvent are either consistently applied to the database together or rolled back on errors.
While Spring Data @DomainEvents are a nice way to attach the publication of such events to aggregate entities for notification purposes, they are not particularly flexible and also not so straightforward to apply in a more fine-grained way, i.e. when we only want to notify that certain parts of the aggregate have changed.
Exactly for this reason the demo also employs an alternative approach to publish “outboxable” events explicitly / manually by means of Spring’s ApplicationEventPublisher.
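The corresponding service method is not included in this excerpt; a sketch, in which the method name, the `OrderLineStatus` type and the `OrderLineUpdatedEvent.of(…)` factory are assumptions, might look like:

```java
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {

    private final PurchaseOrderRepository repository;
    private final ApplicationEventPublisher eventPublisher;

    public OrderService(PurchaseOrderRepository repository,
                        ApplicationEventPublisher eventPublisher) {
        this.repository = repository;
        this.eventPublisher = eventPublisher;
    }

    @Transactional
    public PurchaseOrder updateOrderLineStatus(long orderId, long orderLineId,
                                               OrderLineStatus newStatus) {
        PurchaseOrder order = repository.findById(orderId)
                .orElseThrow(() -> new IllegalArgumentException("order not found"));
        OrderLineStatus oldStatus = order.updateOrderLine(orderLineId, newStatus);

        // manually publish a fine-grained "outboxable" event for the change
        eventPublisher.publishEvent(
                OrderLineUpdatedEvent.of(orderId, orderLineId, newStatus, oldStatus));

        // saving the aggregate again implicitly publishes another
        // OrderUpsertedEvent via the @DomainEvents mechanism
        return repository.save(order);
    }
}
```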
This example shows how to trigger a custom event payload in case any single order line of a complete order has its status changed. So right after performing the update we publish an “outboxable” OrderLineUpdatedEvent to inform about an order line status modification. Next, by explicitly calling the repository’s save(…) method with the complete aggregate, another “outboxable” OrderUpsertedEvent is again implicitly published by the @DomainEvents mechanism. This is an optional step and only performed if it’s needed to communicate every new full state of the aggregate by means of an additional outbox event on every change. Again, by annotating with @Transactional we make sure that all changes are applied in a consistent and reliable way.
Processing Outbox Events
After installing the Debezium Source Connector into your Kafka Connect environment you can POST the following configuration against Kafka Connect’s REST API to capture changes that are applied against the “outbox table” of the MySQL database of the microservice sample:
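The configuration itself is not reproduced in this excerpt; a sketch along the following lines could be used, where hostnames, credentials and the server id are placeholders, and the property names follow the Debezium MySQL connector of that era:

```json
{
  "name": "mysql-outbox-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "outbox-demo",
    "table.whitelist": "outbox-demo.outbox_event",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.outbox-demo"
  }
}
```

The `database.server.name` together with the database and table names yields the topic `dbserver1.outbox-demo.outbox_event` inspected below.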
Inspecting Raw Outbox Event in Kafka Topics
When we run the Spring Boot application, two sample orders with a few order lines are created during start up. Also, right after creating the orders, the statuses of the order lines are changed to simulate interactions with the API the service exposes. This leads to several events being written into the “outbox table”, which are in turn captured by the Debezium MySQL source connector.
We can easily inspect the messages written to the configured Kafka topic dbserver1.outbox-demo.outbox_event on the command line by running the following command:
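One way to do this, assuming a broker reachable at localhost:9092, is the standard console consumer that ships with Apache Kafka:

```shell
kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic dbserver1.outbox-demo.outbox_event \
    --from-beginning
```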
Below are two example messages: one reflecting the insertion of the first order, followed by the corresponding deletion of the same order, the latter done to keep the original “outbox table” from growing indefinitely.
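The concrete messages are not reproduced in this excerpt, but the familiar Debezium change event envelope gives an idea of their shape; the sketch below abbreviates the value of the insert ("op": "c"), with all field values and the trimmed "source" block being placeholders:

```json
{
  "before": null,
  "after": {
    "id": "a1b2c3d4-…",
    "aggregate_id": "1",
    "aggregate_type": "PurchaseOrder",
    "type": "OrderUpsertedEvent",
    "payload": "{…}"
  },
  "source": { "name": "dbserver1", "table": "outbox_event" },
  "op": "c",
  "ts_ms": 1556890294484
}
```

The corresponding deletion arrives as an envelope with "op": "d", the removed row in "before" and "after" set to null.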
Propagating Raw Outbox Events to MongoDB
There are two main challenges when trying to stream raw outbox events to an operational data store. First, most sink connectors are not capable of properly processing CDC events such as the ones published by Debezium, since they lack the necessary semantic awareness of said events. Second, even if they can process them, a CDC-aware sink connector typically handles all different kinds of CDC events, namely INSERTs, UPDATEs and DELETEs. However, when processing CDC events derived from an “outbox table”, special handling is needed which allows ignoring certain CDC event types. Concretely, any DELETEs (like the one shown in the paragraph above) must not be reflected in the sink, since this would always remove the preceding insertions. Remember that this stems from the fact that the original “outbox table” is effectively empty all the time and only used to perform transaction-aware capturing of changes from the data store’s log. The MongoDB community sink connector was recently updated with a preview feature to allow for such scenarios by means of specific configuration options. The snippet below shows a sample configuration which is able to process raw outbox events originating from a Debezium MySQL source connector:
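Such a configuration is not included in this excerpt; a sketch might look as follows, where the connection settings are placeholders and, apart from the operations entry discussed next, the exact names of the (at the time experimental) CDC handler properties are assumptions:

```json
{
  "name": "mongodb-sink-outbox-raw",
  "config": {
    "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
    "topics": "dbserver1.outbox-demo.outbox_event",
    "mongodb.connection.uri": "mongodb://mongodb:27017/outboxed",
    "mongodb.collection": "outbox-raw",
    "mongodb.change.data.capture.handler": "at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsHandler",
    "mongodb.change.data.capture.handler.operations.outbox-raw": "c"
  }
}
```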
The most important part is the last configuration entry, mongodb.change.data.capture.handler.operations.outbox-raw, which can be configured with a list of CDC operation types: “c,r,u,d”. In this case we are only interested in processing “c” type operations, i.e. INSERTs, while ignoring all others (“r,u,d”). By definition the outbox table will never experience “u”, i.e. UPDATEs, but it will of course receive “d”, i.e. DELETEs, to clean up events right after they were written. By only processing INSERTs, the sink connector preserves all raw outbox events that were generated in the original “outbox table” of the source data store.
Running this sink connector results in the outboxed.outbox-raw collection in MongoDB, which keeps track of all raw outbox events ever created.
The full source code for the discussed example application together with the Kafka Connector configurations can be found on GitHub.
A future blog post will explain more advanced outbox-related processing options such as:
- the need for aggregate-ware splitting of outbox events
- configuring Kafka Connect single message transforms (SMTs) for custom outbox event routing which is currently incubating at Debezium
- another pending sink connector feature which will allow continuously updating a materialized view of the actual outboxed aggregate structures instead of only collecting raw outbox events
Stay tuned! Until then happy “outboxing” :)
Finally, many thanks to Gunnar Morling for giving helpful feedback on early drafts of this blog post and thereby contributing to improve it.