Event sourcing with kafka and nodejs

Manuel Villing
May 8, 2018

Objectives

The purpose of this article is to implement a basic event sourcing app with basic fault tolerance and clear guarantees (at-least-once, timeouts). There are other ready-to-go solutions out there, for example node-cqrs-domain, nestjs-cqrs and wolkenkit, but I decided to write something quick from scratch for learning purposes.

I assume that you know the basics of Event Sourcing and CQRS. If not, follow the links to get a picture of what I will be talking about.

That's the high-level view of what we will be building in this tutorial. It is basically an app which maintains an aggregate that can be updated by commands sent from a CLI app through an express API. The workload can be scaled across N consumers, where N is limited only by the number of partitions you create for the given topic.

After the tutorial you will understand how the different components of the app interact with kafka. I am not going to walk through the source code; instead I will talk mainly about the design of a kafka + nodejs app.

Structure of the article

  1. Kafka
  2. Example Application
  3. Implementation details
  4. Possible improvements

1. Kafka

In today's world, where applications get more and more complex and are split up into microservices, kafka can be used as the backbone of your application. Use cases include message passing (event bus, queue), storage and stream processing. The last one has recently been gaining importance due to the demand for real-time data, both for business insights and simply to improve the user experience.

In the following section I will highlight some keywords you need to know to get around with kafka.

  • consumer: reads events from a partition, processes them, then commits its offset
  • producer: writes events to a partition of a topic; the partition is chosen by the partitioner, and the write is subject to the configured write concerns
  • replication factor: the number of nodes the data is replicated to, to prevent data loss in case a node's hard drive fails
  • retention period: how long events are stored in a topic (1 week, forever…)
  • topics: can have many partitions; partition count, replication factor and retention period can be configured per topic
  • partitions: append-only sequence of events written by the producer, each stored with an offset
  • write concerns: whether a write needs to be acknowledged or not, and by how many nodes
  • partitioner: decides which partition an event goes into; the partition is derived from the key you provide with the event you write to a topic
  • latest offset: the position in the partition up to which events were consumed and handled; events with offsets higher than the latest offset are still unconsumed / unhandled
  • consumer groups: consume one or more topics and balance the partitions equally amongst the consumers belonging to the group
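
To make the partitioner idea more concrete, here is a minimal sketch of how a key can be mapped to a partition. The function names `hashKey` and `choosePartition` and the event names are made up for illustration, and the hash is a simple string hash chosen for readability; real kafka clients ship their own hash functions, so do not expect the same partition numbers from a real cluster.

```javascript
// Sketch of a key-based partitioner. The hash below is a simple
// illustrative string hash, NOT the one used by any real kafka client.
function hashKey(key) {
  let hash = 0;
  for (const char of String(key)) {
    // keep the value inside the unsigned 32-bit range
    hash = (hash * 31 + char.codePointAt(0)) >>> 0;
  }
  return hash;
}

// The same key always yields the same partition for a fixed partition count.
function choosePartition(key, partitionCount) {
  return hashKey(key) % partitionCount;
}

// Hypothetical events: both events for 'job-1' land in the same partition,
// which preserves their relative order.
const events = [
  { key: 'job-1', type: 'JobRequestCreated' },
  { key: 'job-2', type: 'JobRequestCreated' },
  { key: 'job-1', type: 'JobRequestAccepted' },
];

for (const event of events) {
  console.log(event.key, event.type, '-> partition', choosePartition(event.key, 3));
}
```

The important property is that all events of one aggregate share a key and therefore a partition, so a single consumer sees them in order. Note that changing the partition count changes the mapping.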

2. Example Application

2.1 Description

The main purpose of the example application is to allow one person to create job requests which can be filled by the persons referenced in the corresponding job request. After a job request has been accepted by a referenced person, it cannot be accepted by another person. Allowing a job to be accepted by two persons would be a nightmare; the goal is to design an app which avoids this case and allows us to generate read-views from the aggregate state.

Example App
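
The rule that a job request can only be accepted once can be sketched as a small aggregate. This is only an illustration under my own assumptions: the event names (`JobRequestCreated`, `JobRequestAccepted`), the field names and the two functions are hypothetical and not taken from the example app's source code.

```javascript
// Applies one event to the aggregate state and returns the new state.
// Event and field names are hypothetical.
function applyEvent(state, event) {
  switch (event.type) {
    case 'JobRequestCreated':
      return { id: event.jobRequestId, candidates: event.candidates, acceptedBy: null };
    case 'JobRequestAccepted':
      return { ...state, acceptedBy: event.personId };
    default:
      return state;
  }
}

// Validates the "accept" command against the current state and
// returns the resulting event, or throws if the command is not allowed.
function acceptJobRequest(state, personId) {
  if (!state) throw new Error('job request does not exist');
  if (!state.candidates.includes(personId)) {
    throw new Error('person is not referenced by this job request');
  }
  if (state.acceptedBy !== null) throw new Error('job request already accepted');
  return { type: 'JobRequestAccepted', jobRequestId: state.id, personId };
}

let state = applyEvent(null, {
  type: 'JobRequestCreated',
  jobRequestId: 'job-1',
  candidates: ['alice', 'bob'],
});
state = applyEvent(state, acceptJobRequest(state, 'alice'));
console.log(state.acceptedBy); // alice

try {
  acceptJobRequest(state, 'bob'); // second accept is rejected
} catch (err) {
  console.log(err.message); // job request already accepted
}
```

Because all commands for one job request are handled in order by one consumer, checking the current state before emitting the event is enough to keep two persons from accepting the same job.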

2.2 Source Code

2.3 Technologies used

To make the example application work I used the following packages and technologies.

  • docker / landoop/fast-data-dev: provides a development environment around kafka with a UI to inspect kafka
  • docker / redis: used for its pubsub capabilities
  • docker / mongo: used to store the aggregate
  • npm / fastest-validator: to enforce the schema of events
  • npm / kafka-node: kafka client for nodejs
  • npm / express: for the API
  • … some more

2.4 Flow

This part describes how all the parts work together to make the whole app work.

  1. The CLI makes http requests against the API with a unique request id. The CLI client allows basic resending of requests in case they fail because of a timeout. Instead of talking via an http endpoint (SYNC communication) we could also expose a web-socket server from the API to allow ASYNC communication.
  2. The API validates the payload and writes an event to the requests topic in kafka. At the same time it subscribes to the redis pub-sub to wait for a response from the consumer, which it uses to answer the http request of the CLI.
  3. The consumer does the following:
  4. The events in the finalevents topic can be used to generate read-views by another consumer / consumer-group.
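
The waiting part of step 2 can be sketched roughly like this. To keep the example runnable without any infrastructure I use a plain node `EventEmitter` as a stand-in for the redis pub-sub channel; `waitForResponse` and `simulateConsumer` are hypothetical helpers and not part of the example app.

```javascript
const { EventEmitter } = require('events');

// In-memory stand-in for the redis pub/sub channel (assumption for this sketch).
const responses = new EventEmitter();

// Resolves with the consumer's response for `requestId`,
// or with a TIMEOUT result if nothing arrives within `timeoutMs`.
function waitForResponse(requestId, timeoutMs) {
  return new Promise((resolve) => {
    const onResponse = (result) => {
      clearTimeout(timer);
      resolve(result);
    };
    const timer = setTimeout(() => {
      responses.removeListener(requestId, onResponse);
      resolve({ requestId, status: 'TIMEOUT' });
    }, timeoutMs);
    responses.once(requestId, onResponse);
  });
}

// Simulated consumer: publishes a response after `delayMs`.
function simulateConsumer(requestId, delayMs) {
  setTimeout(() => responses.emit(requestId, { requestId, status: 'SUCCESS' }), delayMs);
}

async function main() {
  simulateConsumer('req-1', 10);
  console.log(await waitForResponse('req-1', 100)); // consumer answers in time

  simulateConsumer('req-2', 200);
  console.log(await waitForResponse('req-2', 50)); // consumer is too slow
}

main();
```

The request id is what ties the http request to the response coming back from the consumer, which is why the CLI has to generate it and reuse it when it resends a request.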

For more clues check out the source code and make your own picture.

3. Implementation details

  • aggregate state is stored in mongo. We could also rebuild the aggregate state from kafka, but that can be slow because we would need to read all events of a partition and filter them for a particular aggregateId
  • we have a single-node kafka cluster
  • at-least-once semantics: it can always happen that events appear twice in kafka due to an error, so the application logic needs to handle those cases
  • deduplication: a client can expect any request to time out due to a slow consumer or congestion. In case of a timeout the consumer may already have processed the request. If the client retried the request without the request id it used for the previous request, it might end up creating a second aggregate. To avoid that, requests are deduplicated
  • retention period is 24h; that's OK because we don't use kafka to store events indefinitely
  • timeouts: we give the client definite feedback within a certain timeframe (either SUCCESS, TIMEOUT or FAIL)
  • adding consumers / scaling is easy, just make sure you have created enough partitions. It's recommended to calculate the partition count with the following formula: (expected consumer count now) * 10 = partitions to create
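
The deduplication point above can be illustrated with a small sketch. It assumes that the consumer remembers the result of every request id it has handled; the two `Map`s are in-memory stand-ins for a persistent store, and `handleCreateRequest` is a hypothetical name, not the function used in the example app.

```javascript
// In-memory stand-ins for persistent storage (assumption for this sketch).
const processedRequests = new Map(); // requestId -> result of the first handling
const aggregates = new Map(); // aggregateId -> aggregate state
let nextAggregateId = 1;

// Handles a "create" request at most once per request id.
function handleCreateRequest(request) {
  const previous = processedRequests.get(request.requestId);
  if (previous) {
    // Retry or duplicate delivery: return the stored result, create nothing.
    return previous;
  }
  const aggregateId = `job-${nextAggregateId++}`;
  aggregates.set(aggregateId, { id: aggregateId, ...request.payload });
  const result = { status: 'SUCCESS', aggregateId };
  processedRequests.set(request.requestId, result);
  return result;
}

// The client retries with the SAME request id after a timeout.
const first = handleCreateRequest({ requestId: 'req-42', payload: { title: 'Fix the roof' } });
const retry = handleCreateRequest({ requestId: 'req-42', payload: { title: 'Fix the roof' } });

console.log(first.aggregateId === retry.aggregateId); // true
console.log(aggregates.size); // 1
```

In a real setup the lookup and the write would have to survive a consumer restart, so they belong in the same store as the aggregate rather than in process memory.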

4. Possible improvements

  • Avro serializer / deserializer for message schemas in kafka
  • not implemented yet: add a consumer which generates multiple read-views from the events in the “finalevents” topic
  • we have flow control in the consumer (pause / unpause to avoid overflow)
  • add write concerns to mongo client to make sure the data is written and cannot be lost
  • prevent ingest overflow: the API should not write too many messages to kafka in case consumers are down; instead it should respond with 503 errors
  • jobrequestId -> should be named aggregateId
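
One possible way to approach the ingest overflow item is to count the requests which were written to kafka but not yet answered by a consumer, and to reject new ones above a limit. This is only a sketch of the idea: `MAX_IN_FLIGHT`, `tryAcceptRequest` and `markRequestHandled` are hypothetical, and the limit of 2 is chosen only to keep the example short.

```javascript
// Hypothetical limit of requests waiting for a consumer response.
const MAX_IN_FLIGHT = 2;
let inFlight = 0;

// Called by the API before it writes an event to kafka.
function tryAcceptRequest() {
  if (inFlight >= MAX_IN_FLIGHT) {
    // Consumers are down or too slow: tell the client to come back later.
    return { accepted: false, httpStatus: 503 };
  }
  inFlight += 1;
  return { accepted: true };
}

// Called when the consumer's response arrives or the request timed out.
function markRequestHandled() {
  inFlight = Math.max(0, inFlight - 1);
}

console.log(tryAcceptRequest()); // accepted
console.log(tryAcceptRequest()); // accepted
console.log(tryAcceptRequest()); // rejected with httpStatus 503
markRequestHandled(); // one response came back
console.log(tryAcceptRequest()); // accepted again
```

With several API instances the counter would have to be shared or each instance would need its own smaller limit.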

Update 17.05.2018

Today I noticed that nodefluent is implementing some nice kafka tools for the nodejs ecosystem, for example a kafka client which natively supports observables: https://github.com/nodefluent/node-sinek. Definitely worth checking out!

Update 15.10.2018

I wrote another article about Kafka: “Tracking transactions on the blockchain with nodejs and kafka”.

Update 30.12.2018

I found a new kafka client for nodejs which seems very promising to me. If you have trouble with kafka-node then you could try: https://github.com/tulios/kafkajs

Update 10.01.2019

I found a small framework for the event sourcing pattern called “sourced”. It does not integrate with kafka yet, but it's definitely useful for playing with the event sourcing pattern.

Thanks for reading.

How is your experience with nodejs and kafka?
