Domains, Commands, and State Machines Oh My!
Lately I’ve been graced by the algorithms that be with a bombardment of stories on Command Query Responsibility Segregation (CQRS). While it’s a great pattern in the realm of distributed systems and scaling, I find most articles out there miss the point: the explanations are minimal at best, or they lack a “making it real world” quality, in the sense that it’s hard to see from the author’s conclusions how one might apply the pattern at work. Equally, we see a lot of people pair Event Sourcing (ES) with CQRS as a dynamic duo — yet gloss over or fail to properly address the downsides of combining the two (nor offer any mitigations).
My hope, in the following, is to present a hybrid architecture that combines various pieces of Domain Driven Design (DDD), CQRS, ES, State Machines, and Hexagonal Architecture. I’ll try to fully lay out my organization and my intuition about the pros/cons of such a system, offer potential mitigations for its downsides, and provide a real-world example (both in code and explanation). This article is structured in the following way:
- Glossary: An Introduction of Terms
- Rationalization of a Hexagonal Architecture
- Command Side
- Query Side
- A real-world example in code
- Closing Remarks
Glossary — An Introduction of Terms
I’ll try to define, in mostly layman’s language, what the following terms mean:
Bounded Context — A specific domain area, which may span one or more entities (or aggregates). For example, a financial system may have a “Credit Card” bounded context which encompasses the “Credit Card” entity itself in addition to a “Transaction” entity covering Credits/Debits. These tend to be great microservice/application boundaries; however, they may refer to other bounded contexts (e.g., in the case of Credit Cards, a customer profile in the financial system that may be shared with a Banking bounded context).
Domain — When we speak of the domain, we’re talking about the “core” of our system — technology-independent data models (entities) that encompass the business logic that drives our system. I like to encode those business rules as state machines.
Entity — The core of the system, modeling a singular concept (e.g., Transaction). An entity contains no logic with respect to serialization/deserialization, nor any knowledge of how to talk to other parts of the system (e.g., a database, message bus, etc.).
Aggregate — I’m going to slightly deviate from the classical Domain Driven Design definition and argue that an aggregate is an entity that is composed from a series of events in the system. Traditionally, an aggregate would refer to a collection of entities (e.g., a Credit Card aggregate would include a vector of Transactions). However, I tend to find this muddies the waters and doesn’t represent how I would want to compose an API from these pieces (e.g., I’d make separate calls, each with its own appropriate technology, for fetching Credit Card information versus Transactions).
Ports — Interfaces which represent external dependencies our application has on the system. There are two variants: inbound and outbound. An inbound port is used to transform input coming into our application — from the user, a message bus, or another microservice — into something our domain can understand. An outbound port is used by our domain to reach the outside world, such as persisting to a database or publishing to a message bus.
Adapter — A concrete implementation of a port. For example, for an outbound port called “TransactionRepository” we may have a concrete implementation called “PostgresTransactionRepository”.
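To make the port/adapter pairing concrete, here is a minimal Rust sketch. The TransactionRepository and PostgresTransactionRepository names come from the example above; the method signatures and the Transaction fields are hypothetical, and the Postgres bodies are stubbed where real SQL would live.

```rust
// A domain entity, free of any serialization or storage concerns.
struct Transaction {
    id: String,
    amount_cents: i64,
}

// The outbound port: the domain only ever sees this interface.
trait TransactionRepository {
    fn save(&self, transaction: &Transaction) -> Result<(), String>;
    fn find_by_id(&self, id: &str) -> Result<Option<Transaction>, String>;
}

// A concrete adapter. In a real system this would hold a connection
// pool and translate Transaction to and from SQL rows.
struct PostgresTransactionRepository {/* connection pool would live here */}

impl TransactionRepository for PostgresTransactionRepository {
    fn save(&self, _transaction: &Transaction) -> Result<(), String> {
        // INSERT INTO transactions ... would go here.
        Ok(())
    }
    fn find_by_id(&self, _id: &str) -> Result<Option<Transaction>, String> {
        // SELECT ... FROM transactions WHERE id = $1 would go here.
        Ok(None)
    }
}
```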
Command — Within the system, an object which describes the change to be made to an entity. For example, in the aforementioned financial system, we may have a Command called “MakePayment” to make a payment on our Credit Card balance.
Query — Within the system, an object which describes a query for a Query Model (entity).
Event — The result of the domain processing a command. For example, a Command called “MakePayment” may result in an event called “PaymentMade” whose payload describes how to change the previous balance of a CreditCard.
State Machine — A collection of functions that map a Command to an Event while providing business rules and validations.
Service — An object which maps an inbound port to an outbound port. For example, a service called “HTTPTransactionService” would map an HTTP input to the domain and then would output the result event to a SQL database.
Query Model — A data model (entity) which is optimized specifically for querying. This may not include all fields of the Command side aggregate.
Data Transfer Object — I use this interchangeably with Data Access Object. A type of entity which is solely used to serialize data from outside the system and then transform, via Into traits/interfaces, into an entity. This will tend to be serialization from the database, such as a SQLCreditCard transforming into an entity in our core, CreditCard. Similarly, when receiving a request from something such as an HTTP input, we may have a RESTCreditCard which transforms into a CreditCard.
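As a minimal illustration, here is what that transformation might look like in Rust using the From/Into traits; SqlCreditCard is a hypothetical row-shaped type and the fields are invented for the sketch.

```rust
// The core entity: no storage or transport concerns.
struct CreditCard {
    id: String,
    balance_cents: i64,
}

// A row-shaped DTO that mirrors how the database stores the record.
struct SqlCreditCard {
    id: String,
    balance_cents: i64,
}

// Implementing From gives us Into for free; the domain never sees
// SqlCreditCard, only the CreditCard it converts into.
impl From<SqlCreditCard> for CreditCard {
    fn from(row: SqlCreditCard) -> Self {
        CreditCard {
            id: row.id,
            balance_cents: row.balance_cents,
        }
    }
}

// Usage at the adapter boundary:
// let entity: CreditCard = fetched_row.into();
```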
Upcaster — An upcaster is a function which can take older versions of events and transform/“upcast” them to a newer version of the event. This allows us to evolve events while keeping backwards compatibility.
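A small, hedged sketch of an upcaster over JSON-encoded events: assume (hypothetically) that v1 of “PaymentMade” stored a floating-point dollar amount and v2 stores integer cents plus a currency.

```rust
use serde_json::{json, Value};

// Upcast a v1 PaymentMade event to v2: dollars (float) becomes cents
// (integer) and a currency field is introduced. Old events stay stored
// as-is; they are upcast on read before being applied to the aggregate.
fn upcast_payment_made(mut event: Value) -> Value {
    if event["version"] == json!(1) {
        let dollars = event["payload"]["amount"].as_f64().unwrap_or(0.0);
        event["payload"]["amount_cents"] = json!((dollars * 100.0).round() as i64);
        event["payload"]["currency"] = json!("USD");
        if let Some(payload) = event["payload"].as_object_mut() {
            payload.remove("amount");
        }
        event["version"] = json!(2);
    }
    event
}
```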
Application — A piece of software in the system which encompasses a bounded context and one or more service(s) related to that context.
Rationalization of a Hexagonal Architecture
There are plenty of better authors than me to discuss the gory, technical details of Hexagonal Architecture:
- https://netflixtechblog.com/ready-for-changes-with-hexagonal-architecture-b315ec967749
- https://medium.com/ssense-tech/hexagonal-architecture-there-are-always-two-sides-to-every-story-bc0780ed7d9c
- https://alistair.cockburn.us/hexagonal-architecture/
My rationalization herein will be for what I tend to value in software and why hexagonal architecture aligns with those values. When I write software, I’m often looking for a combination of the following (their order changes based on the needs of the system):
- Performance
- Correctness
- Evolvability
- Extensibility
I tend to highly value correctness as a mathematician, but I also strongly believe in the longevity of a system. Hexagonal architecture allows us to structure our system such that the domain — our business rules and logic — is completely separate from the infrastructure. This means that as we grow, scale, and the nature of computing changes, those infrastructure changes are limited to a part of the codebase that doesn’t change how we “do business”. This falls in line with what I think is the second billion-dollar mistake of software, after nullability: rewriting the system. I see most startups hyper-scale at a deafening pace only to be met by an exorbitant effort/cost to rewrite the system when a wall is hit. I’ve equally heard of this phenomenon from colleagues at large financial institutions, in government positions, and in many other industries. Further than that, I believe in being a good steward of software and leaving the codebase better than you found it — for reasons of longevity, so the codebase may last as long as possible and avoid the potentially costly rewrite, as well as to hand something worthwhile off to the person who comes after me.
Why Event-Sourced CQRS?
I find that in distributed systems, you want to ensure you have the ability to recover quickly and scale each piece independently. Event sourcing gives us an immutable log of events — meaning we can read over our logs and reconstitute the system’s state at any point in time. This is an incredibly important property when it comes to testing the system and verifying bug fixes, as well as guarding against “once in a lifetime” system failures.
Similarly, CQRS allows us to split the two most primitive sides of an application — writing and reading data. There are times in a large system where we may want to scale writes or reads during peak hours, but not both. Or we may have a time where we need to scale both. Or we just need to permanently scale one side. CQRS gives us the flexibility to accommodate any of these scenarios when it comes to independent scaling.
Command Side
For the write side of our application, I tend to look at it in 3 major pieces:
- Service — How Commands are being input into our application
- Core — How are we ingesting these commands, re-constituting the aggregate from events, executing business logic, and determining which event to emit
- Adapters — How are we persisting our data? How are we sending messages to a message broker in the system? How are we guaranteeing data integrity?
For the first piece, Service, we’re really just transforming input from some transport and making sure we can re-constitute an aggregate. For example, an HTTP-based service would transform a JSON payload into a Command and then ask the database to load the latest state of the aggregate (entity) by reading all of the previous events marked for that entity. The command would then be passed to the aggregate, which would construct a local state machine containing business rules, validation, and logic and execute the Command against it. The output of this state machine is an Event, which marks how the entity has changed.
The service would then receive the resulting event and proceed to store it in both the events and outbox tables. To finalize, the service would take the in-memory aggregate we previously constructed and apply the latest event to it before returning the aggregate to the caller as a JSON payload.
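Here is a hedged sketch of that round trip in Rust. Every name (EventStore, CreditCard, the command and event shapes) is a hypothetical stand-in for illustration rather than the published implementation, and the JSON/HTTP layers are elided.

```rust
// Outbound port: loading history and persisting new events atomically.
trait EventStore {
    fn load_events(&self, aggregate_id: &str) -> Vec<Event>;
    // Writes to the events AND outbox tables in a single transaction.
    fn append_with_outbox(&self, aggregate_id: &str, event: &Event) -> Result<(), String>;
}

enum Command {
    MakePayment { amount_cents: i64 },
}

enum Event {
    PaymentMade { amount_cents: i64 },
}

#[derive(Default)]
struct CreditCard {
    balance_cents: i64,
}

impl CreditCard {
    // Reconstitute the aggregate by folding apply over prior events.
    fn replay(history: &[Event]) -> Self {
        let mut card = Self::default();
        for event in history {
            card.apply(event);
        }
        card
    }

    // The state machine boundary: validate and decide which event to emit.
    fn execute(&self, command: Command) -> Result<Event, String> {
        let Command::MakePayment { amount_cents } = command;
        if amount_cents <= 0 {
            return Err("payment must be positive".into());
        }
        Ok(Event::PaymentMade { amount_cents })
    }

    // Evolve state from an event; no validation happens here.
    fn apply(&mut self, event: &Event) {
        let Event::PaymentMade { amount_cents } = event;
        self.balance_cents -= amount_cents;
    }
}

fn handle_command(
    store: &dyn EventStore,
    aggregate_id: &str,
    command: Command, // already deserialized from the JSON payload
) -> Result<CreditCard, String> {
    let history = store.load_events(aggregate_id);   // 1. load prior events
    let mut card = CreditCard::replay(&history);     // 2. reconstitute aggregate
    let event = card.execute(command)?;              // 3. run the state machine
    store.append_with_outbox(aggregate_id, &event)?; // 4. persist atomically
    card.apply(&event);                              // 5. apply in memory
    Ok(card)                                         // 6. return fresh state
}
```

Note that execute is the only place business rules live; replay and apply are pure folds over events, which is what makes reconstitution deterministic.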
The second piece here, our Core or domain, is simply constructed from a previous record of events. As an optimization, we may choose to create a snapshot, which records the last event that was rolled into it; when re-constituting the aggregate (entity) we then load the latest snapshot plus only the events since the snapshot was created. As an aside, we may also provide Event Upcasters when we evolve the schema of events, such that old stored events are upcast to the new type of event via a transformation before being applied for reconstitution. This re-constituted aggregate is then handed the Command and will execute it against a state machine. The state machine takes in the context (the current entity) and the Command to determine the correct Event to return. Since a state machine can only be in a single state at a time and responds differently (transition-wise) based on which state a command is executed in, it helps guard us against faulty logic or missed edge cases.
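To show why the state machine framing guards against missed edge cases, here is a hedged sketch where the same command is handled differently per state; the states, commands, and the rule that frozen cards can still be paid down are all invented for illustration.

```rust
#[derive(Debug)]
enum CardState {
    Active,
    Frozen,
    Closed,
}

enum Command {
    MakePayment { amount_cents: i64 },
    FreezeCard,
}

enum Event {
    PaymentMade { amount_cents: i64 },
    CardFrozen,
}

struct CreditCard {
    state: CardState,
    balance_cents: i64,
}

impl CreditCard {
    // Map (current state, command) to an event, or reject the transition.
    fn execute(&self, command: Command) -> Result<Event, String> {
        match (&self.state, command) {
            // Payments are accepted while Active or Frozen, but must be positive.
            (CardState::Active | CardState::Frozen, Command::MakePayment { amount_cents }) => {
                if amount_cents <= 0 {
                    return Err("payment amount must be positive".into());
                }
                Ok(Event::PaymentMade { amount_cents })
            }
            // Freezing only makes sense on an Active card.
            (CardState::Active, Command::FreezeCard) => Ok(Event::CardFrozen),
            // Every remaining (state, command) pair is rejected explicitly,
            // so a Closed card can never accept a payment by accident.
            (state, _) => Err(format!("command not allowed in state {state:?}")),
        }
    }
}
```

The compiler’s exhaustiveness checking is the payoff here: adding a new state or command forces you to decide, at compile time, what every transition means.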
Now, for the third piece, you’ll notice I said the Service stores to both the events and outbox tables, but doesn’t send anything on the message bus. This insert occurs within a transaction — to avoid the inconsistency where the aggregate’s state is effectively updated for the next call by storing in the events table, but the event never reaches the outbox table to get sent out. This also guards against the opposite: sending out the event, but not persisting the state. By writing to both of these tables in a single transaction, we ensure we’re able to queue an event for sending as well as persist state. In another thread — whether that’s a true thread or a green thread multiplexed onto a thread — we poll the outbox table occasionally and run a transaction which sends the event to the Event Bus and only removes it from the database if that send succeeds.
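Here is a hedged sketch of that transactional write using sqlx and Postgres (an assumed stack; the table and column names are invented, and the JSON payload is bound via sqlx’s JSON support).

```rust
use sqlx::PgPool;

// Write the event to the events AND outbox tables in one transaction:
// both inserts commit or roll back together, so persisted state and the
// queued message can never diverge.
async fn append_with_outbox(
    pool: &PgPool,
    aggregate_id: &str,
    sequence: i64,
    payload: serde_json::Value,
) -> Result<(), sqlx::Error> {
    let mut tx = pool.begin().await?;
    sqlx::query("INSERT INTO events (aggregate_id, sequence, payload) VALUES ($1, $2, $3)")
        .bind(aggregate_id)
        .bind(sequence)
        .bind(&payload)
        .execute(&mut *tx)
        .await?;
    sqlx::query("INSERT INTO outbox (aggregate_id, payload) VALUES ($1, $2)")
        .bind(aggregate_id)
        .bind(&payload)
        .execute(&mut *tx)
        .await?;
    tx.commit().await
}
```

The polling relay then does the inverse inside its own transaction: read a batch from the outbox, publish to the bus, and delete rows only if the publish succeeded — a crash between those two steps yields at worst a duplicate, which is why the first mitigation below matters.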
Now, there are a few failure scenarios to identify here, along with their severity:
- The thread reading the outbox table sends the event to the Event Bus, but dies before removing it from the table. In this case, we may have a duplicate event sent on the event bus. There are two ways we can mitigate this: we can assume that we will always receive messages at least once, but not exactly once, and program accordingly. Equally, we may be able to mitigate this by having our message broker de-duplicate events within a window of time — both Kafka and NATS support this, for example
- We’ve received user input, but we die before persistence. We simply return an error to the user and ask them to retry their operation. No harm, no foul.
- We’ve received the user input and we’ve stored the event, but fail to return to the user. The user may then receive an error when re-trying their command (by construction of the state machine), which is an acceptable scenario. Remember that in distributed systems, the goal isn’t to never require a user to re-perform actions that may or may not have failed, but to guard the integrity of our data (which we are doing here).
- If we are snapshotting and we fail to persist the snapshot, this is relatively benign; we re-attempt the snapshot on the next Command. It simply means we roll N+1 events into the snapshot, instead of N (see the sketch below). Of course, persistent failure (in the 100s/1000s of events) may be problematic for a system from a performance perspective, but should be easily monitored via logs/metrics/traces/continuous profiling.
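For completeness, here is a hedged sketch of the snapshot-aware reconstitution referenced above; the Snapshot shape and sequence-number bookkeeping are assumptions for illustration.

```rust
// A snapshot stores a point-in-time copy of the aggregate plus the
// sequence number of the last event folded into it.
struct Snapshot {
    state: CreditCard,
    last_sequence: i64,
}

struct CreditCard {
    balance_cents: i64,
}

enum Event {
    PaymentMade { amount_cents: i64 },
}

impl CreditCard {
    fn apply(&mut self, event: &Event) {
        let Event::PaymentMade { amount_cents } = event;
        self.balance_cents -= amount_cents;
    }
}

// Load the latest snapshot (if any), then replay only the events whose
// sequence is greater than snapshot.last_sequence. A failed snapshot
// write just means this tail is one event longer next time.
fn load_aggregate(latest_snapshot: Option<Snapshot>, events_since: &[Event]) -> CreditCard {
    let mut card = match latest_snapshot {
        Some(snapshot) => snapshot.state,
        None => CreditCard { balance_cents: 0 },
    };
    for event in events_since {
        card.apply(event);
    }
    card
}
```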
Another thing to notice: we are able to scale writes — as discussed previously — independently here. If we notice that we are receiving a large number of CreditCard Transactions or CreatePrescription Commands, we can scale this service without necessarily needing to scale the query side that handles querying Transactions or Prescriptions.
Query Side
For the query side of our application, I tend to look at it in 2 pieces:
- Service — How Queries are executed and how Events are transformed and applied as Query Model updates
- Adapters — How are we fetching from the cache? How are we fetching and updating the database?
An important note: because the query side is separate from the command side, we can choose a database that is optimized for reads on the query side — such as a relational database — and another database that is optimized for writes on the command side — such as Cassandra.
For the first piece, similar to the Command side, we’re transforming input into a Query for our system. This may look like transforming an HTTP input into a query, and may be as simple as having the Service call the “fetchAllPrescriptions” method on the repository when the request is received. You may wish to place a Query data model between your input and the ultimate execution, though that comes down to personal preference. Equally, your Query application will be listening for events from the event bus coming from the Command side in order to update the Query Model (entity). This means that the service, upon receiving an event, will fetch the latest Query Model, update it according to the event, and persist it.
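A hedged sketch of that projection loop, keeping with the e-Prescribing example; the view shape, event, and repository port are all hypothetical.

```rust
// A Query Model optimized for reads; it need not carry every field the
// command-side aggregate has.
struct PrescriptionView {
    id: String,
    status: String,
    refills_remaining: u32,
}

enum Event {
    PrescriptionFilled { id: String },
}

// Outbound port for the query-side store.
trait ViewRepository {
    fn fetch(&self, id: &str) -> Option<PrescriptionView>;
    fn upsert(&self, view: &PrescriptionView);
}

// Called for each event consumed from the bus; the message is only
// ACK-ed after persist succeeds (see the failure scenarios below).
fn on_event(repo: &dyn ViewRepository, event: Event) {
    match event {
        Event::PrescriptionFilled { id } => {
            let mut view = repo.fetch(&id).unwrap_or(PrescriptionView {
                id: id.clone(),
                status: "new".into(),
                refills_remaining: 0,
            });
            view.status = "filled".into();
            view.refills_remaining = view.refills_remaining.saturating_sub(1);
            repo.upsert(&view);
        }
    }
}
```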
For the second piece, we simply consider how we fetch from the database and potentially from a cache. This may involve choosing databases optimized for reads. The cache is an optimization to shrink the lag between when input is received on the Command side and when it becomes serviceable from the Query side.
Now, there are a few failure scenarios to identify here, along with their severity:
- The Command side fails to populate something into the cache, or the cache is prematurely flushed (or we run out of room in the cache). We can potentially mitigate this by servicing the query from the Query Model, looking for any pending events for that model on the Event Bus, applying them quickly, and returning the result to the user (sketched after this list). This may cause the event to be re-delivered (without ACK-ing) so that we still persist it — and potentially there’s a story about updating the cache here too — but there are plenty of ways to provide “fallbacks” for the cache failing. Equally, we may just decide — if the business allows for it — to service a stale read, knowing the data will be updated at a later time. The staleness of a read can potentially be mitigated by sending updates to clients, either via Webhooks or some other messaging interface.
- We receive an event and fail to persist the update to the model. Since we aren’t ACK-ing the message until persistence occurs, this may simply mean we attempt to process that event multiple times (lowering efficient use of resources, but ensuring data integrity)
- A user sends in a Query and we fail to respond. No harm, no foul. We tell the user to re-attempt.
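Here is a hedged sketch of the cache-fallback mitigation from the first scenario above; the types and the idea of overlaying un-ACKed events are illustrative assumptions, not a prescription.

```rust
struct PrescriptionView {
    id: String,
    status: String,
}

enum Event {
    PrescriptionFilled,
}

// On a cache miss, read the Query Model, overlay any events that have
// not yet been projected, and answer with the freshest view we can
// assemble. A stale read is the deliberate fallback if both are empty.
fn read_with_fallback(
    cached: Option<PrescriptionView>, // from the cache adapter
    stored: Option<PrescriptionView>, // from the Query Model database
    pending_events: Vec<Event>,       // un-ACKed events for this model
) -> Option<PrescriptionView> {
    if let Some(view) = cached {
        return Some(view); // happy path: cache hit
    }
    let mut view = stored?;
    for event in pending_events {
        match event {
            Event::PrescriptionFilled => view.status = "filled".into(),
        }
    }
    Some(view)
}
```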
A real-world example in code
This is still a work in progress (WIP) and coming very soon. Please check back and I’ll provide a public GitHub link when I have a chance. I plan to provide an example for an e-Prescribing service.
EDIT: Rust Command Side is here. Query side, tests, and other quality of life improvements pending: https://github.com/cawfeecoder/cqrs
EDIT 2: Typescript Command Side is here. Query side, tests, and other quality of life improvements pending: https://github.com/cawfeecoder/cqrs-ts
Closing Remarks
This article has given you some insight into how I like to design and organize large-scale, fault-tolerant systems. However, I’d also like to point out the following:
- While the Adapters attempt to make the domain technologically independent of the infrastructure, your choice of infrastructure may influence how you decide to code in terms of handling failure scenarios
- In systems such as Lambda/serverless, you may find yourself breaking each side into many smaller, coherent pieces that are event-based and that depend more on your infrastructure
- Cache strategy is an article (and really a deep area of study) all on its own. My mentioning a cache within this article glosses over all of the considerations of providing an effective cache
- This is an architecture that has worked successfully for me and provided a ton of flexibility in terms of meeting new business requirements and evolving as an organization’s needs change (in terms of scale, cost, and performance). Your mileage may vary based on your use case. This approach requires a larger upfront cost in exchange for potential longevity and a better developer/on-boarding experience longer term
- The Outbox pattern may be exchanged for a Change Data Capture pattern
- The architecture above squarely relies on the Saga pattern of providing compensating actions for rollback. The architecture may change if you desire 2-Phase Commit (2PC) or Try, Confirm, Cancel (TCC)
Future Work
- Talk about testing strategy within such an architecture (hint: it tends to rely more on unit tests)
- Talk about deployment strategy in such a system