Navigating CQRS and Event Sourcing: My Journey with NestJS and EventStoreDB (Part 1)

Kersten Kriegbaum
Digital Frontiers — Das Blog
17 min readJan 29, 2024

--

Introduction

This article dives into using Command Query Responsibility Segregation (CQRS) and Event Sourcing (ES) with NestJS and EventStoreDB. It stems from my goal to learn new architectural patterns, moving away from traditional layered architectures. With a background in Node.js, I wanted to explore these modern techniques firsthand.

The content is based on my personal experience with CQRS, Event Sourcing, NestJS, and EventStoreDB. It covers the technical challenges I faced and how I solved them. The goal is to give a straightforward guide on implementing these patterns, using examples from a sample project.

We’ll go through the example project step by step, explaining the key functionality of each part and how it works. This approach is about sharing practical insights from someone who wanted to learn and apply something new in the realm of web application development.

Prerequisites

You can find the repo of this project here:
https://github.com/dxfrontiers/nestjs-cqrs

To follow the examples you need docker and docker-compose (comes with docker desktop) up and running.

You will have to choose the right EventStoreDB docker image matching your system.

I used the following npm packages and versions (most of the packages come with nestJS):

"dependencies": {
"@eventstore/db-client": "6.1.0",
"@nestjs/common": "10.3.0",
"@nestjs/core": "10.3.0",
"@nestjs/cqrs": "10.2.6",
"@nestjs/platform-express": "10.3.0",
"reflect-metadata": "0.1.13",
"rxjs": "7.8.1",
"uuid": "9.0.1"
},
"devDependencies": {
"@nestjs/cli": "10.3.0",
"@nestjs/schematics": "10.1.0",
"@nestjs/testing": "10.3.0",
"@types/express": "4.17.21",
"@types/jest": "29.5.11",
"@types/node": "20.11.5",
"@types/supertest": "2.0.16",
"@typescript-eslint/eslint-plugin": "6.19.0",
"@typescript-eslint/parser": "6.19.0",
"eslint": "8.56.0",
"eslint-config-prettier": "9.1.0",
"eslint-plugin-prettier": "5.1.3",
"jest": "29.7.0",
"prettier": "3.2.4",
"source-map-support": "0.5.21",
"supertest": "6.3.4",
"ts-jest": "29.1.1",
"ts-loader": "9.5.1",
"ts-node": "10.9.2",
"tsconfig-paths": "4.2.0",
"typescript": "5.3.3"
}

Foundations

NestJS

NestJS is gaining traction in the Node.js community as a platform for server-side application development. Its standout feature is Dependency Injection (DI), which simplifies component management and testing. However, applying NestJS for CQRS is relatively new in the JavaScript domain.

CQRS

CQRS stands for Command Query Responsibility Segregation. It’s a design pattern that separates the operations that modify data (commands) from the operations that read data (queries). In a CQRS system, different models to update information and read information are used. This means that the part of the system that handles writing data can be different from the part that reads data.

Event sourcing

Event Sourcing is a way of handling data changes in a system by storing a sequence of events that describe changes to the data over time. Instead of saving just the current state of the data, Event Sourcing keeps a record of all the changes that have happened. Each event represents a change in data, and the current state can be rebuilt by replaying these events.

Event store

An Event Store is a type of database designed specifically for Event Sourcing. It stores events rather than the state of data objects. These events are saved in the order they occur, which allows the system to reconstruct an object’s state by replaying the events.

Advantages of CQRS and Event sourcing

Event sourcing and CQRS provide major advantages, especially when combined. The combination of Event Sourcing and CQRS offers a key advantage: events can be used to keep the read and write models in sync. This ensures consistent data updates across both models, significantly enhancing system integrity and data processing efficiency. This full record of state changes is also important for thorough audits and the ability to replay or undo events. This is particularly useful in situations where tracking every change in the application state is necessary. Additionally, Event Sourcing allows for adding new features by rebuilding the state from past data, making sure it works with current data and improves the development of the application.

ES is highly effective for audit trails and compliance. It records every state change in the system as a series of events. This creates a detailed, unchangeable history of all transactions. ES makes it easy to track actions, ensuring compliance and simplifying audits. This is crucial for systems requiring strict oversight.

Another CQRS advantage is testing efficiency. Separating read and write operations simplifies testing and code complexity. Moreover CQRS improves the architecture’s extensibility, allowing for independent scaling and optimization of these operations.

Although these methods are not yet mainstream in JavaScript, their integration presents an intriguing opportunity for application development. NestJS’s robustness and community support provide a strong foundation. This article critically examines these methodologies, evaluating their application in real-world scenarios, focusing on the benefits of ES and CQRS with NestJS and EventStoreDB.

CQRS works well with Domain-Driven Design (DDD) because it separates how data is read and written, matching DDD’s focus on clear roles and areas in a system. This makes it easier to deal with complicated rules and grow the system over time. By putting domain events at the heart of the design, it helps keep the architecture clean and focused on the main business areas, making the system easier to understand and maintain.

Structure of the sample Project

The sample project models a storage management system for a future power grid, emphasizing managing storage units where users can register, enable, or disable them.

Key functionalities

  1. Storage Unit Registration
    The system registers new storage units with designated capacities.
  2. Enabling/Disabling Storage Units
    Users can modify the status of storage units, indicating their activity.
  3. Event-Driven Updates
    Changes are managed through events, ensuring a robust audit trail and state management.

Step by step guide

Please make sure that Node.js (version >= 16) is installed on your operating system.

Preferred way:

npx @nestjs/cli new example_project

Alternative way:

npm i -g @nestjs/cli
nest new example-project

Follow the steps. Next, let’s install all required packages.

cd example-project
npm i @nestjs/cqrs
npm i @eventstore/db-client
npm i uuid
npm i prettier

Now, let’s first setup the eventStoreDB. We will setup EventstoreDB using Docker Compose. Make sure you have docker up and running.

in the root directory, setup a file named docker-compose.yml:

version: '3.7'
services:
eventstore.db:
image: eventstore/eventstore:22.10.3-alpha-arm64v8 # For Apple M Chips
# image: eventstore/eventstore:21.6.0-buster-slim # For ARM Chips
environment:
- EVENTSTORE_CLUSTER_SIZE=1
- EVENTSTORE_RUN_PROJECTIONS=All
- EVENTSTORE_START_STANDARD_PROJECTIONS=true
- EVENTSTORE_EXT_TCP_PORT=1113
- EVENTSTORE_HTTP_PORT=2113
- EVENTSTORE_INSECURE=true
- EVENTSTORE_ENABLE_EXTERNAL_TCP=true
- EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true
ports:
- '1113:1113'
- '2113:2113'
volumes:
- type: volume
source: eventstore-volume-data
target: /c/data/eventstore/data
- type: volume
source: eventstore-volume-logs
target: /c/data/eventstore/logs
volumes:
eventstore-volume-data:
eventstore-volume-logs:

Please checkout the right image for your system.
To start EventStoreDB, go to the root dir of your project and type:

docker-compose-up

Now, in the /src directory of your project, create a file, eventstore.ts:

// eventstore.ts
import {EventStoreDBClient, FORWARDS, START} from '@eventstore/db-client'

const client = EventStoreDBClient.connectionString(
'esdb://localhost:2113?tls=false',
)

const connect = () => {
try {
const result = client.readAll({
direction: FORWARDS,
fromPosition: START,
maxCount: 1,
})
} catch (error) {
console.error('Error connecting to EventStoreDB:', error)
}
}

export {client, connect}

That’s it for the eventstoreDB.

Folder structure

In line with the principle of CQRS which emphasizes the separation of concerns, we reflect this division of tasks in our project’s folder structure. This approach ensures a clear and organized representation of the different responsibilities within the application.

So let’s create 2 directories, one for our storage devices and everything that has to do with storage and one for our api to send commands and queries to the application:

/api
/storage

Commands

Let’s start by creating the commands we need. In the storage directory, create a file storage.commands.ts:

// storage.commands.ts
import {ICommand} from '@nestjs/cqrs'

export class RegisterStorageUnitCommand implements ICommand {
constructor(
public readonly aggregateId: string,
public readonly capacity: string,
) {}
}

export class DisableStorageUnitCommand implements ICommand {
constructor(public readonly aggregateId: string) {}
}

export class EnableStorageUnitCommand implements ICommand {
constructor(public readonly aggregateId: string) {}
}

In NestJS, ICommand is an interface used in the CQRS pattern. It represents a command, which is a directive or instruction to perform a specific action. Commands are used in scenarios where you want to modify the state of a system. Commands in CQRS are responsible for updating the state based on the business logic.

Key characteristics of ICommand in NestJS

  1. Intent
    It expresses a clear intention to perform an action, like creating, updating, or deleting an entity.
  2. Data Container
    It usually carries the necessary data to perform the action. This data can include identifiers, payload, or other relevant information.
  3. Handling
    Commands are handled by command handlers (to which we come later), which contain the business logic to execute the action defined in the command.
  4. One-Way Operations
    Commands are typically one-way operations that do not return a value (in contrast to queries, which fetch data).

In practice, when using CQRS with NestJS, you define commands as classes that implement the ICommand interface. These classes are then handled by corresponding command handlers, ensuring a clean separation between the command dispatching and the execution logic.

Events

Now we need to prepare our events with the required information. In the /src/storage directory, create a file storage.events.ts:

// storage.events.ts
import {UUID} from 'uuid'
import {IEvent} from "@nestjs/cqrs";

export class StorageEvent implements IEvent {
constructor(
public readonly aggregateId: UUID,
public readonly capacity: string,
public readonly isNew: boolean,
) {}
}

export class StorageRegisteredEvent extends StorageEvent {}
export class StorageDisabledEvent extends StorageEvent {}
export class StorageEnabledEvent extends StorageEvent {}

In NestJS, IEvent is an interface used within the CQRS module. It serves as a base for creating event classes.

Purpose

Events in CQRS represent things that have happened within the system. In this context, IEvent serves as a contract for classes that encapsulate the details of these occurrences.

Usage

Implementing IEvent allows the system to recognize and handle these classes as events. This is essential for Event Sourcing, where events are used to track and store state changes in the application.

By using IEvent and the CQRS pattern, your NestJS application can efficiently handle complex scenarios involving distinct command and query responsibilities, improving the maintainability and scalability of your code.

Aggregate

The storage.aggregate.ts file is crucial, embodying the core business logic within a Domain-Driven Design (DDD), employing CQRS and Event Sourcing principles in a NestJS application.

In the /src/storage directory, create a file storage.aggregate.ts:

// storage.aggregate.ts
import {
AggregateRoot,
CommandHandler,
EventPublisher,
EventsHandler,
ICommandHandler,
IEventHandler,
} from '@nestjs/cqrs'
import {
StorageDisabledEvent,
StorageEnabledEvent,
StorageRegisteredEvent,
} from './storage.events'
import {
DisableStorageUnitCommand,
EnableStorageUnitCommand,
RegisterStorageUnitCommand,
} from './storage.commands'
import {jsonEvent} from '@eventstore/db-client'
import {client as eventStore} from '../eventstore'

export class StorageAggregate extends AggregateRoot {
private id: string
private capacity: string
disabled: boolean = false

constructor() {
super()
}

registerStorage(aggregateId: string, capacity: string) {
this.apply(new StorageRegisteredEvent(aggregateId, capacity, true))
}

enableStorage(): void {
if (this.disabled) {
this.apply(new StorageEnabledEvent(this.id, this.capacity, true))
}
}

disableStorage() {
if (!this.disabled) {
this.apply(new StorageDisabledEvent(this.id, this.capacity, true))
}
}

applyStorageRegisteredEventToAggregate(event: StorageRegisteredEvent): void {
this.id = event.aggregateId
this.capacity = event.capacity
this.disabled = false
}

applyStorageDisabledEventToAggregate() {
this.disabled = true
}

applyStorageEnabledEventToAggregate() {
this.disabled = false
}

static async loadAggregate(aggregateId: string): Promise<StorageAggregate> {
const events = eventStore.readStream('storage-unit-stream-' + aggregateId)
const aggregate = new StorageAggregate()

for await (const event of events) {
const data: any = event.event.data
try {
switch (event.event.type) {
case 'StorageUnitCreated':
aggregate.applyStorageRegisteredEventToAggregate({
aggregateId: data.id,
capacity: data.capacity,
isNew: true,
})
break
case 'StorageUnitDisabled':
aggregate.applyStorageDisabledEventToAggregate()
break
case 'StorageUnitEnabled':
aggregate.applyStorageEnabledEventToAggregate()
break
default:
break
}
} catch (e) {
console.error('Could not process event')
}
}

return aggregate
}
}

// ... rest of the code will follow ...

In our NestJS application leveraging CQRS and Event Sourcing, the StorageAggregate class stands out for its role in managing storage units. As an AggregateRoot, it fully embodies the business rules and workflows for storage units, ensuring that each transaction preserves data consistency and integrity. The role of StorageAggregate in the project is multifaced. It illustrates how CQRS and Event Sourcing can be practically implemented in a real-world scenario, with StorageAggregate managing the command and event handling.

The StorageAggregate serves multiple roles are:

  1. Command Handling
    It interprets and processes commands such as registerStorage, enableStorage, and disableStorage. These commands are executed through dedicated handlers. The execution of these commands adheres to the business logic and results in the generation of events that reflect the new state of the system.
  2. Event Application
    Following the execution of business logic, StorageAggregate applies the resulting events, such as StorageRegisteredEvent, to update the state of the storage unit. This mechanism is pivotal in Event Sourcing, ensuring that each state change is captured as a series of events.
  3. State Management
    The aggregate maintains the internal state of a storage unit, tracking attributes such as its identifier, capacity, and whether it is enabled or disabled. This internal state management is crucial for reflecting the real-world status of each storage unit within the system.
  4. Event Sourcing Integration
    The loadAggregate function is a key feature demonstrating Event Sourcing in action. By reading a stream of events from EventStoreDB, it reconstructs the state of the StorageAggregate. This ability to rebuild an entity's state from its history of events is a core basic principle of Event Sourcing, providing immense value in terms of auditability and system resilience.
  5. Scalability and maintainability
    By segregating the handling of commands and the application of events, the system’s architecture is designed for scalability and maintainability. This separation of concerns facilitates easier updates and modifications to the business logic without impacting other parts of the system.

In essence, StorageAggregate is not just a class within the codebase; it is a representation of a core architectural pattern that enhances the clarity, integrity, and flexibility of the domain model. Through its design, developers are able to enforce business rules, ensure consistency, and facilitate the handling of events, all of which are crucial for a robust, scalable, and maintainable application. This approach, grounded in the principles of DDD, CQRS, and Event Sourcing, exemplifies modern software architecture practices, offering a structured and effective method for managing complex domain logic within a (NestJS) application.

Understanding the implementation of Event Sourcing with NestJS and EventStoreDB is crucial. Unlike frameworks like AXON, which offer a unified solution for CQRS and ES, NestJS and EventStoreDB require a more hands-on approach. This necessitates a clear demarcation between the two products within the stack. While AXON provides built-in functions for Event Sourcing, using NestJS with EventStoreDB means we need to implement these mechanisms ourselves, highlighting the importance of understanding their interplay in our ES and CQRS implementation.

Commandhandler

In the storage.aggregate.ts, outside the aggregate class, add the following code:

// storage.aggregate.ts
@CommandHandler(RegisterStorageUnitCommand)
export class RegisterStorageUnitHandler
implements ICommandHandler<RegisterStorageUnitCommand>
{
constructor(private readonly publisher: EventPublisher) {}
async execute(command: RegisterStorageUnitCommand): Promise<void> {
const aggregate = this.publisher.mergeObjectContext(new StorageAggregate())
aggregate.registerStorage(command.aggregateId, command.capacity)
aggregate.commit()
}
}

@CommandHandler(DisableStorageUnitCommand)
export class DisableStorageUnitHandler
implements ICommandHandler<DisableStorageUnitCommand>
{
constructor(private readonly publisher: EventPublisher) {}


async execute(command: DisableStorageUnitCommand): Promise<void> {
const aggregate = this.publisher.mergeObjectContext(
await StorageAggregate.loadAggregate(command.aggregateId),
)
if (!aggregate.disabled) {
aggregate.disableStorage()
aggregate.commit()
}
}
}

@CommandHandler(EnableStorageUnitCommand)
export class EnableStorageUnitHandler
implements ICommandHandler<EnableStorageUnitCommand>
{
constructor(private readonly publisher: EventPublisher) {}


async execute(command: EnableStorageUnitCommand): Promise<void> {
const aggregate = this.publisher.mergeObjectContext(
await StorageAggregate.loadAggregate(command.aggregateId),
)
if (aggregate.disabled) {
aggregate.enableStorage()
aggregate.commit()
}
}
}

Let’s break down the key components and their functionalities:

Commandhandlers

Each class (RegisterStorageUnitHandler, DisableStorageUnitHandler, EnableStorageUnitHandler) is a command handler, designated by the @CommandHandler decorator. These handlers are responsible for executing specific commands (RegisterStorageUnitCommand, DisableStorageUnitCommand, EnableStorageUnitCommand).

execute() Method

The execute() method is a part of the ICommandHandler interface which these classes implement.

It defines the logic that should be executed when a particular command is received. In this context, each execute method handles a different type of command related to storage units.

EventPublisher and Aggregate Interaction

Injected into each handler, EventPublisher is a NestJS service used for dealing with aggregates in an event-sourced system.

mergeObjectContext()

It merges the StorageAggregate instance with the NestJS event publisher context.

This is crucial for Event Sourcing in NestJS. It ensures that the aggregate’s events are properly published and handled within the NestJS framework, enabling features like event propagation and side-effects management.

commit() Method

After performing operations on the aggregate, commit() is called to finalize and propagate the changes.

This step is essential for triggering the events that the aggregate has recorded. It ensures that all changes made to the aggregate are acknowledged and the corresponding events are dispatched.

Summary

These command handlers in NestJS use EventPublisher to interact with an aggregate. The mergeObjectContext method links the aggregate with the NestJS event system, ensuring proper event handling and propagation. The commit() call finalizes the changes, allowing the system to record and dispatch the resulting events.

EventsHandler

In the storage.aggregate.ts, outside the aggregate class, add the following code:

// storage.aggregate.ts
@EventsHandler(StorageRegisteredEvent)
export class StorageRegisteredEventHandler
implements IEventHandler<StorageRegisteredEvent>
{
async handle(event: StorageRegisteredEvent): Promise<void> {
if (event.isNew) {
const eventData = jsonEvent({
type: 'StorageUnitCreated',
data: {
id: event.aggregateId,
capacity: event.capacity,
},
})

await eventStore.appendToStream(
'storage-unit-stream-' + event.aggregateId,
[eventData],
)
}
}
}

@EventsHandler(StorageDisabledEvent)
export class StorageDisabledEventHandler
implements IEventHandler<StorageDisabledEvent>
{
async handle(event: StorageDisabledEvent): Promise<void> {
if (event.isNew) {
const eventData = jsonEvent({
type: 'StorageUnitDisabled',
data: {
id: event.aggregateId,
disabledCapacity: event.capacity,
},
})
await eventStore.appendToStream(
'storage-unit-stream-' + event.aggregateId,
[eventData],
)
}
}
}

@EventsHandler(StorageEnabledEvent)
export class StorageEnabledEventHandler
implements IEventHandler<StorageEnabledEvent>
{
async handle(event: StorageEnabledEvent): Promise<void> {
if (event.isNew) {
const eventData = jsonEvent({
type: 'StorageUnitEnabled',
data: {
id: event.aggregateId,
enabledCapacitycd: event.capacity,
},
})
await eventStore.appendToStream(
'storage-unit-stream-' + event.aggregateId,
[eventData],
)
}
}
}

The implementation of event handling is streamlined through a set of structured classes and decorators. Classes like StorageRegisteredEventHandler, StorageDisabledEventHandler and StorageEnabledEventHandler are pivotal for managing domain events related to storage units. These classes are identified as event handlers using the @EventsHandler decorator, a NestJS construct that binds the class to a specific event type.

handle() method

Each event handler employs a handle method, mandated by the IEventHandler interface, to execute logic upon the triggering of its corresponding event. This method's core functionality is to validate the event's novelty and, if new, proceed with processing. This processing involves creating a structured event object using jsonEvent and persisting it in EventStoreDB with the appendToStream method.

jsonEvent

The jsonEvent function, provided by EventStoreDB, is instrumental in formatting the event data in a manner that is compatible with EventStoreDB's storage and retrieval mechanisms. It encapsulates key information such as the event type, aggregate ID, and relevant domain data, ensuring that each event is self-contained and descriptive.

appendToStream()

Following the creation of the event object, appendToStream is invoked to append the event to a designated stream within EventStoreDB. This stream, named dynamically based on the aggregate ID, serves as a chronological ledger of all events pertaining to a specific storage unit. This method ensures that each event is immutably logged in EventStoreDB, facilitating the reconstruction of the storage unit's state from its event history—a core tenet of Event Sourcing.

Summary

These event handlers in NestJS serve to process domain-specific events and store them in EventStoreDB. They use EventStoreDB’s functionalities (jsonEvent, appendToStream) to format and persist events.

API setup

We will control our example application via API calls. For that, we will have to set up basic endpoints.

Controller

In accordance with NestJS, we still need a controller and a module for our storage

In the /api dir create a file storage.controller.ts:

// storage.controller.ts
import {Controller, Param, Post, Query} from '@nestjs/common'
import {CommandBus} from '@nestjs/cqrs'
import {
DisableStorageUnitCommand,
EnableStorageUnitCommand,
RegisterStorageUnitCommand,
} from '../storage/storage.commands'
import {v4 as uuid} from 'uuid'

@Controller('storage')
export class StorageUnitController {
constructor(private readonly commandBus: CommandBus) {}


@Post('/register')
async registerStorage(@Query('capacity') capacity: string): Promise<any> {
const aggregateId = uuid()
await this.commandBus.execute(
new RegisterStorageUnitCommand(aggregateId, capacity),
)
return {message: 'command received', aggregateId}
}


@Post('/:id/disable')
async disableStorage(@Param('id') id: string): Promise<any> {
await this.commandBus.execute(new DisableStorageUnitCommand(id))
return {message: 'Command Received'}
}


@Post('/:id/enable')
async enableStorage(@Param('id') id: string): Promise<any> {
await this.commandBus.execute(new EnableStorageUnitCommand(id))
return {message: 'Command received'}
}
}

The StorageUnitController in NestJS serves as an endpoint manager for storage unit operations. It utilizes the CommandBus from NestJS's CQRS module to handle command execution. Commands like RegisterStorageUnitCommand, DisableStorageUnitCommand, and EnableStorageUnitCommand are dispatched to their respective handlers via the CommandBus. The commandBus.execute() method triggers these commands, each carrying necessary parameters like aggregateId and capacity. The CommandBus acts as a mediator, routing commands to their appropriate handlers, ensuring a clean separation of concerns and adherence to CQRS principles. This setup allows for modular, maintainable, and scalable application architecture.

NestJS API module

In /api, create a file storage.module.ts:

// storage.module.ts
import {client as eventStore} from '../eventstore'
import {streamNameFilter} from '@eventstore/db-client'
import {Module, OnModuleInit} from '@nestjs/common'
import {CqrsModule} from '@nestjs/cqrs'
import {StorageUnitController} from './storage.controller'
import {
DisableStorageUnitHandler,
EnableStorageUnitHandler,
RegisterStorageUnitHandler,
StorageDisabledEventHandler,
StorageEnabledEventHandler,
StorageRegisteredEventHandler,
} from '../storage/storage.aggregate'

@Module({
imports: [CqrsModule],
controllers: [StorageUnitController],
providers: [
RegisterStorageUnitHandler,
EnableStorageUnitHandler,
DisableStorageUnitHandler,
StorageRegisteredEventHandler,
StorageEnabledEventHandler,
StorageDisabledEventHandler,
],
})
export class StorageModule implements OnModuleInit {
async onModuleInit() {
this.startBackgroundSubscription()
}

private startBackgroundSubscription() {
;(async (): Promise<void> => {
await this.subscribeToAllStorageEvents()
})()
}

private async subscribeToAllStorageEvents() {
const subscription = eventStore.subscribeToAll({
filter: streamNameFilter({prefixes: ['storage-unit-stream-']}),
})

for await (const resolvedEvent of subscription) {
console.log(
`Received event ${resolvedEvent.event?.revision}@${resolvedEvent.event?.streamId}`,
)
const data: any = resolvedEvent.event.data
console.log('data:', data)
}
}
}

The code highlights the subscription to domain events through EventStoreDB, focusing on events with the storage-unit-stream- prefix. This is where aggregates could be initialized by processing the incoming events to reconstruct their current state. The setup involves:

  1. Module Initialization:
    Leveraging the OnModuleInit interface to kickstart the subscription process as the application loads.
  2. Event Subscription:
    Using subscribeToAll method from EventStoreDB to listen for events, filtered by stream names, indicating a targeted approach to event handling.
  3. Potential Aggregate Initialization:
    At this juncture, the system is poised to initialize aggregates. By iterating over the stream of events, it could update each aggregate to reflect the latest state changes dictated by these events.

Assembling Modules and Providers

At this stage in our NestJS project, we have to assemble modules and providers. This is a necessary step that involves integrating various components to build the application. It ensures that all parts of our system are correctly connected and configured.

This is what your files should look like:

app.controller.ts

import {Module} from '@nestjs/common'
import {StorageUnitController} from './api/storage.controller'
import {CommandBus} from '@nestjs/cqrs' // Angenommen, dieser Service ist erforderlich

@Module({
controllers: [StorageUnitController],
providers: [CommandBus], // und andere benötigte Services
})
export class StorageModule {}

app.module.ts

import {Module} from '@nestjs/common'
import {StorageModule} from './api/storage.module'

@Module({
imports: [StorageModule],
})
export class AppModule {}

app.service.ts

import { Injectable } from '@nestjs/common';

@Injectable()
export class AppService {
getHello(): string {
return 'Hello World!';
}
}

main.ts

import {NestFactory} from '@nestjs/core'
import {AppModule} from './app.module'
import {connect as connectToEventStore} from './eventstore'

const port: number = 8080

async function bootstrap() {
const app = await NestFactory.create(AppModule)
app.enableCors({
origin: 'http://localhost:3000',
methods: 'GET,HEAD,PUT,PATCH,POST,DELETE',
credentials: true,
});
connectToEventStore()
await app.listen(port)
}
bootstrap().then(() => console.log('Application is listening on port ' + port))

Operating the Application

Start the EventStoreDB. In the projects root dir, type:

docker-compose up

Start the application. In the projects root dir, type:

nest build
nest start

Create storage units:

http://localhost:8080/storage/register?capacity=100

This command will log an event to the console providing the UUID generated for the storageUnit.

You can use this UUID to disable and enable this specific unit.

http://localhost:8080/storage/<UUID>/disable
http://localhost:8080/storage/<UUID>/enable

Go ahead and play around. Each time you start the app, all events will be sourced and the last state will be restored.

You will only be able to disabled enabled storage units and vice versa, ensured by the Event Sourcing part of our application.

Challenges and solutions

Facing challenges and finding solutions was central to this project. Initially, learning NestJS was necessary. The documentation for NestJS was found lacking, which complicated the process. My background in layered architectures required a significant shift in approach, particularly challenging in distinguishing between CQRS and Event Sourcing (ES). Although NestJS’s CQRS module includes events and event handlers, which are beneficial, it does not inherently define the boundary between CQRS and ES.

The implementation of Event Sourcing required a manual setup, as no out-of-the-box solution exists within NestJS CQRS. This represents a limitation of not having an all-in-one solution. Despite NestJS CQRS offering Event Publishers, the absence of built-in Event Sourcing can lead to initial confusion. To navigate these challenges, I developed and applied specific tips and best practices, which were instrumental in successfully integrating CQRS and Event Sourcing within the project.

Lessons Learned

In concluding, this project has provided significant learning experiences. A key takeaway is the effective separation of concerns, enabling more manageable and modular code, aligning well with Domain-Driven Design principles. The interaction between CQRS and ES became intuitive over time, highlighting the scalability and testability of the system. While using a framework that integrates CQRS and ES with ready-made features might be more convenient, the modular approach adopted here allows for the gradual introduction of CQRS and ES. This project stands out as a unique TypeScript/JavaScript solution, emphasizing flexibility and adaptability in software development.

Outlook

In the next part of this series, we will delve into EventStoreDB, exploring its features. Part 2 will dive deeper into event sourcing and another important topic: Projections.

Ressources and Links

Article of my colleague Frank Steimle
“The Good, the Bad and the Ugly: How to choose an Event Store”:
https://medium.com/digitalfrontiers/the-good-the-bad-and-the-ugly-how-to-choose-an-event-store-f1f2a3b70b2d

Article of my colleague Frank Scheffler
“Axon 101 — Handling CLient-Side Correlation IDs”:

https://medium.com/digitalfrontiers/axon-101-handling-client-side-correlation-ids-3f11db66e0a6

NestJS CQRS guide
https://docs.nestjs.com/recipes/cqrs

EventStoreDB getting started
https://developers.eventstore.com/getting-started.html

--

--

Kersten Kriegbaum
Digital Frontiers — Das Blog

Fullstack developer & consultant located in Darmstadt, Germany, working for Digital Frontiers