Exploring Microservices Architecture with NestJS: Popular Patterns

Vitaly Bexterev
6 min read · Feb 11, 2024

In the realm of software development, adopting the right architectural patterns is paramount for building scalable and maintainable systems. Microservices architecture has gained widespread popularity for its ability to address the challenges of complex applications by breaking them down into smaller, manageable services. In this article, we’ll explore the implementation of microservices using NestJS, focusing on popular patterns such as CQRS, Saga, Event Bus, and Circuit Breaker.

Before delving into the technical intricacies, let’s set the stage with a simple scenario. Imagine we’re tasked with building an e-commerce platform comprising three core functionalities: Order Management, Payment Management, and Notification Service. While a monolithic approach might seem feasible initially, the potential complexity and scalability issues prompt us to opt for microservices architecture. Each service will be developed independently by specialized teams, facilitating agility and scalability in the long run.

A crucial aspect of microservices architecture is enabling seamless communication between services. We have a plethora of options at our disposal, including synchronous methods like REST API, SOAP, and gRPC, as well as asynchronous messaging using technologies like RabbitMQ and ZeroMQ. For our scenario, we’ll choose RabbitMQ for its widespread adoption and its delivery guarantees: durable queues and publisher confirms help ensure that messages sent between microservices are not lost.

To enhance the scalability and maintainability of our application, we’ll embrace the Command Query Responsibility Segregation (CQRS) pattern. By separating the responsibilities for handling command input (writing data) from query input (reading data), we can optimize each side of the application independently based on its specific requirements. In our Order Service, we’ll define commands, events, and queries to streamline the handling of orders and users.

The Order Service has two domains: orders and users. Authorization, in our case, serves as an auxiliary entity. Alongside the commands, events, and queries that CQRS calls for, the service includes a storage service for database access and services for interacting with the event bus (RabbitMQ).
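
To make the command/query split concrete, here is a minimal, framework-free sketch. It is not the Order Service code: `InMemoryOrderStore`, `PlaceOrderHandler`, and `GetOrdersByUserHandler` are hypothetical names chosen for illustration, and the in-memory array stands in for a real database.

```typescript
// Framework-free CQRS sketch: writes and reads go through separate handlers.
type OrderRecord = { id: string; userId: number; products: string[] };

// Write side: a command describes an intent to change state.
type PlaceOrderCommand = { userId: number; products: string[] };

// Read side: a query describes what the caller wants to see.
type GetOrdersByUserQuery = { userId: number };

// Hypothetical in-memory store standing in for the storage service.
class InMemoryOrderStore {
  private readonly orders: OrderRecord[] = [];

  insert(userId: number, products: string[]): OrderRecord {
    const record = { id: `order-${this.orders.length + 1}`, userId, products };
    this.orders.push(record);
    return record;
  }

  findByUser(userId: number): OrderRecord[] {
    return this.orders.filter((order) => order.userId === userId);
  }
}

// Command handler: the only place that mutates data.
class PlaceOrderHandler {
  private readonly store: InMemoryOrderStore;
  constructor(store: InMemoryOrderStore) {
    this.store = store;
  }
  execute(command: PlaceOrderCommand): OrderRecord {
    return this.store.insert(command.userId, command.products);
  }
}

// Query handler: reads only, never mutates.
class GetOrdersByUserHandler {
  private readonly store: InMemoryOrderStore;
  constructor(store: InMemoryOrderStore) {
    this.store = store;
  }
  execute(query: GetOrdersByUserQuery): OrderRecord[] {
    return this.store.findByUser(query.userId);
  }
}

const cqrsStore = new InMemoryOrderStore();
const placeOrder = new PlaceOrderHandler(cqrsStore);
const getOrders = new GetOrdersByUserHandler(cqrsStore);

placeOrder.execute({ userId: 1, products: ['book', 'pen'] });
placeOrder.execute({ userId: 2, products: ['lamp'] });
const ordersOfUserOne = getOrders.execute({ userId: 1 });
```

Because the two handlers share nothing but the store, the read side could later be pointed at a separate, read-optimized model without touching the write path.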

Let’s focus on how orders are handled. Interaction with our service begins with the controller, which simply creates an order. Next, this order must be processed and sent to the payment service. Since this task mutates data, it is modeled as a command.

import { Body, Controller, Injectable, Post, Request, UseGuards } from '@nestjs/common';
import { CommandBus } from '@nestjs/cqrs';
import { Column, Entity } from 'typeorm';

// Controller: accepts an authenticated request and delegates to the service.
@Controller('orders')
export class OrderController {
  constructor(private readonly orderService: OrderService) {}

  @UseGuards(AuthGuard)
  @Post()
  createOrder(@Request() req, @Body() order: Partial<Order>): Promise<Order> {
    return this.orderService.createOrder(req.user.userId, order.products);
  }
}

// Entity: the persisted order.
@Entity()
export class Order extends BaseEntity {
  @Column()
  userId: number;

  @Column({ type: 'varchar', array: true, default: null, nullable: true })
  products: string[] | null;

  @Column({ type: 'smallint' })
  state: OrderState;
}

// Service: wraps the request in a command and hands it to the command bus.
@Injectable()
export class OrderService {
  constructor(private readonly commandBus: CommandBus) {}

  public createOrder(userId: number, products: string[]): Promise<Order> {
    return this.commandBus.execute(new CreateOrderCommand(userId, products));
  }
}

// Command: an immutable description of the requested change.
export class CreateOrderCommand {
  constructor(
    public readonly userId: number,
    public readonly products: Array<string>,
  ) {}
}
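
For readers new to command buses, the following sketch shows the general idea behind dispatching a command to its handler: handlers are registered per command class and looked up by the command’s constructor. This is a simplified illustration, not the `@nestjs/cqrs` implementation; `SimpleCommandBus` and `ReserveStockCommand` are hypothetical names.

```typescript
// A hypothetical, framework-free command bus for illustration only.
interface Handler<C, R> {
  execute(command: C): Promise<R>;
}

type CommandClass<C> = new (...args: any[]) => C;

class SimpleCommandBus {
  // Handlers are keyed by the command's constructor function.
  private readonly handlers = new Map<Function, Handler<any, any>>();

  register<C, R>(commandClass: CommandClass<C>, handler: Handler<C, R>): void {
    this.handlers.set(commandClass, handler);
  }

  execute<R>(command: object): Promise<R> {
    const handler = this.handlers.get(command.constructor);
    if (!handler) {
      return Promise.reject(
        new Error(`No handler registered for ${command.constructor.name}`),
      );
    }
    return handler.execute(command);
  }
}

// Hypothetical command used only in this example.
class ReserveStockCommand {
  readonly sku: string;
  constructor(sku: string) {
    this.sku = sku;
  }
}

const bus = new SimpleCommandBus();
bus.register(ReserveStockCommand, {
  execute: async (command: ReserveStockCommand) => `reserved:${command.sku}`,
});

// The caller only knows the command, not which handler will process it.
const reservation: Promise<string> = bus.execute<string>(
  new ReserveStockCommand('sku-42'),
);
```

The caller depends only on the command type, which is what lets the service above stay a one-line wrapper around `execute`.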

In a distributed environment like microservices, ensuring data consistency and resilience in the face of failures is paramount. We’ll employ the Saga pattern to manage long-lived transactions spanning multiple microservices. While a dedicated Saga service is ideal, for simplicity, we’ll integrate the Saga within our Order Service. This Saga intercepts order creation events and orchestrates the interaction with the Payment Service via the event bus.
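
As a general illustration of the pattern (not the Order Service implementation, which is shown next and does not include compensation logic), a saga can be sketched as a list of steps, each paired with a compensating action that undoes it. The names `SagaStep` and `runSaga` and the two example steps are assumptions made for this sketch.

```typescript
// Each step has an action and a compensation that undoes it.
type SagaStep = {
  name: string;
  action: () => Promise<void>;
  compensate: () => Promise<void>;
};

// Runs steps in order; on failure, undoes the completed steps in reverse order.
async function runSaga(steps: SagaStep[]): Promise<'completed' | 'compensated'> {
  const done: SagaStep[] = [];
  for (const step of steps) {
    try {
      await step.action();
      done.push(step);
    } catch {
      for (const finished of [...done].reverse()) {
        await finished.compensate();
      }
      return 'compensated';
    }
  }
  return 'completed';
}

const sagaLog: string[] = [];

// Hypothetical two-step flow: the payment step fails, so the order is cancelled.
const orderSagaSteps: SagaStep[] = [
  {
    name: 'create-order',
    action: async () => { sagaLog.push('order created'); },
    compensate: async () => { sagaLog.push('order cancelled'); },
  },
  {
    name: 'charge-payment',
    action: async () => { throw new Error('payment declined'); },
    compensate: async () => { sagaLog.push('payment refunded'); },
  },
];

const sagaResult = runSaga(orderSagaSteps);
```

Only steps that actually completed are compensated, so the failed payment step is never refunded; the log ends with the order being created and then cancelled.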

import { Injectable, Logger } from '@nestjs/common';
import {
  AggregateRoot,
  CommandBus,
  CommandHandler,
  EventPublisher,
  ICommand,
  ICommandHandler,
  ofType,
  Saga,
} from '@nestjs/cqrs';
import { Observable } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Handler: persists the order, then emits OrderAddedEvent through the aggregate.
@CommandHandler(CreateOrderCommand)
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
  constructor(
    private readonly store: OrderStore,
    private readonly orderPublisher: EventPublisher,
  ) {}

  async execute(command: CreateOrderCommand): Promise<Order> {
    const { userId, products } = command;
    const newOrder = await this.store.createOrder({ userId, products });
    // Connect the aggregate to the event bus so commit() publishes its events.
    const saga = this.orderPublisher.mergeObjectContext(
      new OrderAggregate(newOrder.id),
    );
    // Note: the product count is passed as the price argument here.
    saga.registerSaga(newOrder.products.length, 'USD', userId);
    saga.commit();
    return newOrder;
  }
}

export class OrderAddedEvent {
  public constructor(
    public readonly userId: number,
    public readonly orderId: string,
    public readonly price: number,
    public readonly currency: string,
  ) {}
}

export class OrderAggregate extends AggregateRoot {
  public constructor(private readonly orderId: string) {
    super();
  }

  public registerSaga(price: number, currency: string, userId: number): void {
    this.apply(new OrderAddedEvent(userId, this.orderId, price, currency));
  }
}

@Injectable()
export class OrderSaga {
  private logger = new Logger(OrderSaga.name);

  constructor(
    private sagaStore: SagaStore,
    private readonly commandBus: CommandBus,
  ) {}

  @Saga()
  orderAdded = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(OrderAddedEvent),
      // The command is executed inside the stream so that failures can be
      // logged here; nothing further is emitted for the bus to dispatch.
      switchMap(async (event) => {
        try {
          return await this.commandBus.execute(new InitSagaCommand(event));
        } catch (err) {
          this.logger.error(err);
        }
      }),
    );
  };
}

The saga listens for the order-creation event (OrderAddedEvent) and issues a command that sends the order and saga details to the payment service via the event bus.

export class InitSagaCommand {
  constructor(readonly orderEvent: OrderAddedEvent) {}
}
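
Stripped of RxJS and decorators, the event-to-command reaction can be sketched as a plain function from an event to an optional command. The event and command shapes and the `TinyEventBus` class below are hypothetical and exist only to show the flow.

```typescript
// Hypothetical event and command shapes for illustration.
type DomainEvent =
  | { type: 'OrderAdded'; orderId: string; price: number }
  | { type: 'OrderCancelled'; orderId: string };

type Command = { type: 'InitPaymentSaga'; orderId: string; price: number };

// A saga reacts to the events it cares about and returns a command to dispatch.
type SagaFn = (event: DomainEvent) => Command | undefined;

const orderAddedSaga: SagaFn = (event) =>
  event.type === 'OrderAdded'
    ? { type: 'InitPaymentSaga', orderId: event.orderId, price: event.price }
    : undefined;

// Minimal bus: offers each event to every saga and records resulting commands.
class TinyEventBus {
  private readonly sagas: SagaFn[] = [];
  readonly dispatched: Command[] = [];

  registerSaga(saga: SagaFn): void {
    this.sagas.push(saga);
  }

  publish(event: DomainEvent): void {
    for (const saga of this.sagas) {
      const command = saga(event);
      if (command) {
        this.dispatched.push(command);
      }
    }
  }
}

const eventBus = new TinyEventBus();
eventBus.registerSaga(orderAddedSaga);
eventBus.publish({ type: 'OrderAdded', orderId: 'o-1', price: 20 });
eventBus.publish({ type: 'OrderCancelled', orderId: 'o-1' }); // ignored by this saga
```

Only the first event produces a command; the saga ignores event types it was not written for, which mirrors the role of `ofType` in the listing above.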

Microservices are inherently prone to failures, necessitating robust resilience mechanisms. The Circuit Breaker pattern serves as a safeguard against cascading failures by monitoring service requests. If a service fails repeatedly, the Circuit Breaker trips, preventing further requests for a specified duration. We’ll implement the Circuit Breaker pattern to gracefully handle service outages, ensuring the reliability of our system.
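
Before looking at the article’s implementation, here is a compact sketch of the textbook form of the pattern, in which the breaker wraps each call directly. It differs from the scheduled-worker approach used in this project. The class name, the threshold and cooldown parameters, and the injectable clock are assumptions made for the example.

```typescript
type BreakerState = 'closed' | 'open' | 'half-open';

class SimpleCircuitBreaker {
  private state: BreakerState = 'closed';
  private failures = 0;
  private openedAt = 0;
  private readonly failureThreshold: number;
  private readonly cooldownMs: number;
  private readonly now: () => number;

  // `now` is injectable so the example can be driven by a fake clock.
  constructor(failureThreshold: number, cooldownMs: number, now: () => number = Date.now) {
    this.failureThreshold = failureThreshold;
    this.cooldownMs = cooldownMs;
    this.now = now;
  }

  getState(): BreakerState {
    return this.state;
  }

  async call<T>(operation: () => Promise<T>): Promise<T> {
    if (this.state === 'open') {
      if (this.now() - this.openedAt < this.cooldownMs) {
        throw new Error('circuit is open');
      }
      this.state = 'half-open'; // cooldown elapsed: allow one trial call
    }
    try {
      const result = await operation();
      this.state = 'closed';
      this.failures = 0;
      return result;
    } catch (error) {
      this.failures += 1;
      // A failed trial call, or too many failures, opens the circuit.
      if (this.state === 'half-open' || this.failures >= this.failureThreshold) {
        this.state = 'open';
        this.openedAt = this.now();
      }
      throw error;
    }
  }
}

let fakeTime = 0;
const breaker = new SimpleCircuitBreaker(2, 1000, () => fakeTime);
const failing = async (): Promise<string> => { throw new Error('payment service down'); };
const healthy = async (): Promise<string> => 'ok';

async function demoBreaker(): Promise<BreakerState[]> {
  const seen: BreakerState[] = [];
  await breaker.call(failing).catch(() => undefined);
  seen.push(breaker.getState()); // one failure, below the threshold
  await breaker.call(failing).catch(() => undefined);
  seen.push(breaker.getState()); // threshold reached
  fakeTime = 1500; // move past the cooldown
  await breaker.call(healthy);
  seen.push(breaker.getState()); // trial call succeeded
  return seen;
}

const breakerStates = demoBreaker();
```

With a threshold of two failures and a one-second cooldown, the breaker moves from closed to open and then back to closed once a trial call succeeds after the cooldown.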

import { Inject, Injectable, Logger } from '@nestjs/common';
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { Interval } from '@nestjs/schedule';
// CACHE_MANAGER and Cache come from the cache module in use
// (for example '@nestjs/cache-manager' and 'cache-manager' in recent NestJS versions).

// Handler: creates the saga record, checks the breaker state, then publishes to the queue.
@CommandHandler(InitSagaCommand)
export class InitSagaCommandHandler
  implements ICommandHandler<InitSagaCommand>
{
  constructor(
    private sagaStore: SagaStore,
    private producerService: ProducerService,
    private readonly stateService: StateService,
  ) {}

  async execute(command: InitSagaCommand): Promise<any> {
    const { orderEvent } = command;
    const { orderId, currency, price, userId } = orderEvent;
    const newSaga = await this.sagaStore.initSaga({ orderId });

    const serviceStatus = await this.stateService.getStatusServices();
    if (serviceStatus === CircuitBreakerState.IsOpen) {
      // Open circuit: fail fast instead of queueing work for a failing service.
      throw new Error('Payment service is not available');
    }
    if (serviceStatus === CircuitBreakerState.HalfOpen) {
      // Half-open circuit: slow down before sending.
      // `delay` is a promise-based sleep helper defined elsewhere in the project.
      await delay(5000);
    }
    await this.producerService.addToQueue({
      sagaId: newSaga.id,
      orderId,
      currency,
      price,
      userId,
    });
  }
}

// Worker: once a minute, recomputes the breaker state from the failed-delivery counter.
@Injectable()
export class CircuitBreakerWorker {
  private readonly logger = new Logger(CircuitBreakerWorker.name);
  private readonly MAX_CYCLE = 3;
  private problemCycle = 0;

  constructor(@Inject(CACHE_MANAGER) private cacheManager: Cache) {}

  @Interval(60000)
  async handleCircuitBreakerState(): Promise<void> {
    try {
      let failedDeliveryCount: number | undefined = await this.cacheManager.get(
        CircuitBreakerCounts.FailedDeliveryCount,
      );

      let currentState: CircuitBreakerState | undefined =
        await this.cacheManager.get('circuitBreakerState');

      // First run: initialize the counter and the state in the cache.
      if (failedDeliveryCount === undefined) {
        failedDeliveryCount = 0;
        await this.cacheManager.set(
          CircuitBreakerCounts.FailedDeliveryCount,
          failedDeliveryCount,
        );
      }

      if (currentState === undefined) {
        currentState = CircuitBreakerState.IsClose;
        await this.cacheManager.set('circuitBreakerState', currentState);
      }

      if (failedDeliveryCount > 0) {
        await this.pessimisticProcess(currentState);
      } else {
        await this.optimisticProcess(currentState);
      }

      this.logger.warn(
        `circuitBreakerState ${currentState} : failedDeliveryCount ${failedDeliveryCount}`,
      );

      // Reset the counter so the next interval starts from zero.
      await this.cacheManager.set(CircuitBreakerCounts.FailedDeliveryCount, 0);

      this.logger.log('Circuit Breaker State Updated Successfully');
    } catch (error) {
      this.logger.error('Error updating circuit breaker state', error);
    }
  }

  // Failures were seen in the last interval: move toward the open state.
  private async pessimisticProcess(state: CircuitBreakerState): Promise<void> {
    if (state === CircuitBreakerState.IsClose && this.problemCycle === 0) {
      await this.cacheManager.set(
        'circuitBreakerState',
        CircuitBreakerState.HalfOpen,
      );
      return;
    }

    if (
      state === CircuitBreakerState.HalfOpen &&
      this.problemCycle > this.MAX_CYCLE
    ) {
      await this.cacheManager.set(
        'circuitBreakerState',
        CircuitBreakerState.IsOpen,
      );
      this.problemCycle = 0;
      return;
    }

    if (
      state === CircuitBreakerState.HalfOpen &&
      this.problemCycle <= this.MAX_CYCLE
    ) {
      this.problemCycle += 1;
    }
  }

  // No failures in the last interval: move one step back toward the closed state.
  private async optimisticProcess(state: CircuitBreakerState): Promise<void> {
    if (state === CircuitBreakerState.IsOpen) {
      await this.cacheManager.set(
        'circuitBreakerState',
        CircuitBreakerState.HalfOpen,
      );
      this.problemCycle = 0;
      return;
    }

    if (state === CircuitBreakerState.HalfOpen) {
      await this.cacheManager.set(
        'circuitBreakerState',
        CircuitBreakerState.IsClose,
      );
      this.problemCycle = 0;
      return;
    }
  }
}
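
The worker’s transition rules can be easier to reason about, and to unit test, when restated as a pure function. The sketch below mirrors the branches of `pessimisticProcess` and `optimisticProcess` without the cache; the string state names and the `Snapshot` type are assumptions, since the `CircuitBreakerState` enum values are not shown in the article.

```typescript
// State names mirror the CircuitBreakerState members used by the worker.
type WorkerState = 'IsClose' | 'HalfOpen' | 'IsOpen';
type Snapshot = { state: WorkerState; problemCycle: number };

const MAX_CYCLE = 3;

// Computes the next state for one interval, given the failures seen in it.
function nextSnapshot(current: Snapshot, failedDeliveryCount: number): Snapshot {
  const { state, problemCycle } = current;

  if (failedDeliveryCount > 0) {
    // Pessimistic branch: failures were recorded during the interval.
    if (state === 'IsClose' && problemCycle === 0) {
      return { state: 'HalfOpen', problemCycle };
    }
    if (state === 'HalfOpen' && problemCycle > MAX_CYCLE) {
      return { state: 'IsOpen', problemCycle: 0 };
    }
    if (state === 'HalfOpen') {
      return { state, problemCycle: problemCycle + 1 };
    }
    return current;
  }

  // Optimistic branch: a clean interval moves one step back toward closed.
  if (state === 'IsOpen') {
    return { state: 'HalfOpen', problemCycle: 0 };
  }
  if (state === 'HalfOpen') {
    return { state: 'IsClose', problemCycle: 0 };
  }
  return current;
}

// Counts how many consecutive failing intervals it takes to open the breaker.
function intervalsUntilOpen(): number {
  let snapshot: Snapshot = { state: 'IsClose', problemCycle: 0 };
  let intervals = 0;
  while (snapshot.state !== 'IsOpen') {
    snapshot = nextSnapshot(snapshot, 1);
    intervals += 1;
  }
  return intervals;
}

const failingIntervalsNeeded = intervalsUntilOpen();
```

Under these rules, starting from the closed state it takes six consecutive intervals with failed deliveries before the breaker opens, and two clean intervals to go from open back to closed.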

Finally, here is the producer service, which dispatches our saga to external services via RabbitMQ.

import { Injectable, Logger } from '@nestjs/common';
import { ChannelWrapper, connect } from 'amqp-connection-manager';
import { ConfirmChannel } from 'amqplib';

@Injectable()
export class ProducerService {
  private channelWrapper: ChannelWrapper;
  private logger = new Logger(ProducerService.name);

  constructor(private readonly rabbitConfig: RabbitMQConfig) {
    this.initialize();
  }

  private initialize() {
    const connection = connect([this.rabbitConfig.uri]);

    this.channelWrapper = connection.createChannel({
      // setup runs on every (re)connect, so the topology is re-declared each time.
      setup: async (channel: ConfirmChannel) => {
        const queues = new Map();
        // Declare the exchanges and collect one queue declaration per unique queue.
        await Promise.all(
          this.rabbitConfig.exchanges.sender.map((exchange: Exchange) => {
            if (exchange.queue && !queues.has(exchange.queue)) {
              queues.set(
                exchange.queue,
                channel.assertQueue(exchange.queue, {
                  durable: true,
                  maxLength: 10,
                  deadLetterExchange: 'dlx',
                  deadLetterRoutingKey: 'dlx-routing-key',
                }),
              );
            }
            return channel.assertExchange(exchange.name, 'topic', {
              durable: true,
            });
          }),
        );
        // Declare the dead-letter queue and exchange, and wait for the queues above.
        await Promise.all([
          channel.assertQueue('deadLetterQueue', {
            durable: true,
          }),
          channel.assertExchange('dlx', 'topic', {
            durable: true,
          }),
          ...queues.values(),
        ]);

        // Bind each queue to its exchange. filter() alone would discard the
        // bindQueue promises, so map() is used to collect and await them.
        await Promise.all(
          this.rabbitConfig.exchanges.sender
            .filter((exchange: Exchange) => Boolean(exchange.queue))
            .map((exchange: Exchange) =>
              channel.bindQueue(exchange.queue, exchange.name, exchange.topic),
            ),
        );
        await channel.bindQueue('deadLetterQueue', 'dlx', 'dlx-routing-key');
        this.logger.log('Channel setup completed for ProducerService.');
      },
    });
  }

  async addToQueue(data: any, ex?: Exchange) {
    try {
      const exchange: Exchange = ex || this.rabbitConfig.exchanges.sender[0];
      const routingKey = exchange.topic;

      await this.channelWrapper.sendToQueue(
        exchange.queue,
        Buffer.from(JSON.stringify(data)),
        {
          headers: { exchange: exchange.name, routingKey },
          expiration: 6000, // per-message TTL in milliseconds
        },
      );

      this.logger.log('Message added to the queue successfully.');
    } catch (error) {
      this.logger.error(
        'Error adding message to the queue',
        error instanceof Error ? error.stack : String(error),
      );
    }
  }

  async close() {
    await this.channelWrapper.close();
  }
}
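
The queue options above combine a length limit (`maxLength: 10`) with a dead-letter exchange. With RabbitMQ’s default overflow behavior (drop-head), exceeding the limit removes the oldest message from the head of the queue, and a configured dead-letter exchange receives it instead of it being discarded. The in-memory model below sketches that behavior only; `BoundedQueue` is a hypothetical class, not part of amqplib or RabbitMQ.

```typescript
// In-memory model of a length-limited queue with a dead-letter target.
class BoundedQueue<T> {
  private readonly items: T[] = [];
  readonly deadLetters: T[] = [];
  private readonly maxLength: number;

  constructor(maxLength: number) {
    this.maxLength = maxLength;
  }

  publish(message: T): void {
    this.items.push(message);
    // drop-head: when the limit is exceeded, the oldest message leaves the
    // queue and is routed to the dead-letter target.
    while (this.items.length > this.maxLength) {
      const oldest = this.items.shift();
      if (oldest !== undefined) {
        this.deadLetters.push(oldest);
      }
    }
  }

  consume(): T | undefined {
    return this.items.shift();
  }

  get length(): number {
    return this.items.length;
  }
}

// Publish 12 saga ids into a queue limited to 10 messages.
const paymentQueue = new BoundedQueue<number>(10);
for (let sagaId = 1; sagaId <= 12; sagaId += 1) {
  paymentQueue.publish(sagaId);
}
```

After the loop, the two oldest messages (ids 1 and 2) sit in the dead-letter list and the queue still holds ten messages, starting with id 3. A consumer bound to the dead-letter queue can then inspect or retry what did not fit.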

Conclusion: In this article, we’ve explored the implementation of microservices architecture using NestJS and popular patterns like CQRS, Saga, Event Bus, and Circuit Breaker. By embracing these architectural solutions, we can build robust, scalable, and resilient systems capable of meeting the demands of modern applications. As you embark on your microservices journey, remember to adapt these patterns to your specific use case and continuously iterate based on evolving requirements.

Next Steps: If you found this article insightful, stay tuned for future installments where we’ll delve deeper into other microservices and their interactions. In the meantime, feel free to experiment with the concepts discussed here and share your feedback or questions in the comments section below.

Thank you for your interest in the article. At your request, I have added a link to the repository.
