Angular + Spring Boot + Kafka: How to stream realtime data the reactive way.

David Maier · Published in The Startup · Nov 29, 2019

In this story I want to show how you can stream data from your Apache Kafka backend to an Angular 8 frontend in realtime, using websockets.

To fully utilize the power of Kafka and to boost performance and scalability, we will use WebFlux websockets to provide a websocket endpoint, as well as Reactor Kafka for consuming messages from our broker. Both packages are based on Project Reactor, a very robust library that implements the Reactive Streams specification, lets us run non-blocking JVM applications, and integrates seamlessly with Spring Boot.

On the client side, we will use Angular 8 to get a simple SPA up quickly. To reduce boilerplate code for the websocket client and to manage state, we use the Angular library NGXS as well as its websocket plugin.

To avoid the hassle of setting up a Kafka cluster locally, I used a managed Kafka cluster from Confluent for this demo. As we will see later, this choice requires some configuration so we can authenticate the Java client. You can get a free account with $50 of credit (which is MUCH more than you need for this example) at confluent.cloud

The full code for this tutorial can be found on GitHub:

Server: https://github.com/davemaier/reactivekafkaserver.git
Client: https://github.com/davemaier/reactivekafkaclient.git

What will we build?

To keep things focused, we will implement a very simple application that only displays a list of all messages from a Kafka topic. The list will automatically get updated if new data arrives in the Kafka topic (which is an important benefit of our reactive approach).

In a real application, the data in the topic would probably come from some microservice in the background. In our example, we will just use the CLI tool ccloud to produce sample data to the test topic from the console.

Let’s start with the backend!

First of all, you need to create a new Spring Boot application. The easiest way to do this is to go to start.spring.io and use Spring Initializr. This web tool offers a form where you can enter your project details and add dependencies.

For this example project, we use Maven as the build tool, Spring Boot 2.2.1, and Java 8. As dependencies, select Lombok (I like using it to make declaring data classes less verbose) and Spring Reactive Web. The latter will include the Spring Boot WebFlux starter dependency in your project and therefore prepare everything you need for using reactive websockets. Since the dependencies we need for the reactive Kafka client are not available in Spring Initializr, you will have to add them to your pom.xml later.

You can now open the project in your preferred IDE. If you use IntelliJ IDEA, this should be quite straightforward. Make sure to enable automatic Maven project import so changes to your pom.xml will also be picked up by your IDE.

Define a class for data transfer

To make serialization of the data you send to the frontend easier, you should define a simple data POJO. The class contains a property type of type String, which will be used to set the action type used by NGXS on the client side, as well as a property message that contains the content of the Kafka message.

Using Lombok annotations makes the code cleaner, since they add getters and setters as well as methods like hashCode and toString at compile time. To fully understand what the annotations used here do, please visit the Lombok docs. Also, make sure that you’ve installed the Lombok plugin in your IDE, so code completion and error checking know about the methods that will be added at compile time.
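For reference, a minimal sketch of such a POJO (the class and field names here are my own choice; the gist in the repository may differ):

import lombok.AllArgsConstructor;
import lombok.Data;

// DTO that is serialized to JSON and pushed over the websocket.
// "type" carries the NGXS action type, "message" the Kafka payload.
@Data
@AllArgsConstructor
public class KafkaMessage {
    private String type;
    private String message;
}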

Create a websocket handler

For your WebFlux websocket endpoint, you need a component that implements the WebSocketHandler interface. This class implements the handle method, which defines what happens when a client connects to your endpoint.

Notice that you get an instance of KafkaService here via dependency injection. This is the service that handles your connection to the Kafka cluster; I will show how to implement this part of the backend later. For now, it is important to note that the websocket session gets supplied with a flux: an unbounded data structure that exists over time and emits data to a subscriber as it receives it.

Roughly speaking, we just lay a pipe from what the Kafka service receives directly to our client. The pipe is then part of the websocket session and will keep transporting messages from Kafka to our client. No explicit send calls when messages arrive, no blocking: all the annoying stuff is handled by Reactor.
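A minimal sketch of such a handler, assuming the KafkaService interface (shown later) exposes the already-serialized JSON messages as a Flux of Strings via a getFlux() method:

import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;

@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {

    private final KafkaService kafkaService;

    // KafkaService is supplied via constructor injection.
    public ReactiveWebSocketHandler(KafkaService kafkaService) {
        this.kafkaService = kafkaService;
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Wrap every JSON string from Kafka in a websocket text
        // message and pipe it into the session.
        return session.send(kafkaService.getFlux()
                .map(session::textMessage));
    }
}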

Configure a URL as websocket endpoint

Next, you need to create a configuration class for your websocket. Just create a Java class that carries the Configuration annotation and gets an instance of WebSocketHandler via dependency injection. This will be an instance of our previously implemented ReactiveWebSocketHandler class.

Then create a method, annotated as a Spring Bean, that returns a HandlerMapping. The HandlerMapping object we build in this method maps a URL to your ReactiveWebSocketHandler. The HandlerMapping can also be configured to handle CORS requests, as you can see in the code example.

Finally, we need to define a Bean method returning a WebSocketHandlerAdapter instance. This just handles the process of connecting our handler to the underlying service. Since this step is nearly identical in most implementations, there is a discussion on GitHub about including it in the WebFlux autoconfiguration in the future.
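Put together, the configuration class could look roughly like this (the URL /messages and the permissive CORS setup are my own illustrative choices):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

@Configuration
public class WebSocketConfiguration {

    @Bean
    public HandlerMapping handlerMapping(WebSocketHandler webSocketHandler) {
        Map<String, WebSocketHandler> urlMap = new HashMap<>();
        urlMap.put("/messages", webSocketHandler);

        // Allow cross-origin requests from the Angular dev server.
        CorsConfiguration cors = new CorsConfiguration();
        cors.addAllowedOrigin("*");

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(1); // take precedence over other mappings
        mapping.setUrlMap(urlMap);
        mapping.setCorsConfigurations(Collections.singletonMap("*", cors));
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        // Connects our handler to the underlying websocket service.
        return new WebSocketHandlerAdapter();
    }
}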

Include reactive Kafka dependencies

To use the reactive Kafka libraries, you need to add them to your pom.xml manually.
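The dependency you need is reactor-kafka; the version shown here is one from around the time of writing, so pick whatever is current for you:

<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <version>1.2.2.RELEASE</version>
</dependency>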

Add your confluent cloud credentials

Let’s make importing your Confluent Cloud credentials easy. Just create a file called ccloud.properties and copy/paste your Confluent Cloud client configuration there. You can find this configuration in the Confluent Cloud web interface. Don’t forget to create an API key and set the correct values for the public and secret keys in your properties file (you don’t need the Schema Registry and Avro settings for this tutorial).
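Stripped down to what this tutorial needs, the file looks roughly like this (the placeholders in angle brackets are yours to fill in):

bootstrap.servers=<your-cluster-endpoint>:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";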

Create a reactive Kafka client service

Let’s stick to good coding practice and define an interface that describes our service.
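A minimal sketch of the interface, matching the getFlux() method assumed in the handler above:

import reactor.core.publisher.Flux;

public interface KafkaService {

    // Hot, cached flux of all messages in the topic,
    // already serialized as JSON strings.
    Flux<String> getFlux();
}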

Now you can implement this interface and set up a connection to your Kafka cluster.

In the constructor of our implementation, we first load the ccloud-specific properties from the ccloud.properties file and then add some application-specific properties.

Now we can create a ReceiverOptions object based on the properties. We use the helper method createTopicCache to create a flux that represents the incoming messages from a topic called testTopic.

Notice that we use the Flux.cache() method to create a cached hot flux from the initial flux we get from the reactive Kafka library. This allows us to cache all the messages in our topic in memory and pipe them to every single client that subscribes to our websocket endpoint.
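Here is a sketch of how such an implementation could look. The helper name createTopicCache comes from the text above; the consumer settings, the reuse of the KafkaMessage DTO from earlier, and the JSON serialization via Jackson are my own assumptions:

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

@Service
public class KafkaServiceImpl implements KafkaService {

    private final ObjectMapper mapper = new ObjectMapper();
    private final Flux<String> topicCache;

    public KafkaServiceImpl() throws IOException {
        // Load the Confluent Cloud connection settings...
        Properties props = new Properties();
        try (InputStream in = getClass().getResourceAsStream("/ccloud.properties")) {
            props.load(in);
        }
        // ...then add the application-specific consumer settings.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-websocket-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        topicCache = createTopicCache(ReceiverOptions.create(props), "testTopic");
    }

    // Builds a cached hot flux of all messages in the given topic.
    private Flux<String> createTopicCache(ReceiverOptions<String, String> options, String topic) {
        return KafkaReceiver.create(options.subscription(Collections.singleton(topic)))
                .receive()
                .map(record -> toJson(new KafkaMessage("[Kafka] Add message", record.value())))
                .cache();
    }

    private String toJson(KafkaMessage message) {
        try {
            return mapper.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    public Flux<String> getFlux() {
        return topicCache;
    }
}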

In fact, we have just created the basis for a very simple, stateful stream-processing engine. We can now apply all sorts of operations to our cached flux, including filtering and combining. This enables us to supply each client with a unique view of the topic data while still staying reactive. We also gain realtime updates to our clients without any further logic, since the flux automatically pipes all new messages to our websocket handler, which then pipes them to the client.

The frontend

Our websocket should now be up and running. To test it, we create a simple SPA based on Angular 8. I chose Angular 8 because it makes it easy to create a well-structured web application without much knowledge of typical web development topics. There is also a great library for state management in Angular called NGXS. Based on the very popular approach of the Redux state store, NGXS features a websocket plugin that integrates state management and websocket communication seamlessly.

Create an angular app

You need to have Node.js and npm installed. If you haven’t installed the Angular CLI yet, you can do this via npm.

npm install -g @angular/cli

Now just create a new Angular app with the CLI.

ng new

You will be asked for your project details. Angular routing is not needed for this tutorial, but it makes development a lot easier if you want to continue with your project.

Prepare NGXS

Now you should install the required NGXS libraries and its CLI.

npm install @ngxs/store
npm install @ngxs/websocket-plugin
npm i -g @ngxs/cli

Now you can create a new store using the CLI.

ngxs --name kafka --directory src/app/

Include your store and the websocket plugin in your app.module.ts.
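A sketch of what that could look like, assuming the websocket endpoint from the backend configuration above (ws://localhost:8080/messages):

// app.module.ts
import { BrowserModule } from '@angular/platform-browser';
import { NgModule } from '@angular/core';
import { NgxsModule } from '@ngxs/store';
import { NgxsWebsocketPluginModule } from '@ngxs/websocket-plugin';

import { AppComponent } from './app.component';
import { KafkaState } from './kafka.state';

@NgModule({
  declarations: [AppComponent],
  imports: [
    BrowserModule,
    // Register the store and point the websocket plugin at our backend.
    NgxsModule.forRoot([KafkaState]),
    NgxsWebsocketPluginModule.forRoot({ url: 'ws://localhost:8080/messages' })
  ],
  bootstrap: [AppComponent]
})
export class AppModule {}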

Now edit the generated store to represent our application state.

Here we just changed the name of the store variable from items to messages to make the code more expressive. We also renamed the action to AddMessageAction and added a Selector to retrieve the store variable messages.
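The edited store could then look like this (a sketch; the file and class names follow the NGXS CLI defaults for --name kafka):

// kafka.state.ts
import { State, Action, StateContext, Selector } from '@ngxs/store';
import { AddMessageAction } from './kafka.actions';

export interface KafkaStateModel {
  messages: string[];
}

@State<KafkaStateModel>({
  name: 'kafka',
  defaults: { messages: [] }
})
export class KafkaState {

  // Exposes the message list to components.
  @Selector()
  static messages(state: KafkaStateModel): string[] {
    return state.messages;
  }

  // Appends every incoming message to the store.
  @Action(AddMessageAction)
  add(ctx: StateContext<KafkaStateModel>, { message }: AddMessageAction) {
    const state = ctx.getState();
    ctx.patchState({ messages: [...state.messages, message] });
  }
}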

Since we haven’t created the AddMessageAction class yet, this code won’t compile. Just go to kafka.actions.ts and add the new action.

export class AddMessageAction {
  static readonly type = '[Kafka] Add message';
  constructor(public message: string) { }
}

That’s it! The new NGXS state store will now be able to listen to incoming messages and will be updated in the background. All incoming messages from our websocket that contain a type key/value pair will be mapped to the corresponding action (the value of type must match the static type field of the action class, and the remaining key/value pairs should match the constructor parameters of the action).

Display incoming messages

The time has come to produce some output. To make things as easy as possible, we will use the existing app.component.ts to display the messages.

Here we used the Select annotation to get a variable from our state store. Notice that this is an Observable, which means that it keeps track of state changes and always represents the current state.

We also created a constructor that takes a Store object. Angular dependency injection will supply an instance of our NGXS state store here. In the ngOnInit() function we can use this store object to dispatch an action: by dispatching the ConnectWebSocket action, we tell the NGXS websocket plugin to establish a websocket connection to the configured endpoint.
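Putting those pieces together, app.component.ts could look like this (a sketch; the import paths follow the file layout used above):

// app.component.ts
import { Component, OnInit } from '@angular/core';
import { Select, Store } from '@ngxs/store';
import { ConnectWebSocket } from '@ngxs/websocket-plugin';
import { Observable } from 'rxjs';

import { KafkaState } from './kafka.state';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html'
})
export class AppComponent implements OnInit {

  // Always reflects the current list of messages in the store.
  @Select(KafkaState.messages) kafkaMessages$: Observable<string[]>;

  constructor(private store: Store) {}

  ngOnInit() {
    // Open the websocket connection configured in app.module.ts.
    this.store.dispatch(new ConnectWebSocket());
  }
}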

Now delete all the content of app.component.html and type just this one line of code.

{{ kafkaMessages$ | async }}

This fully utilizes the power of Angular’s support for Observables by using the async pipe. The Observable kafkaMessages$ is subscribed to in the background, and changes are automatically displayed in the browser.

Try it out!

To run your stack, first start your backend. In the root directory of your project, run mvn spring-boot:run or configure your IDE to run it.

Next, go to your frontend’s root directory and run ng serve to start a development server for your Angular app.

Since Kafka is already running in the cloud, you just need to produce messages to it using the command line. I recommend the Confluent Cloud CLI (ccloud), which you can get from here: https://docs.confluent.io/5.2.0/cloud/cli/install.html

To produce messages, you first need to log in.

ccloud login

Then you need to store an API key and tell ccloud to use it; you can use the key you generated for your backend.

ccloud api-key store <apikey> <secret>
ccloud api-key use <apikey>

Now you can create a test topic.

ccloud kafka topic create testTopic

And start producing messages to this topic.

ccloud kafka topic produce testTopic

Everything you type now (and send by pressing ENTER) will immediately be displayed at localhost:4200.

Conclusion

Well, that was easy! OK, to be honest, it took me quite a while to figure out how to make the full stack reactive, and especially the Kafka part gave me some headaches.

There are still some things that need thought if you want to use this for a production-ready app. Probably the most important shortcoming of our solution concerns scaling: our topic has six partitions. As long as our consumer group contains only one consumer, it naturally consumes all of the topic’s data. But if we scale out and add server instances, there will be more consumers and the data will be spread across them. So in the next step, we might need to look for a solution that redirects every request to the server instance that holds the required data.

Thanks for reading, I’m happy for any comment!
