Implementing CQRS using Kafka and Sarama Library in Golang
A prerequisite to this post is Building Scalable Applications Using Event Sourcing and CQRS by OKONKWO VINCENT IKEM, not just because it provides the necessary context for this write-up, but because it also doubles as an insightful introduction to Kafka and allows me to make certain assumptions, the first of which is that you’ve read it :)
“Sarama is an MIT-licensed Go client library for Apache Kafka version 0.8 (and later)”. That’s from the Readme on Sarama’s GitHub page (and arguably, for a beginner, that might be the only thing there that makes sense at first).
Aside from being robust, the fact that Sarama implements recent versions of Kafka makes it the Go library of choice for working with Kafka. The consumer groups feature in recent Kafka versions is especially useful in horizontally scaled applications because it ensures that, across replicated consumers, a published message is not executed more than once.
To the fun part
Controllers turn Producers
In a traditional MVC application, the controllers pass database read and write requests directly to the models. As such, implementing CQRS for such an application requires that we separate the two kinds of operations. To use Kafka, we provision a broker where database write requests are registered under an appropriate topic, and from where participants interested in the topic pick up the requests and perform the necessary actions.
So, say we want to create a resource (for example, a user). Instead of calling the User model’s create action directly from the controller, we emit the necessary user payload as a message to, say, a create-user-topic on the Kafka broker, to which we expect the create action to be subscribed. With Sarama, this is achieved using producers.
Sarama has two kinds of producers, the SyncProducer and the AsyncProducer. As you might have guessed, the SyncProducer publishes messages and blocks until they have been acknowledged, while the AsyncProducer publishes messages through a non-blocking API. The choice will largely depend on the design of your overall architecture. However, you should note that
The SyncProducer is a very thin wrapper around the AsyncProducer — the SendMessage method simply pushes the message onto the underlying async queue, then waits for the response. As such, you should get effectively identical performance with either.
Here’s a sample code snippet to create a Sarama producer:
To create an AsyncProducer, we have instead:
producer, err := sarama.NewAsyncProducer(brokers, config)
To produce a message, wrap the message in the sarama.ProducerMessage struct and then pass it on to the producer instance.
For all available ProducerMessage configurations, including how to specify the partition a message is sent to using a partition key, check out the documentation.
SendMessage returns the partition the message was sent to, the message’s offset on Kafka, and error information.
Note: be sure to shut down the producer, by calling the Close() function on the producer instance, to avoid memory leaks.
To maintain data consistency, it is often necessary to carry out extensive data validation at the controller-cum-producer level, so that any published message has a very high chance of resulting in a successful database transaction.
Models turn Consumers
Model functions that execute database write operations are modified to become sarama consumers that subscribe to relevant Kafka topics. And based on the emitted messages, these model functions perform their normal actions. So the createUser function, for instance, subscribes to create-user-topic; updateUser function to update-user-topic; deleteUser function to delete-user-topic etc.
To create a consumer, you will need the addresses of the Kafka brokers.
Like the producer, you can provide a Sarama config struct as the second argument with your desired configurations.
To set up the consumer to listen for messages, we provide the topic, partition and offset the consumer is to subscribe from. A consumer can subscribe to more than one partition and topic. To have a consumer subscribe to all partitions on a topic, use the Partitions function on the consumer to return all available partitions.
Once a consumer is subscribed to a topic, sarama returns a PartitonConsumer. The PartitionConsumer processes Kafka messages from the given topic and partition.
The simplest way of using a PartitionConsumer is to loop over its messages channel using a for/range loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported as out of range by the brokers. In this case you should decide what you want to do (try a different offset, notify a human, etc) and handle it appropriately.
For subscriptions to multiple topics/partitions, leverage goroutines and channels to range over and retrieve each PartitionConsumer’s messages.
Like the producer, ensure you call AsyncClose() on the PartitionConsumer once you are done.
Test, Test, Test.
Sarama ships with mocks to be used for testing purposes.
producer := mocks.NewSyncProducer(t, nil)
All mock instances require that expectations are preset on them before they are used; these determine how the mock will behave, and if an expectation is not met, it will make your test fail.
You can also leverage dependency injection to achieve finer control over each test case’s expectations. With this, extend the default Sarama library functions with test dummy values that can be validated against. For example, the producer implementation can be redesigned with functions that allow hooks into different stages of the Kafka life cycle to carry out the necessary tests.
Getting started with Sarama might seem overwhelming at first (at least it was for me). But underneath all the available configurations and options, it is basically all about two components, producers and consumers, and of course how they interact with each other. Here is a very simple working implementation of this interaction.