NATS Streaming in the Node.js World with Kubernetes How-To Guide

Matan Yemini
May 23, 2020 · 12 min read

We will cover an introduction to NATS Streaming, how to implement the NATS Streaming server and its clients (with Node.js) in a microservices environment, and how to deploy it on k8s.

What is NATS Streaming?

First, you have to understand that NATS and NATS Streaming are two different things!

NATS is a lightweight publish-subscribe and distributed queueing messaging system. Unlike traditional enterprise messaging systems, NATS has an always-on dial tone that does whatever it takes to remain available. This forms a great base for building modern, reliable, and scalable cloud and distributed systems.

NATS Streaming is a lightweight streaming platform built on top of NATS that provides an API for persistent logs. NATS Streaming is written in Go. It can be employed to add event streaming, delivery guarantees, and historical data replay to NATS. The high-level features of NATS Streaming are similar to the capabilities of Apache Kafka, but the former wins when you value simplicity over complexity. Because NATS Streaming is a relatively new technology, it still needs to improve in some areas, especially in providing a better solution for load-balancing scenarios, compared to Apache Kafka. NATS Streaming uses the concept of a channel to represent an ordered collection of messages. Clients send to and receive from channels instead of subjects.

Why NATS Streaming?

How Does it Work?

I have personally used the NATS Streaming server as the 'event bus' in my microservices architecture. Every kind of event has its own 'channel'. For example, in a ticket-buying app we would have event kinds such as 'ticket:created' and 'ticket:updated'; these are our channels, while the events themselves are specific to the requests. As a result, we will have a 'ticket:created' channel with an event inside (a request that came in) that looks like: ticket:created | created | userId: 'abc' | price: 50 | version: 1. The versioning is super important so that we preserve the sequence of actions (to be discussed). In addition, our NATS Streaming server will include queue groups that manage the reading from our channels. Our server keeps track of its listeners and publishers and defines subscription options for them. The stan client is actually the NATS Streaming client that we are going to discuss. The server checks for heartbeats from the clients to determine their availability.
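The event shape described above can be sketched as a TypeScript type. This is a minimal illustration only; the field names here are my own assumption (the real interfaces used in this story appear later):

```typescript
// A sketch of one event on the 'ticket:created' channel, matching the
// example above. Field names are illustrative assumptions, not a real API.
interface TicketCreatedMessage {
  channel: 'ticket:created'; // the channel (event kind)
  action: 'created';         // what happened
  userId: string;
  price: number;
  version: number;           // preserves the sequence of actions per object
}

const example: TicketCreatedMessage = {
  channel: 'ticket:created',
  action: 'created',
  userId: 'abc',
  price: 50,
  version: 1,
};
```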

Not our final result (mid-example)

Before we dive into the implementation, let's start with the deployment.

NATS Streaming Server Deployment and Configuration (Kubernetes)

When using NATS Streaming we have 3 main options for connecting it to our Kubernetes cluster:

  1. Using Ingress-Nginx as our gateway to the NATS Streaming cluster service.

  2. Instead of the Ingress, exposing the service via NodePort.

  3. The easy way, which I want to focus on: via port-forwarding (I won't dive into the k8s manual).
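For option 2, a NodePort service could look roughly like this. This is a sketch under my own assumptions: the service name and the nodePort value are illustrative, not taken from the deployment below.

```yaml
# Hypothetical NodePort alternative for exposing the NATS client port.
apiVersion: v1
kind: Service
metadata:
  name: nats-nodeport-srv
spec:
  type: NodePort
  selector:
    app: nats
  ports:
    - name: client
      protocol: TCP
      port: 4222
      targetPort: 4222
      nodePort: 30422 # must fall in the cluster's NodePort range (default 30000-32767)
```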

Port 4222 as an example

Now we are going to build our k8s specification using the NATS Streaming image from Docker Hub. Take a look at a yaml configuration example:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nats-depl
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nats
  template:
    metadata:
      labels:
        app: nats
    spec:
      containers:
        - name: nats
          image: nats-streaming:0.17.0
          args:
            [
              '-p',
              '4222',
              '-m',
              '8222',
              '-hbi',
              '5s',
              '-hbt',
              '5s',
              '-hbf',
              '2',
              '-SD',
              '-cid',
              'ticketing',
            ]
---
apiVersion: v1
kind: Service
metadata:
  name: nats-srv
spec:
  selector:
    app: nats
  ports:
    - name: client
      protocol: TCP
      port: 4222
      targetPort: 4222
    - name: monitoring
      protocol: TCP
      port: 8222
      targetPort: 8222

Let's simplify it: in the first section of the example (before the three dashes in the middle) we define our NATS Streaming Deployment as 'nats-depl'. Our base Docker image is the NATS Streaming server, version 0.17.0. "-p" sets the client port ('4222') and "-m" sets the HTTP monitoring port (shown later). "-hbi" is the interval at which the server sends heartbeats to its clients, "-hbt" is how long the server waits for a client's heartbeat response, and "-hbf" is the number of failed heartbeats after which the server closes the client's connection. "-SD" enables debug logging, and "-cid" defines our cluster id, which in this case is 'ticketing'.

After the dashes we define our NATS Streaming service. In this example, the service exposes port '4222' for clients and port '8222' for http monitoring.

To connect this deployment to our service, we will make sure we have at least these configurations (from the point of view of my tickets service deployment, a service for doing CRUD actions on tickets). Pay attention: this is not the full yaml file.

...
spec:
  containers:
    - name: tickets
      image: matanyemini/tickets
      env:
        - name: NATS_CLIENT_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: NATS_URL
          value: 'http://nats-srv:4222'
        - name: NATS_CLUSTER_ID
          value: ticketing
...

Now we can refer to this server according to our environment settings.

Defining our goal and architecture

Let's describe a common generic use case: we have a ticketing app that is going to include (actually you will need more; I want to focus on these three):

  1. Ticket service: for CRUD actions on our tickets.
  2. Authentication service: for CRUD actions on users and for authentication (not our focus, which is why I am simplifying it)
  3. Orders service: for handling orders of users that are buying tickets

Now, let's think about some common problems that we can encounter:

  1. What happens if we want to horizontally scale one of our services? When you scale a service horizontally, you increase its number of instances (adding more pods). So, how can we make sure that only one instance handles each event, and not two or more? Think how bad it can be if 2 instances handle the same action. Our answer will be the usage of queue groups, so every instance handles a different event in the channel's queue. => another problem!
  2. What happens if one action fails for any reason and then we need to execute the next one? Think about a scenario where a 'ticket:deleted' action failed in one of our tickets service pods, and then a different 'ticket:updated' event arrives with the same ticket id: our program will crash. So, to handle this we will create a "processed sequence" in which the events are numbered. => another problem!
  3. When the sequence is numbered, we will have the problem of sequential processing of our events. For this, our numbering will be dedicated to a specific functional object (a ticket, for example). So, we will have {ticket:created | id: '123' | created | #1} and then {ticket:created | id: '123' | updated | #2 | updateData} -> a new problem! We will have to create a channel for each object!
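The versioning idea from problems 2 and 3 can be sketched as a small guard. This is a minimal sketch under my own assumptions (the type and function names are hypothetical): each stored object carries a version counter, and an event is processed only if it is the direct successor of what we already have.

```typescript
// Hypothetical optimistic-concurrency guard: process an event only if its
// version directly follows the version already stored for that object.
interface StoredTicket {
  id: string;
  version: number;
}

interface VersionedEvent {
  id: string;
  version: number;
}

export function shouldProcess(
  stored: StoredTicket,
  event: VersionedEvent
): boolean {
  // Out-of-order events are rejected; with manual acks they stay un-acked,
  // so NATS Streaming redelivers them after the ack-wait period.
  return event.id === stored.id && event.version === stored.version + 1;
}
```

With manual ack mode (used later in the base listener), an out-of-order event is simply left un-acked and redelivered later, by which time the missing predecessor has hopefully been processed.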

To solve all these problems, you will have to design your application in a different way, maybe not the obvious one for less experienced developers. Our concept:

Simple option

After reading the explanations in the PNG above, we might face another problem: what if one of our services is down? How can we make sure that all the events will still be processed? For this we have 'Durable Subscriptions': all our subscriptions from the NATS Streaming clients will be of this kind. This means that when a service has successfully processed an event, it sends an ACK that the action finished, and the event is then marked as 'PROCESSED' in the channel's queue. Don't forget the queue group feature, which maintains our subscribers list even if subscribers are temporarily unavailable.

Combining our conclusions with the example (a higher-level look):

I tried to simplify it as much as I can; some things mentioned above are not included. And it is probably not enough for a real ticketing app.

Base Listener and Publisher Implementation — using Node.js and TypeScript

To use NATS Streaming, we will need to install the package 'node-nats-streaming':

import { Message, Stan } from 'node-nats-streaming';
import { Subjects } from './subjects';

interface Event {
  subject: Subjects;
  data: any;
}

export abstract class Listener<T extends Event> {
  abstract subject: T['subject'];
  abstract queueGroupName: string;
  abstract onMessage(data: T['data'], msg: Message): void;
  private client: Stan;
  protected ackWait = 5 * 1000;

  constructor(client: Stan) {
    this.client = client;
  }

  subscriptionOptions() {
    // How our client is going to behave
    return this.client
      .subscriptionOptions()
      .setDeliverAllAvailable() // on first subscription, deliver all events stored in the channel
      .setManualAckMode(true) // send an ack only when the event is processed - mentioned earlier in this story
      .setAckWait(this.ackWait) // how long the server waits for an ack before redelivering
      .setDurableName(this.queueGroupName); // make the subscription durable, as mentioned earlier
  }

  listen() {
    const subscription = this.client.subscribe(
      this.subject,
      this.queueGroupName,
      this.subscriptionOptions()
    );
    subscription.on('message', (msg: Message) => {
      console.log(`Message received: ${this.subject} / ${this.queueGroupName}`);
      const parsedData = this.parseMessage(msg);
      this.onMessage(parsedData, msg);
    });
  }

  parseMessage(msg: Message) {
    const data = msg.getData();
    return typeof data === 'string'
      ? JSON.parse(data)
      : JSON.parse(data.toString('utf8'));
  }
}

The subjects are event types, for our example:

export enum Subjects {
  TicketCreated = 'ticket:created',
  OrderUpdated = 'order:updated',
}

And a Base Publisher will look like:

import { Stan } from 'node-nats-streaming';
import { Subjects } from './subjects';

interface Event {
  subject: Subjects;
  data: any;
}

export abstract class Publisher<T extends Event> {
  abstract subject: T['subject'];
  private client: Stan;

  constructor(client: Stan) {
    this.client = client;
  }

  publish(data: T['data']): Promise<void> {
    return new Promise((resolve, reject) => {
      this.client.publish(this.subject, JSON.stringify(data), (err) => {
        if (err) {
          return reject(err);
        }
        console.log('Event published');
        resolve();
      });
    });
  }
}

You can take these base classes and derive from them your relevant listeners and publishers. A listener consumes the events in the channels, while a publisher emits the events to our NATS Streaming server.

Now let's define our events. For example, we will create our 'ticket:created' event:

import { Subjects } from './subjects';

export interface TicketCreatedEvent {
  subject: Subjects.TicketCreated;
  data: {
    id: string;
    title: string;
    price: number;
    userId: string;
  };
}

After setting the relevant possible events and building robust base classes for both listener and publisher, we need to create the derived listener that includes the specification for each event. As an example, we will look at a 'ticket:created' event listener:

import { Message } from 'node-nats-streaming';
import { TicketCreatedEvent } from './ticket-created-event';
import { Subjects } from './subjects';
import { Listener } from './base-listener';

export class TicketCreatedListener extends Listener<TicketCreatedEvent> {
  subject: Subjects.TicketCreated = Subjects.TicketCreated;
  queueGroupName = 'tickets-service';

  onMessage(data: TicketCreatedEvent['data'], msg: Message) {
    console.log('Event data!', data);
    msg.ack();
  }
}

And a publisher will look like this (the 'ticket:created' event publisher, which lives in the tickets service):

import { Subjects } from './subjects';
import { TicketCreatedEvent } from './ticket-created-event';
import { Publisher } from './base-publisher';

export class TicketCreatedPublisher extends Publisher<TicketCreatedEvent> {
  subject: Subjects.TicketCreated = Subjects.TicketCreated;
}

Set our service to be a listener / publisher:

Before connecting the clients (publisher/listener), we will need to wrap the implementation (nats-wrapper):

import nats, { Stan } from 'node-nats-streaming';

class NatsWrapper {
  private _client?: Stan;

  get client() {
    if (!this._client) {
      throw new Error('Cannot access NATS client before connecting');
    }
    return this._client;
  }

  // the connect function is explained later
  connect(clusterId: string, clientId: string, url: string) {
    this._client = nats.connect(clusterId, clientId, { url });
    this._client.on('close', () => {
      console.log('NATS connection closed');
      process.exit();
    });
    process.on('SIGINT', () => this.client.close());
    process.on('SIGTERM', () => this.client.close());
    return new Promise<void>((resolve, reject) => {
      this._client!.on('connect', () => {
        console.log('Connected to NATS!');
        resolve();
      });
      this._client!.on('error', (err) => {
        reject(err);
      });
    });
  }
}

export const natsWrapper = new NatsWrapper();

The connect function receives its values from your startup file (index.ts, for example). There we should define the cluster id, the url, and a client id, so the streaming server knows who is connected to it.

await natsWrapper.connect(
  process.env.NATS_CLUSTER_ID!,
  process.env.NATS_CLIENT_ID!,
  process.env.NATS_URL!
);

Remember to check that those arguments are actually received! They come from our deployment settings as environment variables: look above at the yaml configuration file we created for one of our client deployments. The URL, the cluster ID, and the client ID (from the pod's metadata name) are specified there.
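A small guard at startup makes this check explicit. This is a sketch; the helper name is my own, not part of any library:

```typescript
// Hypothetical startup guard: throw early if a required env variable is
// missing, instead of failing later with a confusing connection error.
export function assertEnv(
  names: string[],
  env: Record<string, string | undefined>
): void {
  for (const name of names) {
    if (!env[name]) {
      throw new Error(`${name} must be defined`);
    }
  }
}

// Typical use before connecting:
// assertEnv(['NATS_CLUSTER_ID', 'NATS_CLIENT_ID', 'NATS_URL'], process.env);
// await natsWrapper.connect(process.env.NATS_CLUSTER_ID!, ...);
```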

Now only the final action itself is left → we will look at our tickets service, and particularly at publishing the 'ticket:created' event.

import { Publisher, TicketCreatedEvent } from '@yemini/common';
import { Subjects } from '@yemini/common';

export class TicketCreatedPublisher extends Publisher<TicketCreatedEvent> {
  subject: Subjects.TicketCreated = Subjects.TicketCreated;
}

@yemini/common is a common package that includes the things we have mentioned earlier; you can find it on NPM: https://www.npmjs.com/package/@yemini/common (this package also includes an error-handler middleware and authentication via JWT). It is meant for private usage, so there are no docs for now :) but you can take the things we have talked about.

Then we can actually publish the event with the relevant data:

...
new TicketCreatedPublisher(natsWrapper.client).publish({
  id: ticket.id,
  title: ticket.title,
  price: ticket.price,
  userId: ticket.userId,
});
...

Remember we said that we expose our NATS Streaming service via port forwarding? We should define it!

I am using minikube as my Kubernetes cluster, but the idea is the same

$ kubectl get pods

Find your nats-depl pod (as we named it before)

and then:

$ kubectl port-forward nats-depl-5458db5b74-24rc4 4222:4222

Now, remember that our NATS Streaming service also exposes port 8222, for monitoring. You can go to your browser, enter localhost:8222, and see:

There you can find info about the data being transferred in each channel, etc.

An example for pressing ‘server’ and viewing the connected clients

There you can also see the passed events and their data.

Using NATS Streaming in a Microservices Architecture

When building distributed systems, the microservices pattern is a great choice. In a microservices architecture, you build applications by composing a suite of independently deployable, small, modular services. To solve the practical challenges of microservices with regard to managing distributed actions (like transactions) and data consistency, an event-driven, and in particular an Event Sourcing, architecture is a highly recommended approach. Event Sourcing keeps an event store: an immutable log of events in which each entry (a state change made to a domain object) represents an application state. In NATS Streaming, each log entry is an event appended to the relevant channel, from which it is published → other services can then subscribe to those channels and consume the messages.
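The append-and-replay idea can be illustrated with a tiny in-memory sketch. This is purely illustrative (the class and its methods are my own invention, not part of NATS Streaming): an append-only log per channel, from which consumers can replay every event in order.

```typescript
// Minimal sketch of the event-sourcing idea: an append-only log per channel.
// Purely illustrative; not a real NATS Streaming API.
type StoredEvent = { subject: string; data: { id: string; price?: number } };

class InMemoryEventStore {
  private channels = new Map<string, StoredEvent[]>();

  // Append an immutable event to its channel's log.
  append(event: StoredEvent): void {
    const log = this.channels.get(event.subject) ?? [];
    log.push(event);
    this.channels.set(event.subject, log);
  }

  // Replay every event of a channel in the order it was appended.
  replay(subject: string): StoredEvent[] {
    return [...(this.channels.get(subject) ?? [])];
  }
}
```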

In Conclusion

When you build distributed systems, you can use NATS Streaming as the nervous system of your applications, publishing events to data streams and exchanging messages between different systems in an asynchronous manner. When performance is the base of your event-driven application, NATS Streaming can be the right tool for you! I would not recommend NATS Streaming for building enterprise-grade queueing, but for smaller applications, this lightweight platform is amazing. In addition, I personally recommend using Golang when dealing with NATS Streaming, but the fun of Node.js beats my common logic :)

Beyond Code

Stories about the whole coding process and related technologies