Axon 101 — Handling Client-Side Correlation IDs

Frank Scheffler
Digital Frontiers — Das Blog
10 min readJan 18, 2023
Photo by Edge2Edge Media on Unsplash

Ever wondered, when a CQRS/ES application finally applied all the events originating from a client-side command to its read model(s)? Clients often need to know, when querying the results. In this “Axon 101” blog post you are about to learn, how to leverage client-side correlation ids to solve this, when building CQRS/ES applications using the Axon Framework.

Motivation

By its very nature, CQRS/ES based applications maintain separate write and read models. The write model is in charge of helping the command layer decide, whether an incoming command is valid or not, before any new events are applied and stored within the event store. The read model(s) are updated based on these events within the so-called event handlers, projecting the information into the most suitable format, so that it can be queried by clients.

Command Query Responsibility Segregation (CQRS) & Event Sourcing

As shown in the picture above, this process is usually asynchronous. Due to this read models will not be updated synchronously together with the events applied by the command side. Accordingly, clients sending commands to such an application need to be aware of the eventual consistency implied, when querying the read model(s).

Hence, it is often necessary for them to know, when the events resulting from the commands they’ve triggered are completely represented in the read model. In fact, they need to know how long they must wait, before the effects (events) of their command are fully applied to the read model, on which they rely.

One suitable approach to handle such scenarios is to attach client-side correlation ids to any command issued by the client, which then are reflected within the read model, so clients know, when the command is actually “finished”.

The following sections explain in more detail, how Axon can be leveraged both to preserve such correlation ids within the events and how to tell the client that all the necessary events have been applied.

Axon message correlation

Axon provides out-of-the-box support for message correlation, as described in https://docs.axoniq.io/reference-guide/axon-framework/messaging-concepts/message-correlation. Message meta-data is the key to preserving information, such as correlation ids, along the message “chain”. This typically comprises the initial command message, that actually triggered the command, as well as any new events applied by the command handler(s) involved, e.g. using AggregateLifecycle.apply().

Message correlation may even extend beyond the events applied by the initial command handler(s). For instance, event handlers or sagas may trigger new commands or publish more events themselves. Tracing such message handling chains is possible either using Axon’s traceId or standardized tracing headers, e.g. using Sleuth or OpenTelemetry. The latter ones allow for distributed tracing, since trace ids will be transferred to and picked up from different processes, as well.

Axon’s CorrelationDataProvider is responsible for “transporting” selected message meta-data to down-stream messages created within the same unit of work. This is then persisted as part of the event’s meta-data and can be used within handlers or sagas, if needed. By default, Axon itself applies the aforementioned traceId starting from the originating command handler, i.e. the one triggered by the client. Another correlationId is maintained, which identifies the parent message. Hence it may change along the message handling chain, while the traceId remains constant.

It is worth mentioning that Axon’s correlationId, described in https://docs.axoniq.io/reference-guide/axon-framework/monitoring/message-tracking, has different semantics than the client-side correlation id that this blog post addresses.

Additional meta-data keys may be registered as correlation data using the SimpleCorrelationDataProvider. Axon will then take care of those attributes automatically, if present in the meta-data, by copying their values to down-stream messages.

Capturing client-side correlation ids

Using Axon’s message correlation support it is possible to capture client-side correlation ids, as well. This may be useful in scenarios, where:

  • the client wants to control whether it requires correlation id tracking or not
  • the client needs to know, when all events resulting from a command are fully reflected in the read model it refers to
  • there are multiple events resulting from a single command, resulting in multiple updates to the read model

For the sake of simplicity, let’s start with capturing a client-side id (called operationId) and attaching it to every event published as part of a command’s unit of work using a very simple REST API.

The so-called unit of work is Axon’s abstraction for the underlying “transaction”. Accordingly, each command or event handler triggers the creation of a new unit of work. All events applied as part of it, typically via AggregateLifecycle.apply() or the EventGateway, are published atomically.

The following Kotlin code snippet shows an Order API endpoint implemented as Spring REST controller, which transforms and delegates an inbound POST request into an Axon command:

data class Order(
val productId: UUID,
val quantity: Int,
)

data class PlaceOrderCommand(
val orderId: UUID = UUID.randomUUID(),
val order: Order,
)

@RestController
class MyController(
private val commandGateway: CommandGateway,
) {

@PostMapping("/order")
fun placeOrder(
@RequestBody order: Order,
@RequestHeader(name = "X-Operation-ID", required = false) operationId: String?,
) =
commandGateway.send<Any?>(
PlaceOrderCommand(order = order)
.let { GenericCommandMessage.asCommandMessage<PlaceOrderCommand>(it) }
.andMetaData(
buildMap {
if (operationId != null) {
put("operationId", operationId)
}
}
)
)
}

As can be seen, the command is sent as CommandMessage (in favor of a raw object) annotated with additional meta-data, which in turn is extracted from the optional X-Operation-ID HTTP header supplied by the client. The meta-data key used is operationId and it is only present, if supplied by the client at all, thus avoiding null values.

Notice that while there is a withMetaData method present on the GenericCommandMessage we use andMetaData, which merges the supplied meta-data map with any existing meta-data.

In addition to attaching the operationId to any command as meta-data, Axon needs to know, that this meta-data key needs to be propagated as correlation data. The following Spring configuration shows, how this can be achieved:

@Configuration
class AxonCorrelationConfiguration {

@Bean
fun axonCorrelationDataProvider() =
SimpleCorrelationDataProvider("operationId")
}

The axonCorrelationDataProvider bean defines a SimpleCorrelationDataProvider with all the meta-data keys that need to be propagated to down-stream messages, e.g. events published by command handlers.

Finally the newly introduced meta-data key can be consumed by event handlers, as shown in the following code snippet:

@Component
class MyEventHandler {

@EventHandler
fun on(
event: OrderPlacedEvent,
@MetaDataValue("operationId") operationId: String?,
) {
// event handling logic
}
}

With this mechanism in place an event handler updating its read model could easily persist the operationId, if present, so the client actually knows when the “operation” is finished.

Most likely such correlation ids will be persisted in a “central” location within the read model, such as a dedicated data collection or table independent from any domain data. Moreover, since these ids will be silently consumed (polled) by the client, it should be assured that there is a clean-up procedure in place, which removes them after a certain period of time.

However, in case multiple events were published as part of the same unit of work, persisting the operationId should be postponed to the very last of those events. This can either be achieved by the event handler “knowing” which is the final event or introducing a dedicated additional event, as shown in the next section.

Dedicated correlation id events

Projecting a read model including client-side correlation ids, such as an operationId, involves the challenge of deciding, which of potentially multiple events involved in a single “operation” is the final one. Generally, an Axon event handler can never know exactly, how many events have been published as part of the same unit of work, thus which is the last of them.

For example, think of an aggregate publishing events for dynamic list of sub entities, being updated by a command. This set of entities may grow and shrink at any time and so may the number of events being published as part of the same unit of work.

It is therefore preferable to use a dedicated event that demarcates the end of the unit of work — that is the operation. Remember, that the event handler does not know, which events were applied as part of the same unit of work — unless its hard-coded “knowledge”. Accordingly, such an event handler tells the event handler, when to persist the operationId to its read model. In addition to that, if the client did not send an operationId at all, such event simply won’t be published. Thus the event clearly marks the end of any operation (aka unit of work) annotated with client-side operation ids within the global event stream. Moreover, such an event could easily carry other information, such as the overall execution time of the unit of work, without impact on the preceding (domain) events. This is a much clearer separation of concerns.

In order to publish such an additional event, Axon provides support for so-called MessageDispatcherInterceptors, which may intercept and modify messages — in our case previously published events. They are executed within the prepareCommit phase of the unit of work. Such interceptors have access to the complete list of messages published within the same unit of work. Moreover, they aren’t limited to only modifying those messages, but may as well publish additional events.

The following code snippet shows an implementation of a MessageDispatchInterceptor intercepting a list of EventMessages published within the same unit of work.

data class OperationFinishedEvent(
val aggregateId: String,
val operationId: String,
)

class ClientOperationEventDispatchInterceptor(val eventGateway: EventGateway) : MessageDispatchInterceptor<EventMessage<*>> {

override fun handle(messages: List<EventMessage<*>>) =
BiFunction<Int, EventMessage<*>, EventMessage<*>> { idx, message ->

CurrentUnitOfWork.ifStarted { uow ->
val operationId = uow.correlationData["operationId"] as? String
if (operationId != null) {
if (message is DomainEventMessage) {
val uowAggregateId = uow.getOrComputeResource("operationAggregateId") { message.aggregateIdentifier }
check(message.aggregateIdentifier == uowAggregateId) { "MessageDispatchInterceptor does not support multiple aggregate instances" }
}

if (idx == messages.size - 1) {
val operationAggregateId = uow.getResource<String>("operationAggregateId")
if (operationAggregateId != null) {
eventGateway.publish(
OperationFinishedEvent(
aggregateId = operationAggregateId,
operationId = operationId,
)
)
}
}
}
}

message
}
}

The interceptor returns a BiFunction that may transform each of the EventMessages. In our case they are returned as-is. Instead the interceptor extracts the operationId from the correlation data and the aggregate identifier from the first DomainEventMessage it encounters and ensures, only one aggregate instance is involved within the same unit of work (more details on this in the next section of this post). The aggregate id is stored as an additional resource within the unit of work, so it is available, when the BiFunction is called for the final event. When this happens, it publishes a dedicated event, that demarcates the end of the “operation” for this aggregate instance, if necessary.

The message interceptor needs to be registered with Axon’s event bus, e.g. via the following Spring configuration:

@Configuration
class AxonClientOperationDispatchConfiguration {

@Autowired
fun configureClientOperationDispatchInterceptor(eventBus: EventBus, eventGateway: EventGateway) {
eventBus.registerDispatchInterceptor(ClientOperationEventDispatchInterceptor(eventGateway))
}
}

Sequential event handling

The newly introduced OperationFinishedEvent can be consumed by event handling methods within any Spring beans, in order to update the read model for the client, e.g. as follows:

@EventHandler
fun on(event: OperationFinishedEvent) {
// ... project operation into the read model ...
}

The event would typically be consumed by dedicated event handlers, that need to know when a unit of work is finished, in order to persist that knowledge to their read model. However, the attentive reader will already have noticed, that OperationFinishedEvents are being published as generic events — for three reasons mainly:

  1. The MessageDispatchInterceptor simply has no access to any aggregate life-cycle and therefore cannot apply() additional domain events.
  2. It is the very nature of the OperationFinishedEvent that it addresses a technical cross-cutting concern that should not be modeled as domain event at all.
  3. If the information were part of the domain events themselves, the aggregate would be responsible for “knowing”, when to apply that information or not, as compared to the generic MessageDispatchInterceptor approach.

Hence, any set of event handlers (and their event handling methods) belonging to the same Axon processing group must assure correct ordering, when relying on this generic event. While the dispatch interceptor assured that the event was published at the end of the unit of work, after all preceding events, event handlers need to make sure those events are consumed in-order, especially when handling events in parallel segments (see https://docs.axoniq.io/reference-guide/axon-framework/events/event-processors/streaming#sequential-processing).

Axon SequencingPolicy implementations are responsible for determining a suitable sequence identifier, which is then deterministically mapped to one of the processing group’s segments. All events mapped to the same sequence identifier are then handled in-order with respect to their global sequence number. The default SequentialPerAggregatePolicy does that based on the aggregate identifier for domain events and falls back to the event identifier for generic events. This ensures that all domain events belonging to the same aggregate instance are handled in order within the same segment, while generic events are evenly distributed among all segments with no specific order.

Thus, we need to bring the OperationFinishedEvent in order with the domain events from the aggregate instance, which published the domain events for that operation. That’s why we actually stored the aggregateId together with the OperationFinishedEvent and made sure, the same unit of work does not span multiple aggregate instances.

While it’s possible from a technical perspective, it is considered bad practice to span a single unit of work across multiple aggregate instances, as this more or less represents a “distributed transaction”. An aggregate is assumed to represent a “transactional boundary”, thus each aggregate should be addressed within its own unit of work.

Axon already provides suitable SequencingPolicy implementations, which can be used as Spring beans, as shown in the following code snippet:

@Configuration
class AxonSequencingConfiguration {

@Bean
fun operationSequencingPolicy(): SequencingPolicy<EventMessage<*>> =
PropertySequencingPolicy
.builder(OperationFinishedEvent::class.java, String::class.java)
.propertyName(OperationFinishedEvent::aggregateId.name)
.fallbackSequencingPolicy(SequentialPerAggregatePolicy())
.build()
}

The policy shown here checks if an event is of type OperationFinishedEvent first. If so, it extracts the contained aggregateId property as sequence identifier. If not, it simply falls back to the default SequentialPerAggregatePolicy.

Since we will most likely use different policies for different processing groups, the policy beans needs to be assigned to the correct processing group, thus overriding the default SequentialPerAggregatePolicy. This can be achieved using Spring YAML configuration, e.g. as follows:

axon:
eventhandling:
processors:
my-processing-group-name:
mode: tracking
sequencing-policy: operationSequencingPolicy

Summary

In this blog post we’ve developed an approach that helps us in tracking client-side correlation ids starting from the command layer throughout the event handlers. A dedicated generic event was used to demarcate such operations, thus relieving the domain events from this cross-cutting concern. Finally, the proper ordering of this generic event was assured using Axon sequencing.

If you find this blog post helpful or have any suggestions on how to improve the approach shown, feel free to respond or contact me on Twitter.

--

--