Published in CodeX

Domains, Commands, and State Machines Oh My!

Lately I’ve been graced by the algorithms that be with a bombardment of stories on Command Query Responsibility Segregation (CQRS). While it’s a great pattern for distributed systems and for scaling, I find most articles out there miss the point: the explanations are minimal at best, or they never “make it real world”, so it’s hard to see from the author’s conclusions how you might use it at work. Equally, a lot of people pair Event Sourcing (ES) with CQRS as a dynamic duo, yet gloss over or don’t properly address the downsides of combining the two (nor offer any mitigations).

My hope in this article is to present a hybrid architecture that combines various pieces of Domain Driven Design (DDD), CQRS, ES, State Machines, and Hexagonal Architecture. I’ll try to fully lay out my organization and my intuition about the pros and cons of such a system, offer potential mitigations to its downsides, and provide a real-world example (both in code and explanation). This article is structured in the following way:

  • Glossary: An Introduction of Terms

Glossary — An Introduction Of Terms

Below I define, in mostly layman’s language, the terms you’ll see throughout this article:

Bounded Context — A specific domain area, which may span one or more entities (or aggregates). For example, a financial system may have the concept of a “Credit Card” bounded context which encompasses the “Credit Card” entity itself in addition to a “Transaction” entity which encompasses Credits/Debits. These tend to be great microservice/application boundaries, however they may refer to other bounded contexts (e.g. in case of Credit Cards, a customer profile in the financial system that may be shared with a Banking bounded context).

Domain — When we speak of the domain, we’re talking about the “core” of our systems: technology-independent data models (entities) that encompass the business logic that drives our system. I like to encode those business rules as state machines.

Entity — The core of the system, modeling a singular concept (e.g. Transaction). It contains no logic for serializing/deserializing itself, nor for talking to other parts of the system (e.g. Database, Message Bus, etc.).

Aggregate — I’m going to slightly deviate from the classical Domain Driven Design definition and argue that an aggregate is an entity that is composed from a series of events in the system. Traditionally, an aggregate would refer to a collection of entities (e.g. a Credit Card aggregate would include a vector of Transactions). However, I tend to find this muddies the waters and doesn’t represent how I would want to compose an API from these pieces (e.g. regardless of technology, I’d make separate calls to see Credit Card information and to see Transactions).

Ports — Interfaces which represent the boundaries between our application and the rest of the system. There are two variants: inbound and outbound. An inbound port is used to transform input coming into our application (from the user, a message bus, or another microservice) into something our domain can understand. An outbound port is used by our application to reach something outside of it, such as a database or a message bus.

Adapter — A concrete implementation of a port. For example, for an outbound port called “TransactionRepository” we may have a concrete implementation called “PostgresTransactionRepository”

Command — Within the system, an object which describes the change to be made to an entity. For example, in the aforementioned financial system, we may have a Command called “MakePayment” to make a payment on our Credit Card balance.

Query — Within the system, an object which describes a query for a Query Model (entity).

Event — The result of the domain processing a command. For example, a Command called “MakePayment” may result in an event called “PaymentMade” whose payload describes how to change the previous balance of a CreditCard.

State Machine — A collection of functions that map a Command to an Event while providing business rules and validations.

Service — An object which maps an inbound port to an outbound port. For example, a service called “HTTPTransactionService” would map an HTTP input to the domain and then would output the result event to a SQL database.

Query Model — A data model (entity) which is optimized specifically for querying. This may not include all fields of the Command side aggregate.

Domain Transfer Object — I use this interchangeably with Domain Access Object. A type of entity which is solely used to deserialize data from outside the system and then transform it, via Into traits/interfaces, into an entity. This will tend to be deserializing from the database, such as a SQLCreditCard, and transforming to an entity in our core, CreditCard. Similarly, when receiving a request from something such as an HTTP input, we may have a RESTCreditCard which transforms to a CreditCard.

Upcaster — An upcaster is a function which can take older versions of events and transform/“upcast” them to a newer version of the event. This allows us to evolve events while keeping backwards compatibility.

Application — A piece of software in the system which encompasses a bounded context and one or more service(s) related to that context.
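To make a few of these terms concrete, here is a minimal TypeScript sketch of an entity, a DTO, an outbound port, and an adapter. The field names, the `SqlTransaction` row shape, and the in-memory adapter are assumptions made for illustration; a `PostgresTransactionRepository` would implement the same port against a real database.

```typescript
// Entity: plain domain data with no persistence or serialization concerns.
interface Transaction {
  id: string;
  cardId: string;
  amountCents: number; // assumed convention: positive = debit, negative = credit
}

// DTO: mirrors a hypothetical database row (snake_case, numbers as strings).
interface SqlTransaction {
  txn_id: string;
  card_id: string;
  amount_cents: string;
}

// The "Into" step: DTO -> entity.
function toEntity(row: SqlTransaction): Transaction {
  return {
    id: row.txn_id,
    cardId: row.card_id,
    amountCents: parseInt(row.amount_cents, 10),
  };
}

// Outbound port: what the application needs from storage, as an interface.
interface TransactionRepository {
  save(tx: Transaction): void;
  findByCard(cardId: string): Transaction[];
}

// Adapter: one concrete implementation of the port (in-memory stand-in).
class InMemoryTransactionRepository implements TransactionRepository {
  private rows: SqlTransaction[] = [];

  save(tx: Transaction): void {
    this.rows.push({
      txn_id: tx.id,
      card_id: tx.cardId,
      amount_cents: String(tx.amountCents),
    });
  }

  findByCard(cardId: string): Transaction[] {
    return this.rows.filter((row) => row.card_id === cardId).map(toEntity);
  }
}

// Code written against the port never names a concrete adapter.
function totalForCard(repo: TransactionRepository, cardId: string): number {
  return repo
    .findByCard(cardId)
    .reduce<number>((sum, tx) => sum + tx.amountCents, 0);
}
```

Because `totalForCard` only sees the `TransactionRepository` interface, swapping the in-memory adapter for a database-backed one should not require touching it.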

Rationalization of a Hexagonal Architecture

There are plenty of better authors than me to discuss the gory technical details of Hexagonal Architecture, so I won’t repeat them here.

My rationalization herein is about what I tend to value in software and why hexagonal architecture aligns with it. When I write software, I’m often looking for a combination of the following (their order changes based on the needs of the system):

  • Performance

As a mathematician I tend to highly value correctness, but I also strongly believe in the longevity of a system. Hexagonal architecture allows us to structure our system such that the domain (our business rules and logic) is completely separate from the infrastructure. This means that as we grow, scale, and the nature of computing changes, those infrastructure changes are limited to a part of the code base that doesn’t change how we “do business”. This falls in line with what I think is the second billion-dollar mistake of software, after nullability: rewriting the system. I see most startups hyper-scale at a deafening pace only to be met by an exorbitant effort and cost to rewrite the system when a wall is hit. I’ve heard of the same phenomenon from colleagues at large financial institutions, in government positions, and in many other industries. Beyond that, I believe in being a good steward of software and leaving the codebase better than you found it, both for longevity (so the codebase may last as long as possible and avoid a potentially costly rewrite) and to hand something good off to the person who comes after me.

Why Event-Sourced CQRS?

I find that in a distributed system, you want the ability to recover quickly and to scale each piece independently. Event sourcing gives us an immutable log of events, meaning that we can read over the log and reconstitute the system’s state at any point in time. This is an incredibly important property when it comes to testing the system and verifying bug fixes, as well as guarding against “once in a lifetime” system failures.
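As a rough illustration of reconstituting state at a point in time, the sketch below folds over an event log and stops at a chosen timestamp. The event names, the numeric `at` timestamp, and the zero opening balance are assumptions for the example, not part of any particular event store.

```typescript
// Hypothetical events for a credit card; `at` is a logical timestamp.
type CardEvent =
  | { type: "ChargeMade"; at: number; amountCents: number }
  | { type: "PaymentMade"; at: number; amountCents: number };

interface CardState {
  balanceCents: number;
}

// Pure function: previous state + one event -> next state.
function applyEvent(state: CardState, e: CardEvent): CardState {
  const delta = e.type === "ChargeMade" ? e.amountCents : -e.amountCents;
  return { balanceCents: state.balanceCents + delta };
}

// Rebuild the state as of `asOf` (inclusive) by replaying the log in order.
function stateAt(log: CardEvent[], asOf: number): CardState {
  const empty: CardState = { balanceCents: 0 };
  return log
    .filter((e) => e.at <= asOf)
    .reduce<CardState>(applyEvent, empty);
}

const cardLog: CardEvent[] = [
  { type: "ChargeMade", at: 1, amountCents: 5000 },
  { type: "PaymentMade", at: 2, amountCents: 2000 },
  { type: "ChargeMade", at: 3, amountCents: 1000 },
];

console.log(stateAt(cardLog, 2).balanceCents); // 3000
console.log(stateAt(cardLog, 3).balanceCents); // 4000
```

The log itself is never modified; asking for a different point in time only changes how far the replay runs.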

Similarly, CQRS allows us to split the two most primitive sides of an application: writing and reading data. There are times in a large system where we may want to scale writes or reads during peak hours, but not both. Or we may have a time where we need to scale both. Or we just need to permanently scale one side. CQRS gives us the flexibility to accommodate any of these scenarios when it comes to independent scaling.

Command Side

For the write side of our application, I tend to look at it in 3 major pieces:

  • Service — How Commands are being input into our application

For the first piece, Service, we’re really just transforming input from some method and making sure we can reconstitute an aggregate. For example, an HTTP-based service would transform a JSON payload into a Command and then ask the database to load the latest state of the aggregate (entity) by reading all of the previous events marked for that entity. The Command would then be passed to the aggregate, which would construct a local state machine containing business rules, validation, and logic, and execute the Command against it. The output of this state machine is an Event, which marks how the entity has changed.

The service would then receive the resulting Event and store it in both the events and outbox tables. To finalize, the service would take the in-memory aggregate we previously constructed and apply the latest event to it before returning the aggregate to the caller as a JSON payload.
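The flow just described can be sketched roughly as follows. Everything here is a simplified assumption: `InMemoryEventStore` stands in for the events and outbox tables, `appendWithOutbox` stands in for a single database transaction, and the two validation rules in `decide` are invented for the example.

```typescript
type CardEvent =
  | { type: "ChargeMade"; cardId: string; amountCents: number }
  | { type: "PaymentMade"; cardId: string; amountCents: number };

interface MakePayment {
  type: "MakePayment";
  cardId: string;
  amountCents: number;
}

interface CreditCard {
  id: string;
  balanceCents: number;
}

// Apply one event to the aggregate.
function applyEvent(card: CreditCard, e: CardEvent): CreditCard {
  const delta = e.type === "ChargeMade" ? e.amountCents : -e.amountCents;
  return { id: card.id, balanceCents: card.balanceCents + delta };
}

// Domain step: Command in, Event out (illustrative rules only).
function decide(card: CreditCard, cmd: MakePayment): CardEvent {
  if (cmd.amountCents <= 0) throw new Error("payment must be positive");
  if (cmd.amountCents > card.balanceCents) throw new Error("payment exceeds balance");
  return { type: "PaymentMade", cardId: card.id, amountCents: cmd.amountCents };
}

// Stand-in for the events and outbox tables.
class InMemoryEventStore {
  events: CardEvent[] = [];
  outbox: CardEvent[] = [];

  load(cardId: string): CardEvent[] {
    return this.events.filter((e) => e.cardId === cardId);
  }

  // In a real adapter both inserts would share one database transaction.
  appendWithOutbox(e: CardEvent): void {
    this.events.push(e);
    this.outbox.push(e);
  }
}

// Service: JSON in -> Command -> rebuilt aggregate -> Event -> stored -> JSON out.
function handleMakePayment(store: InMemoryEventStore, json: string): string {
  const cmd = JSON.parse(json) as MakePayment;
  const empty: CreditCard = { id: cmd.cardId, balanceCents: 0 };
  const card = store.load(cmd.cardId).reduce<CreditCard>(applyEvent, empty);
  const produced = decide(card, cmd);
  store.appendWithOutbox(produced);
  return JSON.stringify(applyEvent(card, produced));
}
```

Note that a rejected Command throws before anything is stored, so neither table changes.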

The second piece here, our Core or domain, is simply constructed from a previous record of events. As an optimization, we may choose to create a snapshot of the aggregate which records the last event that went into it. Then, when reconstructing the aggregate (entity), we load the latest snapshot plus the events stored since the snapshot was created. As an aside, we may also provide Event Upcasters when we evolve the schema of events, such that old stored events are upcast to the new type of event via a transformation before being applied for reconstitution. This reconstituted aggregate is then handed the Command and executes it against a state machine. The state machine takes in the context (the current entity) and the Command to determine the correct Event to return. Because a state machine can only be in a single state at a time and responds differently (transition-wise) depending on which state a Command is executed in, it helps guard us against faulty logic and missed edge cases.
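Here is one way the snapshot, upcaster, and state machine might fit together. The two card states, the version 1 event that stored whole dollars, and the `seq` numbering are all hypothetical choices made to keep the sketch small.

```typescript
type Status = "Active" | "Frozen";

interface Card {
  status: Status;
  balanceCents: number;
}

// Events as stored; version 1 of ChargeMade used whole dollars (assumed schema change).
type StoredEvent =
  | { seq: number; type: "ChargeMade"; version: 1; amountDollars: number }
  | { seq: number; type: "ChargeMade"; version: 2; amountCents: number }
  | { seq: number; type: "CardFrozen"; version: 1 };

// Events as the domain currently understands them.
type CurrentEvent =
  | { type: "ChargeMade"; amountCents: number }
  | { type: "CardFrozen" };

// Upcaster: any stored version in, current shape out.
function upcast(e: StoredEvent): CurrentEvent {
  if (e.type === "CardFrozen") return { type: "CardFrozen" };
  if (e.version === 1) return { type: "ChargeMade", amountCents: e.amountDollars * 100 };
  return { type: "ChargeMade", amountCents: e.amountCents };
}

function evolve(card: Card, e: CurrentEvent): Card {
  if (e.type === "ChargeMade") {
    return { status: card.status, balanceCents: card.balanceCents + e.amountCents };
  }
  return { status: "Frozen", balanceCents: card.balanceCents };
}

// Snapshot: aggregate state plus the sequence number of the last event folded in.
interface Snapshot {
  lastSeq: number;
  card: Card;
}

// Reconstitute from the snapshot and only the events stored after it.
function rehydrate(snapshot: Snapshot, log: StoredEvent[]): Card {
  return log
    .filter((e) => e.seq > snapshot.lastSeq)
    .map(upcast)
    .reduce<Card>(evolve, snapshot.card);
}

type Command = { type: "MakeCharge"; amountCents: number } | { type: "Freeze" };

// State machine: the current state decides which Commands are legal.
function decide(card: Card, cmd: Command): CurrentEvent {
  if (card.status === "Frozen") throw new Error("a frozen card accepts no commands");
  if (cmd.type === "Freeze") return { type: "CardFrozen" };
  if (cmd.amountCents <= 0) throw new Error("charge must be positive");
  return { type: "ChargeMade", amountCents: cmd.amountCents };
}
```

The same `MakeCharge` Command yields an Event on an `Active` card and an error on a `Frozen` one, which is the state-dependent behavior described above.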

Now, for the third piece, you’ll notice I said the Service stores to both the events and outbox tables, but doesn’t send anything out on the message bus. This insert occurs within a single transaction to avoid an inconsistent state where the event is stored in the events table (so the aggregate’s state is effectively updated on the next call) but never reaches the outbox table to get sent out. It also guards against the opposite: sending out the event but not persisting the state. By writing to both tables in a single transaction, we ensure that we both queue the event for sending and persist the state. In another thread (whether that’s a true thread or a green thread multiplexed onto one) we poll the outbox table periodically and run a transaction which sends the event to the Event Bus and only removes it from the table if that succeeds.
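A single polling pass of that outbox relay might look like the sketch below. The `Outbox` class and the `EventBus` interface are hypothetical stand-ins for the outbox table and the message broker client, and stopping at the first failure is one possible policy (it preserves ordering), not the only one.

```typescript
interface OutboxRow {
  id: number;
  payload: string;
}

// Hypothetical outbound port for the bus; publish throws on failure.
interface EventBus {
  publish(payload: string): void;
}

// Stand-in for the outbox table.
class Outbox {
  private rows: OutboxRow[] = [];
  private nextId = 1;

  enqueue(payload: string): void {
    this.rows.push({ id: this.nextId++, payload: payload });
  }

  pending(): OutboxRow[] {
    return this.rows.slice();
  }

  remove(id: number): void {
    this.rows = this.rows.filter((row) => row.id !== id);
  }
}

// One polling pass: publish each pending row and delete it only after the
// publish succeeded. Returns how many rows were sent.
function relayOnce(outbox: Outbox, bus: EventBus): number {
  let sent = 0;
  for (const row of outbox.pending()) {
    try {
      bus.publish(row.payload);
    } catch (_err) {
      break; // leave this row and the rest for the next poll
    }
    outbox.remove(row.id);
    sent++;
  }
  return sent;
}
```

In a running service `relayOnce` would be called on a timer or in a loop; a crash between `publish` and `remove` is exactly the duplicate-delivery case discussed next.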

Now, there are a few failure scenarios to identify here, along with their severity:

  • The thread reading the outbox table sends the event to the Event Bus, but dies before removing it from the table. In this case, we may send a duplicate event on the event bus. There are two ways to mitigate this. We can assume that messages are delivered at least once, but not exactly once, and program consumers accordingly. Alternatively, we may be able to have our message broker de-duplicate events within a window of time; both Kafka and NATS support this, for example.
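Programming for at-least-once delivery usually means making the consumer idempotent. A minimal sketch, assuming every event carries a unique `eventId` (a field not mentioned above) and keeping the set of handled ids in memory:

```typescript
interface PaymentMade {
  eventId: string; // assumed unique per event
  cardId: string;
  amountCents: number;
}

class BalanceProjector {
  private handled: string[] = [];
  balanceCents: number;

  constructor(openingBalanceCents: number) {
    this.balanceCents = openingBalanceCents;
  }

  // Returns true if the event was applied, false if it was a duplicate.
  handle(msg: PaymentMade): boolean {
    if (this.handled.indexOf(msg.eventId) !== -1) return false;
    this.handled.push(msg.eventId);
    this.balanceCents -= msg.amountCents;
    return true;
  }
}

const projector = new BalanceProjector(5000);
const payment: PaymentMade = { eventId: "evt-1", cardId: "c1", amountCents: 2000 };
projector.handle(payment);
projector.handle(payment); // redelivery of the same event is ignored
console.log(projector.balanceCents); // 3000
```

In a real consumer the handled ids would need to be persisted alongside the state they protect, otherwise a restart forgets what has already been applied.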

Another thing to notice: as discussed previously, we are able to scale writes independently here. If we notice that we are receiving a large number of CreditCard Transaction or CreatePrescription Commands, we can scale this service without necessarily needing to scale the query side that handles querying Transactions or Prescriptions.

Query Side

For the query side of our application, I tend to look at it in 2 pieces:

  • Service — How Queries are executed and how Events are transformed and applied as Query Model updates

An important note: because the query side is separate from the command side, we can choose a database that is optimized for reads (such as a relational database) on the query side and another that is optimized for writes (such as Cassandra) on the command side.

For the first piece, similar to the Command side, we’re transforming input into a Query for our system. This may look like transforming an HTTP input into a query. This may be as simple as having the Service call the “fetchAllPrescriptions” method on the repository when received. You may wish to shove a Query data model between your input and ultimate execution, though that comes down to user preference. Equally, your Query application will be listening to events from the event bus coming from the Command side in order to update the Query Model (entity). This means that the service, upon receiving an event, will fetch the latest Query Model, update it according to an event, and persist it.
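Both responsibilities of the query service can be sketched together. The event names (`PrescriptionCreated`, `PrescriptionFilled`), the view fields, and the in-memory repository are assumptions for illustration; only the `fetchAllPrescriptions` method name comes from the description above.

```typescript
// Hypothetical events published by the Command side.
type PrescriptionEvent =
  | { type: "PrescriptionCreated"; id: string; patientId: string; drug: string }
  | { type: "PrescriptionFilled"; id: string };

// Query Model: shaped for reading, not for enforcing business rules.
interface PrescriptionView {
  id: string;
  patientId: string;
  drug: string;
  filled: boolean;
}

// In-memory stand-in for the read database.
class PrescriptionViewRepository {
  private views: PrescriptionView[] = [];

  get(id: string): PrescriptionView | undefined {
    return this.views.filter((v) => v.id === id)[0];
  }

  upsert(view: PrescriptionView): void {
    this.views = this.views.filter((v) => v.id !== view.id).concat([view]);
  }

  fetchAllPrescriptions(patientId: string): PrescriptionView[] {
    return this.views.filter((v) => v.patientId === patientId);
  }
}

class PrescriptionQueryService {
  private repo: PrescriptionViewRepository;

  constructor(repo: PrescriptionViewRepository) {
    this.repo = repo;
  }

  // Event handler: fetch the latest Query Model, update it, persist it.
  onEvent(e: PrescriptionEvent): void {
    if (e.type === "PrescriptionCreated") {
      this.repo.upsert({ id: e.id, patientId: e.patientId, drug: e.drug, filled: false });
      return;
    }
    const current = this.repo.get(e.id);
    if (current) {
      this.repo.upsert({
        id: current.id,
        patientId: current.patientId,
        drug: current.drug,
        filled: true,
      });
    }
  }

  // Query handler: delegate straight to the repository.
  fetchAllPrescriptions(patientId: string): PrescriptionView[] {
    return this.repo.fetchAllPrescriptions(patientId);
  }
}
```

An HTTP or message bus adapter would sit in front of `fetchAllPrescriptions` and `onEvent` respectively; neither method knows which transport called it.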

For the second piece, we simply consider how we fetch from the database and potentially a cache. This may involve choosing databases optimized for reads. The cache is an optimization to shrink the lag between when input is received on the Command side and is serviceable from the Query side.

Now, there are a few failure scenarios to identify here, along with their severity:

  • The Command side fails to populate something into the cache, the cache is prematurely flushed, or we run out of room in the cache. We can potentially mitigate this by servicing the query from the Query Model, looking for any events for that model on the Event Bus, quickly applying them, and returning the result to the user. Done without ACK-ing, this may cause the event to be re-delivered so that we still persist it (and potentially there’s a story of updating the cache here too), but there are plenty of ways to provide “fallbacks” when the cache fails. Equally, if the business allows for it, we may just decide to service a stale read knowing the data will be updated at a later time. The staleness of a read can potentially be mitigated by sending updates to clients, either via Webhooks or some other messaging interface.
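One possible shape for that cache fallback is sketched below. The per-model `version` number, the `PendingEvent` type, and passing the not-yet-projected events in as an array are all assumptions; how you would actually peek at un-ACKed events depends on the broker.

```typescript
interface BalanceView {
  cardId: string;
  balanceCents: number;
  version: number; // version of the last event applied to this view
}

// An event seen on the bus but not yet projected into the Query Model.
interface PendingEvent {
  cardId: string;
  version: number;
  deltaCents: number;
}

type ReadCache = { [cardId: string]: BalanceView | undefined };

function readBalance(
  cardId: string,
  cache: ReadCache,
  store: BalanceView[],
  unprojected: PendingEvent[],
): BalanceView | undefined {
  // Fast path: cache hit.
  const cached = cache[cardId];
  if (cached) return cached;

  // Fallback: load the persisted Query Model.
  const persisted = store.filter((v) => v.cardId === cardId)[0];
  if (!persisted) return undefined;

  // Apply, in order, any events newer than the persisted view.
  const fresh = unprojected
    .filter((e) => e.cardId === cardId && e.version > persisted.version)
    .sort((a, b) => a.version - b.version)
    .reduce<BalanceView>(
      (view, e) => ({
        cardId: view.cardId,
        balanceCents: view.balanceCents + e.deltaCents,
        version: e.version,
      }),
      persisted,
    );

  cache[cardId] = fresh; // repopulate the cache for the next read
  return fresh;
}
```

Skipping the `unprojected` step and returning `persisted` directly would be the "serve a stale read" option.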

A real-world example in code

This is still a work in progress (WIP) and coming very soon. Please check back and I’ll provide a public GitHub link when I have a chance. I plan to provide an example for an e-Prescribing service.

EDIT: Rust Command Side is here. Query side, tests, and other quality of life improvements pending: https://github.com/cawfeecoder/cqrs

EDIT 2: Typescript Command Side is here. Query side, tests, and other quality of life improvements pending: https://github.com/cawfeecoder/cqrs-ts

Closing Remarks

In this article you’ve seen some insight into how I like to design and organize large-scale, fault-tolerant systems. However, I’d also like to point out some of the following:

  • While the Adapters attempt to make this technologically independent from the infrastructure, your choice of infrastructure may influence how you decide to code in terms of handling failure scenarios

Future Work

  • Talk about testing strategy within such an architecture (Hint: It tends to rely more on unit tests)
