Build a collaborative web application using reactive programming

Alexandre Jacquot
Feb 24 · 16 min read

How to build a reactive web application with Angular, Spring Boot, WebFlux and MongoDB

Have you ever wondered how to build a collaborative web application where data modifications are shown to users in real-time? Are you interested in designing responsive and resilient services? Or just simply, are you confined to the house and looking for something interesting to read? Search no more, you are in the right place.

In this tutorial we will look at how to create a collaborative web application using reactive programming. We will build it with Angular, Spring Boot, Spring WebFlux, and MongoDB. Finally, to deal with concurrent modifications, we will use optimistic locking, Change Streams and Server Sent Events.

The source code of the application can be found here on GitHub.

Blocking systems vs Non-Blocking systems

Before jumping into action, it is important to understand the difference between blocking and non-blocking systems. So, first, let me tell you a story.

This story illustrates the advantages of building non-blocking systems. Read it again, but now consider that:

  • the company is a web-service
  • the customer is an API consumer
  • the order is a web service call
  • the items are web resources
  • the employees are threads
  • the warehouse is a blocking resource like, for instance, a database

In web development, applications are composed of several parts interacting together. Some of these interactions are blocking by nature, like those involving databases. In a blocking system, when a consumer calls an API to get data, the call blocks the thread until all selected elements are retrieved from the database and returned to the consumer. While the query performs, the thread is blocked, and this is literally a waste of resources. Eventually, when the data is returned to the consumer, the thread is put back into the pool and made available for handling another request. In a non-blocking system, reading data from the database does not block the thread. Each time a record is fetched, an event is published. Any thread can handle the event and send the record to the consumer without having to wait for the other records to be fetched.

Chapter 1: Build a reactive web application

In the spring documentation, reactive programming is defined this way:

The reactive manifesto adds that, to be reactive, a system must be:

  • Responsive: The system focuses on providing rapid and consistent response times.
  • Resilient: The system stays responsive in the face of failure.
  • Elastic: The system stays responsive under varying workload.
  • Message-driven: The system relies on asynchronous message-passing to establish a boundary between components that ensures loose coupling, isolation and location transparency.

In this chapter, we will see how to build a non-blocking web application using reactive programming.

The collaborative web application

The application we are building is a collaborative web application where users can work together on a shared To Do list.

Users must be able to:

  • add an item to the shared list
  • edit an item's description
  • change the status of an item
  • delete an item

Here is an overview of the application.

The To Do list Web application

MongoDB

In order to build a reactive application, we need to use a database where data is accessible in a non-blocking way, but for our use case, this won’t be enough. As our target is to make a collaborative application, we should select a database allowing us to listen to all data changes in real time.

For this tutorial we will use MongoDB, but other databases could have fit our needs, including relational databases like PostgresSQL or MS SQL/Server (see R2DBC for more details about reactive programming with RDBMS).

To setup a running instance of MongoDB we will use Docker Compose. The first thing you need to do is to create the following yaml file.

As you can see, this will also deploy an instance of Mongo-Express, a web client for MongoDB.

To start the environment, execute the following command in same folder as the yaml file:

$ docker-compose up

Similarly, to stop the environment, execute:

$ docker-compose down

Now, you have :

Mongo Express - A web client for MongoDB

Spring Boot

The Spring Boot application can be initialized using Spring Initializr.

You need to select the following dependencies:

  • Spring Reactive Web (aka Spring WebFlux)
  • Spring Data Reactive MongoDB
  • Embedded MongoDB Database (optional, for testing purposes)
  • Lombok (optional but reduces the boilerplate code)

The resulting pom.xml should have at least the two following dependencies:

The data model

The list items have the following properties:

  • a unique identifier, generated by MongoDB
  • a description which is a simple free text
  • a status among the following values: TODO, IN_PROGRESS, DONE
  • a creation date, generated by MongoDB
  • the last modification date, generated by MongoDB

The repository

In order to populate the audit timestamps automatically, you need to add the @EnableReactiveMongoAuditing annotation on top of the Spring Boot application main class.

The URI of the database must also be configured by adding the property spring.data.mongodb.uri to the Spring Boot configuration file.

The REST API

The communication between the front-end and the back-end will be done using a REST API. Here is an overview of the endpoints we are going to implement.

The REST API using Swagger UI

To avoid exposing our model in our REST API but also to have a clear contract, we need to create some resource classes.

Create a new item (POST)

Creating a new item consists only in sending a description. The status will be defaulted to TODO.

Update an existing item (PUT and PATCH)

To update an item, we have two possibilities.

The first solution is to send the complete JSON representation of the item by calling the PUT endpoint with the following resource.

The second solution is to send a JSON document containing only the required changes. This can be achieved by calling the PATCH endpoint with the following resource.

The PATCH endpoint is implemented according to RFC-7396.

This RFC states that if a field is present in the resource, the field will be updated. On the contrary, if a field is missing it will be ignored and the actual value in the database will be kept.

Thanks to the JSON deserializer and to the use of Optional:

  • a missing field in the JSON document will result in a null Optional
  • a field present in the JSON document but having a null value will result in an empty Optional
  • a field present in the JSON document but with a non-null value will result in a non-empty Optional

For example, to set an item to done, we have to send a JSON containing the status field and we can omit the description.

Delete an item (DELETE)

The deletion of an item doesn’t require a resource. We just need to send the identifier of the item as a path parameter.

Get the items (GET)

The GET endpoints will return the following resource.

The REST controller

The controller delegates all calls to the service layer by calling methods of the ItemService.

The interesting part here is the use of Flux and Mono.

  • A Flux is an asynchronous sequence of 0 to N objects.
  • A Mono is an asynchronous 0 to 1 result.

For more information about Flux and Mono, I suggest you have a look to the reactor documentation.

The service

The service needs a reference to:

  • the reactive mongo repository
  • a mapper component responsible of mapping the REST resources with our data model.

For mapping the objects we are using MapStruct, a code generator used to simplify the implementation based on configuration.

Angular

This article won’t enter into the details of building an Angular application. We will mainly focus on the service used to interact with the back-end using the REST API.

To call the back-end endpoints, we are using RxJS.

With RxJS, there is no coding difference between calling a blocking or a non-blocking endpoint if only one resource is involved.

When it comes to getting a list of resources by calling the GET endpoint, the use of RxJS is a bit different. If you remember what we said earlier, in a blocking system, the server sends all the data at once, and the consumer is only notified once with the complete list of elements. In reactive programming, each resource is sent to the consumer at the time it is ready. It requires opening a communication channel with the server and this is where Server Sent Events (SSE) comes into the picture.

SSE is an HTTP standard which is often compared with WebSockets. Whereas WebSockets offer a bi-directional communication channel over a single TCP connection, SSE offers a mono-directional communication channel over simple HTTP (server to client). I am sure that you can find plenty of great articles comparing SSE with WebSockets. The decision of whether you need to use one or the other probably depends on the application you are developing. For our use-case, SSE better fits our needs as we don’t really need a real-time upstream (client to server).

In the ItemService, the communication channel is represented by the EventSource instance. When an item is received, the onmessage event handler is called. The only thing this handler does is to fire an observable event containing the item. The UI component responsible of displaying the items must subscribe to the Observable returned by the findAll() method and display the arriving items. An important part here is the use of NgZone to publish the items. This is needed to tell Angular to re-render the UI as, by default, asynchronous tasks don’t require UI updates. Without this, the screen wouldn’t be rendered correctly.

In case of error, or when the back-end has finished sending all the items, the onerror event handler is called. If the connection has been closed, the readystate property of the EventSource is equal to 0. When this happens, we can safely close the EventSource and complete the Subscriber.

The image below shows the reactive programming in action. By adding a delay of 1 second between each message, you can see the items being rendered at the time they arrive. Given a total of 6 elements, in a blocking system, we would have had to wait 6 seconds before seeing the items.

Items sent with a delay of 1 second between each message

The combination of both, WebFlux and RxJS makes our application fully reactive. The web resources are sent asynchronously by the server and processed asynchronously by the client.

In the next chapter we will start converting our application into a collaborative application.

Chapter 2: Prevent conflicts using Optimistic locking

In a collaborative application we must prevent users from overwriting each other's data. The application we have developed so far does not prevent users losing their changes or overwriting the changes done by other users.

Two users overwriting each other's changes

In order to avoid conflicts between users, we are going to implement an optimistic locking mechanism.

Optimistic locking consists of preventing concurrent modifications by ensuring that the object we are going to save has not been updated since the last time we read it. In other words, when reading an item, we should take note of its version and when saving it, we should ensure that the version hasn’t changed.

To implement optimistic locking, you need to add a version number to the Item class.

Whenever an item gets updated, its version is automatically increased by one.

The version needs to be sent to the consumer in the ItemResource.

When updating, patching or deleting an item, the client must send this version back to the service. To do so, we use the If-Match request header.

The version is provided to the controller which passes it to the methods of the ItemService.

In the service, the item is retrieved from the database and its version is compared with the one provided by the client.

If the versions are different, a conflict is detected and the request is rejected with the creation of an UnexpectedItemVersionException. Note that this exception is not thrown directly. While this would work, thanks to the reactor implementation, it is considered as a bad practice and is contrary to the reactive specification. The specification states that the only legal way to signal a failure to a subscriber is via the onError method. So, instead of throwing the exception, we use the handle operator and signal the failure using the provided SynchronousSink instance.

If the versions are identical, the service can perform some business logic and in the end, call the save method of the Mongo repository. At that time the repository will check once more the version of the item. If it has changed, an OptimisticLockException is thrown and the request is rejected. If the versions are identical, the item is saved.

Thanks to optimistic locking, users can’t overwrite anymore each other's changes. The following image shows how the system behaves when concurrent modifications are detected.

Concurrent modifications are properly handled

Chapter 3: Push all data changes to the front-end using Change Streams and Server Sent Events

In the previous chapter, we implemented optimistic locking. This is a first step for making a collaborative application but this is not enough. Now, the users will see an error message when a conflict is detected. This is safe but not really user-friendly. The last step to make our collaborative application is to show all data modifications to users in real-time.

Change Streams

MongoDB keeps a rolling record of all operations that modify the data stored in the database. The operations are stored in a capped collection called oplog (operation log).

To access these operations, MongoDB provides a feature called Change Streams.

Before jumping into the code, we need to change our Docker Compose yaml file to create a MongoDB replica as this is a prerequisite for using Change Streams. Without a replica set, using Change Streams would result in the following error.

To avoid having to run several instances of MongoDB locally, we need to create our own docker image building a single node replica set. For this, we have to create the following DockerFile in a mongo folder.

Once done, we have to update the Docker Compose yaml file and restart the environment.

Now that we have a running replica set, we can use the MongoDB Change Streams. The code below demonstrates how to listen to all changes performed on the Item collection.

As you can see, the listenToEvents method doesn’t return a Flux of ChangeStreamEvent but a Flux of Event.

The Event interface is used to group the following event classes:

  • ItemSaved, when an item is created or updated
  • ItemDeleted, when an item is deleted

The mapping between ChangeStreamEvent and the event classes is done in the ItemMapper.

The instances of ChangeStreamEvent generated for the DELETE operations don’t have a body containing the deleted document. This makes the code to extract the identifier of the deleted items a bit tricky.

Server Sent Events

To expose the events, we must add a new endpoint.

Although the method could return a Flux of Event, here, we return a Flux of ServerSentEvent. This class is used to wrap the events and it comes with some advantages.

First, we can add some information about the events. We can specify:

  • a unique identifier which could be useful in case we need to reprocess some events
  • a name, which is useful to identify the actual type of the event
  • a comment, not really useful in my opinion

All these fields are optional. In our case, we only need to set the event’s name, represented by its class name, and the actual data of the event. Setting the event's name will allow us to dispatch the events to the appropriate handlers in the front-end.

Another benefit of using the ServerSentEvent class is that it makes optional the declaration of the MediaType text/event-stream. If you don’t use this wrapper class, it is mandatory to specify it.

The last advantage of the SSE is the possibility to specify a retry duration. When set, the client will be informed about the time to wait between two connection retries in case the connection gets interrupted. SSE comes with an automatic reconnection mechanism, something not provided out of the box by WebSockets.

Now that we have added the new endpoint, we need to modify the Angular service to add a method responsible of receiving and dispatching the item events.

This method takes two callback functions as parameters. The first one will be called when an item is saved, the second one, when an item is deleted.

For these functions to be called, we need to register two event listeners by calling the addEventListener method of the EventSource. The first parameter of this method is the name of the event. If you remember the code of the controller, we have used the class name of the event (ItemSaved and ItemDeleted). The second parameter is the corresponding callback function responsible for handling the event and changing the view accordingly.

Something important to keep in mind is that the SSE connection must always be open as we are listening to all data changes. In case the connection gets interrupted, we should not close the event source as this would prevent the automatic reconnection. When the connection comes back up, the data set stored on the client side could be deprecated. In this situation, it is important to refresh the complete list of items or to reprocess the missed events, something not done here. If you are interested in reprocessing the missed events, I recommend you to have a look to the resumeAfter function provided by the MongoDB Change Stream.

Now, our application is a real collaborative application.

Change Streams and SSE in action with two users

Thanks to Change Streams, any update directly done in MongoDB is immediately visible to users.

Updates in MongoDB are immediately visible to users

That's awesome! We did it!

My enthusiastic colleagues at Pictet Technologies

Important notes about SSE

Depending on the infrastructure on which your Web Service is deployed, the SSE connection could be interrupted if no traffic is detected during a certain amount of time. This will result in a timeout and the connection will be lost. To deal with this problem you can simply send some HeartBeats every x seconds to the client. HeartBeat is a custom class containing only a unique identifier automatically generated. The identifier is not really useful but the objects sent using SSE can't be empty.

The second important thing to know about SSE is that all browsers have a limitation of 4 to 6 active HTTP/1 connections. This could be a problem for users opening several tabs. However, there exist techniques to prevent this problem (close/open the connection based on the active tab, share the connection between tabs, etc.).

For instance to close/open the connection based on the active tab, you can use the following listeners.

@HostListener('window:focus', ['$event'])
onFocus(event: any): void {
// The tab has been activated
// => refresh all data and listen to all events
...
}
@HostListener('window:blur', ['$event'])
onBlur(event: any): void {
// The tab is no more active
// => close the event source
...
}

If this limitation becomes a real problem, you should consider using WebSockets instead.

About me

Technical Lead at Pictet Technologies, I am passionate about software development. I have been building IT solutions for more than 20 years for various industries like finance, air cargo, government, banking, insurance and retail. I enjoy learning new things and my goal is to apply my skills and expertise in a team with the aim of delivering the best IT solutions. My super power? I saved the World in 1999 by contributing to fix the Y2K bug… in Cobol.

Pictet Technologies Blog

Sharing our passion about modern software development, technologies and everything else around it.

Pictet Technologies Blog

We are a group of software engineering intrapreneurs driven by the passion to continuously deliver value to our users within the Pictet Group. We strive to improve ourselves and our ways of working on a regular basis, to achieve perfection and excellence.

Alexandre Jacquot

Written by

Technical Lead at Pictet Technologies

Pictet Technologies Blog

We are a group of software engineering intrapreneurs driven by the passion to continuously deliver value to our users within the Pictet Group. We strive to improve ourselves and our ways of working on a regular basis, to achieve perfection and excellence.

Medium is an open platform where 170 million readers come to find insightful and dynamic thinking. Here, expert and undiscovered voices alike dive into the heart of any topic and bring new ideas to the surface. Learn more

Follow the writers, publications, and topics that matter to you, and you’ll see them on your homepage and in your inbox. Explore

If you have a story to tell, knowledge to share, or a perspective to offer — welcome home. It’s easy and free to post your thinking on any topic. Write on Medium

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store