Building Distributed Systems and Microservices in Go with NATS Streaming

A while ago, I wrote a blog post, Introducing NATS to Go Developers, about using Apcera NATS as the messaging system for building distributed systems and Microservices in Go. In this post, I will take a look at NATS Streaming server, which is built on top of the basic NATS server and provides a persistent log for the messages you publish over NATS.

NATS is an open source, lightweight, high-performance, cloud native messaging system. I love to use NATS for building distributed systems in Go because of its performance and simplicity. Because building distributed systems always brings a lot of complexity, choosing a simple, high-performance messaging system is a vital decision. NATS is available in two interoperable modules: the core NATS platform, i.e. the NATS server (executable name gnatsd), referred to simply as NATS, and NATS Streaming (executable name nats-streaming-server).

Introducing NATS Streaming

The basic NATS server is designed for high performance and simplicity, and it doesn’t provide a persistent store for the messages you publish over NATS. The lack of a persistent store for messages is a problem for many distributed systems. For instance, if one of your subscriber systems is down when you publish a message, that subscriber never receives the message, so you have to provide an architectural approach for dealing with such scenarios. As another instance, suppose you would like to add a new system to an existing distributed systems environment and have it receive all messages from the beginning to get a history of the data; you can’t get those messages from the basic NATS server because it has no persistent store. NATS Streaming, on the other hand, comes with a persistent store that keeps a log of the messages published over the NATS server. If you need persistent messaging and delivery guarantees, you can use NATS Streaming instead of the core NATS platform.

NATS Streaming is an extremely performant, lightweight, reliable streaming platform built on top of the core NATS platform that provides persistent logs. NATS Streaming is written in Go. It can be employed to add event streaming, delivery guarantees, and historical data replay to NATS. Keep in mind that NATS Streaming is not a separate messaging server; it uses the NATS server (gnatsd). In short, NATS Streaming embeds a NATS server as the messaging server and adds a persistent log that can be used for event streaming systems.

NATS Streaming provides the following high-level feature set:

  • Log based persistence
  • At-Least-Once Delivery model, giving reliable message delivery
  • Rate matched on a per subscription basis
  • Replay/Restart
  • Last Value Semantics

The high-level features of NATS Streaming are similar to the capabilities of Apache Kafka, but the former wins when you value simplicity over complexity. Because NATS Streaming is a relatively new technology, it still needs to improve in some areas compared to Apache Kafka, especially in providing a better solution for load balancing scenarios.

Channels

Channels are the most important concept in NATS Streaming Server. Channels are subjects that clients send data to and consume data from. Unlike the basic NATS Server, NATS Streaming Server does not support wildcards for channels. You can limit the number of channels through configuration. Messages published to a channel are stored in a message log inside the channel, as shown in the picture below.

File store for persistent logs

The picture above depicts a file store for persistent logs, in which a directory named order-notification stores the messages for the channel of the same name.

Message Log

Messages published to a channel are appended to the end of the log in the persistent store. The limits for the message log can be configured, either globally for all channels or specifically for a particular channel. When a limit is reached, older messages are pruned so that newer messages can be appended. By default, NATS Streaming uses an in-memory store for messages, but this behaviour can be changed through configuration.
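To make the pruning behaviour concrete, here is a minimal in-memory sketch in Go of an append-only log with a message limit. It is only a toy model of the idea, not how NATS Streaming implements its stores; the type and function names (Log, NewLog, Append, Sequences) are hypothetical, and I am assuming sequence numbers start at 1 and that a limit of 0 means unlimited, mirroring the max_msgs flag.

```go
package main

import "fmt"

// Message is one entry in the toy log.
type Message struct {
	Sequence uint64
	Data     []byte
}

// Log is an append-only message log for a single channel.
// maxMsgs caps the number of retained messages; 0 means unlimited.
type Log struct {
	maxMsgs int
	nextSeq uint64
	msgs    []Message
}

// NewLog creates an empty log whose first message gets sequence 1.
func NewLog(maxMsgs int) *Log {
	return &Log{maxMsgs: maxMsgs, nextSeq: 1}
}

// Append adds a message to the end of the log and, if the limit is
// exceeded, drops the oldest messages to make room for the newest.
func (l *Log) Append(data []byte) uint64 {
	seq := l.nextSeq
	l.nextSeq++
	l.msgs = append(l.msgs, Message{Sequence: seq, Data: data})
	if l.maxMsgs > 0 && len(l.msgs) > l.maxMsgs {
		l.msgs = l.msgs[len(l.msgs)-l.maxMsgs:]
	}
	return seq
}

// Sequences returns the sequence numbers still retained, oldest first.
func (l *Log) Sequences() []uint64 {
	seqs := make([]uint64, 0, len(l.msgs))
	for _, m := range l.msgs {
		seqs = append(seqs, m.Sequence)
	}
	return seqs
}

func main() {
	orders := NewLog(3) // keep at most three messages
	for i := 1; i <= 5; i++ {
		orders.Append([]byte(fmt.Sprintf("order-%d", i)))
	}
	fmt.Println(orders.Sequences()) // prints [3 4 5]
}
```

With a limit of three, the first two messages are pruned once the fourth and fifth arrive, which is why a new subscriber asking for the full history would only see the messages still retained.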

Setup NATS Streaming

To download and install NATS Streaming, use one of the pre-built release binaries from the GitHub releases page, or use the official Docker image called nats-streaming. You can also get NATS Streaming by using the go get command:

go get github.com/nats-io/nats-streaming-server

In order to create NATS Streaming client applications in Go, download and install the Go package using the go get command:

go get github.com/nats-io/go-nats-streaming

To run NATS Streaming, run its binary, nats-streaming-server:

 nats-streaming-server

By default, NATS Streaming uses an in-memory store for messages, so you will lose the messages if the server shuts down. A better option is to use a file store by providing the store flag when you run the NATS Streaming server, as shown below:

nats-streaming-server \
--store file \
--dir ./data \
--max_msgs 0 \
--max_bytes 0

Here are the flags used to run the NATS Streaming server:

--store <string>    Store type: MEMORY|FILE (default: MEMORY)
--dir <string>      For FILE store type, this is the root directory
--max_msgs <int>    Max number of messages per channel (0 for unlimited)
--max_bytes <size>  Max messages total size per channel (0 for unlimited)

The preceding command configures the NATS Streaming server to use a file store, with a root directory named data for the message logs, and to place no limit on either the number of messages per channel or the total size of the messages stored in each channel’s message log.

The image below shows NATS Streaming Server running with a cluster named “test-cluster”:

NATS Streaming Server runs using an embedded NATS Server

When you run the NATS Streaming Server, the embedded NATS Server starts automatically and listens for client connections on the default port 4222. Hence you don’t need to run the NATS server manually alongside NATS Streaming.

Building Distributed Systems with NATS Streaming

When you build distributed systems, you can use NATS Streaming as the nervous system of your applications, publishing events to data streams and exchanging messages between different systems in an asynchronous manner. Keep in mind that NATS Streaming is more than a typical messaging system: it provides an event streaming platform. In the recent past, a lot of people have used Apache Kafka simply as a messaging system without understanding its core capabilities.

Using NATS Streaming in Microservices Architecture

When you build distributed systems, the Microservices pattern is a great choice. In a Microservices architecture, you build applications by composing a suite of independently deployable, small, modular services. When you move from monolithic applications to a Microservices architecture, you need to solve many practical challenges. For example, a business transaction may span several Microservices, because a monolithic system has been broken up into several autonomous services. A transaction may need to persist data in many Microservices, so you need to manage data consistency as well.

To solve the practical challenges of Microservices around distributed transactions and data consistency, an event-driven architecture is a highly recommended approach. Among the various event-driven architectures, I highly recommend Event Sourcing, an event-centric architecture that constructs the state of an application by composing events. Event Sourcing works with an event store, an immutable log of events, in which each entry records a state change made to a domain object. Because every state change in the application is treated as an immutable log entry, you can easily troubleshoot the application and can also go back to a particular version of the application state at any time. An event store is just like the message log of NATS Streaming, in which messages published to a channel are appended to the log. NATS Streaming currently doesn’t support database systems for the persistent log, but I hope this capability will be available in the near future, so that it can also be leveraged as the event store for Event Sourcing implementations in your distributed applications.
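As a rough illustration of how Event Sourcing derives state from an immutable log, here is a small Go sketch that rebuilds an Order by replaying events, and that can stop at an earlier point to recover a previous version. Only the “OrderCreated” event name comes from the example demo; the other event types, the Order fields, and the Apply and Replay helpers are hypothetical names I am using for illustration.

```go
package main

import "fmt"

// Event is one immutable entry in the event log.
type Event struct {
	Type    string // "OrderCreated" is from the demo; the others are hypothetical
	OrderID string
}

// Order is the state derived from the events; it is never stored directly.
type Order struct {
	ID      string
	Status  string
	Version int // number of events applied so far
}

// Apply returns the state that results from applying one event.
func Apply(o Order, e Event) Order {
	switch e.Type {
	case "OrderCreated":
		o.ID, o.Status = e.OrderID, "created"
	case "OrderApproved":
		o.Status = "approved"
	case "OrderShipped":
		o.Status = "shipped"
	default:
		return o // unknown events leave the state untouched
	}
	o.Version++
	return o
}

// Replay folds the first n events into an Order.
// A negative or too-large n replays the whole log.
func Replay(events []Event, n int) Order {
	if n < 0 || n > len(events) {
		n = len(events)
	}
	var o Order
	for _, e := range events[:n] {
		o = Apply(o, e)
	}
	return o
}

func main() {
	log := []Event{
		{Type: "OrderCreated", OrderID: "1234"},
		{Type: "OrderApproved", OrderID: "1234"},
		{Type: "OrderShipped", OrderID: "1234"},
	}
	fmt.Printf("%+v\n", Replay(log, -1)) // prints {ID:1234 Status:shipped Version:3}
	fmt.Printf("%+v\n", Replay(log, 2))  // prints {ID:1234 Status:approved Version:2}
}
```

Because the log is never modified, replaying a prefix of it is all it takes to see what the order looked like at an earlier version.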

When you build Microservices with an event-driven architecture, you can use NATS Streaming as the event streaming platform: publish events on channels whenever domain events occur on state changes of aggregates (DDD Aggregates), or simply domain entities, so that other Microservices can subscribe to those messages, perform their own actions, and publish further events to let other Microservices know that state changes have happened. With a persistent log for the messages published on channels, NATS Streaming gives you the messaging capabilities for building modern distributed systems in an efficient manner.

Example with NATS Streaming

The primary objective of this post is not to discuss Microservices-related patterns, but to introduce NATS Streaming to Go developers through an example demo, in which you may find some fluid implementations of Microservices-related patterns. The source of the example is available on GitHub from here. The example consists of the following Go packages:

  • pb: Protocol Buffers definitions to describe message types and RPC endpoints.
  • orderservice: An HTTP API server that lets customers create Orders. When a new Order is placed, an “OrderCreated” event is triggered, and the service calls the gRPC method “CreateEvent” provided by eventstore to publish the event to the Event Store.
  • eventstore: A gRPC server and NATS Streaming client that persists domain events into the Event Store and publishes events on NATS Streaming channels. This example assumes that the state of the application is composed of various events (a fluid implementation of the Event Sourcing pattern). All command operations are persisted into the Event Store as events. Here CockroachDB is used for persisting events.
  • restaurantservice: A NATS Streaming client that subscribes to the NATS Streaming channel “order-notification” to receive messages when new orders are created via orderservice and eventstore publishes them on that channel.
  • orderquery-store1: A NATS Streaming client that subscribes with a QueueGroup (a NATS messaging pattern) to the NATS Streaming channel “order-notification” to receive messages when events occur on the Order aggregate. The objective of this package is to persist a data model for querying data, based on the domain events persisted in the Event Store. The example demo assumes that separate data models are used for command operations and query operations (CQRS). Because you keep separate data models for command and query, you can have denormalized data sets in the data model for query. Here CockroachDB is used for persisting the data sets of the query model. In real-world scenarios, separate databases would be used for the command and query models.
  • orderquery-store2: A NATS Streaming client that subscribes with a QueueGroup to the NATS Streaming channel “order-notification”. Both orderquery-store1 and orderquery-store2 do the same thing: they perform the data replication logic that builds a store for querying data, constructed from the Event Store. To distribute the data replication logic, they work as QueueGroup subscriber clients.
  • store: A shared library package that provides the persistence logic for working with the CockroachDB database. Note that CockroachDB is a brilliant distributed database system written in Go. I will write a blog post on CockroachDB later on.

Publish Events

Here’s the code block from eventstore that publishes events on NATS Streaming when its RPC method CreateEvent is invoked:

Listing 1. NATS Streaming Publisher Client

The Connect function of the NATS Streaming client makes a connection to the NATS Streaming server. You must provide a Cluster ID and a Client ID to connect. The Client ID is important because the server uses it to uniquely identify a NATS Streaming client; two connections with the same Client ID are therefore not possible.

The Publish method of a NATS Streaming connection publishes a message, given a subject (channel) and the message data. In the example demo, both the channel and the message data come from the gRPC client application (orderservice). Here the channel name is “order-notification” and the message is a JSON string representing an Order, both passed from orderservice when it creates an Order.

The API method Publish publishes a message to the cluster synchronously and waits for an ACK (acknowledgement). The API method PublishAsync publishes a message asynchronously and returns a GUID for the message being sent to the cluster.

Listing 2. Publish messages asynchronously to NATS Streaming Server

Creating Subscriber Clients

The basic NATS Server, which doesn’t provide a persistent log, comes with very limited capabilities for subscribing to messages. If a subscriber client is down when you publish a message, it can’t receive that message from the server. Because NATS Streaming Server comes with a persistent log, it provides a lot more capabilities for subscribing to messages.

A client creates a NATS Streaming subscription on a given channel, and messages on the channel are sent to the subscriber client from the message log. The server sends up to the maximum number of inflight messages specified by the subscriber client when it creates the subscription. When a subscriber client receives a message, an ACK (acknowledgement) is sent to the server. ACKs are sent automatically by default, but you can also configure the subscription to send them manually.

There are several types of NATS Streaming subscriptions:

  • Regular
  • Durable
  • Queue Group
  • Redelivery

In the example demo, we use durable subscriptions, which let a subscriber client resume message consumption from where it previously stopped. With a durable subscription, the NATS Streaming server maintains the state of the subscription even after the client connection is closed. A durable subscription is created by providing a durable name. You can also use durable subscriptions for subscriber clients created with a Queue Group.

Here’s the code block that creates a subscriber client with a durable subscription for restaurantservice, to receive messages published on the channel “order-notification”:

Listing 3. A NATS Streaming client that subscribes to messages from the channel “order-notification”

A subscriber client is created on the channel “order-notification” with a durable subscription by providing a durable name through the DurableName function of the go-nats-streaming package. The subscriber client is configured to send ACKs manually by using the SetManualAckMode function. In some scenarios, you might prefer to send ACKs manually. Once you configure manual ACKs, you have to explicitly call the Ack method of the NATS Streaming message:

msg.Ack() // Manual ACK

If you haven’t specified SetManualAckMode, the ACK will be sent automatically after the subscriber’s message handler is invoked.

NATS Streaming provides At-Least-Once delivery of messages to subscriber clients on a given channel. If an ACK is not received within the configured timeout interval (the default is 30 seconds), NATS Streaming attempts redelivery of the message. The AckWait function configures the timeout interval; the preceding code block sets it to 60 seconds. You can also limit the maximum number of messages the NATS Streaming server sends without receiving an ACK by using the MaxInflight function.

Creating Subscriber Clients with Queue Group

Subscriber clients can be created by specifying a Queue Group. Multiple subscriber clients on the same channel with the same queue name form a Queue Group. Queue subscribers let you distribute message processing across multiple subscribers: when you publish a message on a channel, it is sent to only one of the subscribers in the Queue Group. When you publish millions of messages in a short span of time, and the order of message processing is not important, queue subscribers can efficiently process messages in parallel and provide high performance.

In the example demo, messages are published from the eventstore app when domain events occur, and the following three subscribers subscribe to the channel “order-notification”:

  1. restaurantservice
  2. orderquery-store1
  3. orderquery-store2

Among the three subscribers, restaurantservice is a durable subscriber without a Queue Group, while the other two subscribers form a Queue Group with the same queue name. When a message is published on the channel “order-notification”, restaurantservice will always receive that message, but only one of the subscribers in the Queue Group, i.e. either orderquery-store1 or orderquery-store2, will receive it. We can also create durable queue subscribers by providing the same durable name for all subscribers in the same Queue Group. In the example demo, queue subscribers are used to implement data replication for the query model, since all command operations are persisted as a sequence of events: an immutable log of state changes of domain entities (DDD Aggregates). Having separate models for command and query is a really great option for building Microservices with Event Sourcing and CQRS.
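The delivery behaviour of these three subscribers can be pictured with a toy in-memory model in Go. This is only a simulation of the semantics, not the go-nats-streaming API and not how the server is implemented: the Channel, Subscriber, and queueGroup types and the queue name “order-query-store-group” are hypothetical, and the round-robin choice of a group member is my assumption (the server only promises that one member of the group receives each message).

```go
package main

import "fmt"

// Subscriber records the messages it has been given.
type Subscriber struct {
	Name     string
	Received []string
}

// queueGroup holds the members that share one queue name.
type queueGroup struct {
	members []*Subscriber
	next    int // index used for the round-robin assumption
}

// Channel is a toy stand-in for one NATS Streaming channel.
type Channel struct {
	regular []*Subscriber
	groups  map[string]*queueGroup
}

func NewChannel() *Channel {
	return &Channel{groups: make(map[string]*queueGroup)}
}

// Subscribe adds a regular subscriber, which receives every message.
func (c *Channel) Subscribe(name string) *Subscriber {
	s := &Subscriber{Name: name}
	c.regular = append(c.regular, s)
	return s
}

// QueueSubscribe adds a subscriber to the group with the given queue name.
func (c *Channel) QueueSubscribe(queue, name string) *Subscriber {
	s := &Subscriber{Name: name}
	g, ok := c.groups[queue]
	if !ok {
		g = &queueGroup{}
		c.groups[queue] = g
	}
	g.members = append(g.members, s)
	return s
}

// Publish delivers msg to every regular subscriber and to exactly one
// member of each queue group.
func (c *Channel) Publish(msg string) {
	for _, s := range c.regular {
		s.Received = append(s.Received, msg)
	}
	for _, g := range c.groups {
		m := g.members[g.next%len(g.members)]
		m.Received = append(m.Received, msg)
		g.next++
	}
}

func main() {
	ch := NewChannel()
	restaurant := ch.Subscribe("restaurantservice")
	store1 := ch.QueueSubscribe("order-query-store-group", "orderquery-store1")
	store2 := ch.QueueSubscribe("order-query-store-group", "orderquery-store2")

	for i := 1; i <= 4; i++ {
		ch.Publish(fmt.Sprintf("OrderCreated #%d", i))
	}
	// prints 4 2 2
	fmt.Println(len(restaurant.Received), len(store1.Received), len(store2.Received))
}
```

Here restaurantservice sees all four messages, while the two query stores split them between themselves, so each event is replicated into the query model only once.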

Here’s the code block that creates queue subscribers:

Source Code

The source code for the example demo is available on GitHub from here.

Clustering NATS Streaming Server

NATS Streaming is a relatively new technology, which definitely needs some improvements. NATS Streaming Server does not support clustering at this moment, but the enthusiastic NATS team is planning a better solution for clustering, and I hope they will come up with one very soon. In the meantime, you can cluster NATS Servers, because the basic NATS Server, which is embedded into NATS Streaming, supports clustering. So there is a workaround for the clustering problem: run a single NATS Streaming server attached to a cluster of NATS Servers.

Fault Tolerance

It’s worth noting that NATS Streaming server can be run in Fault Tolerance (FT) mode by forming a group of servers, with one acting as the active server and all the others acting as standby servers. This helps you minimize the single point of failure. The active server in the Fault Tolerance group (FT group) accesses the persistent store and handles all communication with clients, while the standby servers run only to detect failure of the active server. You can have multiple standby servers in the same FT group. When the active server in the FT group fails, all standby servers try to become active; one of them becomes the active server, recovers the persistent store, and services all clients. In order to share the state of the persistent store across an FT group, the data store needs to be mounted by all servers in the FT group.

You can follow me on Twitter at @shijucv. I provide training and consulting on the Go programming language (Golang) and distributed systems architectures in India.