Reactive programming with Spring Data R2DBC

Alexandre Jacquot
Pictet Technologies Blog
Jul 8, 2021 · 14 min read


How to build a collaborative web application with Angular, Spring Boot, Spring WebFlux, Spring Data R2DBC and PostgreSQL

Illustration by Luke H

In my last article, we addressed the concept of reactive programming and put it into practice by creating a collaborative web application using Angular, Spring Boot, Spring WebFlux and MongoDB. If you haven't read it yet, I recommend that you do so, as it will make this article easier to follow.

In the application we built, we only used one type of domain object, so all operations dealing with the data access layer processed a single document at a time. This was an ideal use case for reactive programming, but what would happen if we introduced relationships between objects? Building an object and having to fetch its related objects before being able to send it back does not seem compliant with the Reactive Streams specification. Also, how can we properly handle transactions when the data is saved by multiple threads? Will reactive programming still be possible, or will it be necessary to revert to imperative programming?

In this article, we will try to answer all these questions. We will update our collaborative web application to use a relational database, building it with Angular, Spring Boot, Spring WebFlux, Spring Data R2DBC and PostgreSQL. To deal with concurrent modifications, we will use optimistic locking, Server-Sent Events and the PostgreSQL notification system.

The source code of the application can be found here on GitHub.

R2DBC

For those who wonder, no, R2DBC is not a Star Wars droid. It stands for Reactive Relational Database Connectivity. The official documentation states the following:

Based on the Reactive Streams specification. R2DBC is founded on the Reactive Streams specification, which provides a fully-reactive non-blocking API.

Works with relational databases. In contrast to the blocking nature of JDBC, R2DBC allows you to work with SQL databases using a reactive API.

Supports scalable solutions. With Reactive Streams, R2DBC enables you to move from the classic “one thread per connection” model to a more powerful and scalable approach.

Provides an open specification. R2DBC is an open specification and establishes a Service Provider Interface (SPI) for driver vendors to implement and clients to consume.

At the time of writing, several drivers are available, but some of them are not yet production-ready. Before deciding to use one of them, please read its documentation carefully.

  • cloud-spanner-r2dbc - driver for Google Cloud Spanner.
  • jasync-sql - R2DBC wrapper for Java & Kotlin Async Database Driver for MySQL and PostgreSQL (written in Kotlin).
  • oracle-r2dbc - native driver implemented for Oracle.
  • r2dbc-h2 - native driver implemented for H2 as a test database.
  • r2dbc-mariadb - native driver implemented for MariaDB.
  • r2dbc-mssql - native driver implemented for Microsoft SQL Server.
  • r2dbc-mysql - native driver implemented for MySQL.
  • r2dbc-postgresql - native driver implemented for PostgreSQL.

Chapter 1: Set up a PostgreSQL database

To store our data, we will use PostgreSQL. Our first step consists of setting up a local instance of PostgreSQL using Docker Compose. To do so, we need to create the following YAML file.
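Here is a minimal sketch of such a docker-compose.yml (image tags, ports and credentials are assumptions; the exact file is in the GitHub repository):

version: "3.8"
services:
  postgres:
    image: postgres:13
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      POSTGRES_DB: todolist
  pgadmin:
    image: dpage/pgadmin4
    ports:
      - "8000:80"
    environment:
      PGADMIN_DEFAULT_EMAIL: admin@example.com
      PGADMIN_DEFAULT_PASSWORD: password
    depends_on:
      - postgres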

As you can see, this will also deploy an instance of PgAdmin, a web client for PostgreSQL.

To start the environment, execute the following command in the same folder as the YAML file:

$ docker-compose up

Similarly, to stop the environment, execute:

$ docker-compose down

Now, you have a running PostgreSQL instance and a PgAdmin web console:

PgAdmin Configuration — All passwords are set to ‘password’ but, please, don’t tell anyone 🙊

Chapter 2: Build a reactive web application with Spring Data R2DBC

The business case

To demonstrate how to deal with relationships in reactive programming, we need to enrich our data model. In the previous version of our application, users were able to work together on a shared To Do List. They could perform the following actions:

  • add an item to the shared list
  • edit an item’s description
  • change the status of an item
  • delete an item

The first version of our To Do List application (without relationships)

In the new version we are going to develop, they will also be able to edit the items in order to:

  • define an assignee (optional)
  • define a set of tags like Private, Sport, Work, etc. (0 to n elements)

Here is an overview of our new application:

Spring Boot

The Spring Boot application can be initialised using Spring Initializr.

You need to select the following dependencies:

  • Spring Reactive Web (aka Spring WebFlux)
  • Spring Data R2DBC
  • Liquibase Migration (needed to create the database schema)
  • PostgreSQL driver (both a JDBC and an R2DBC driver used to connect to PostgreSQL; the JDBC driver is required by Liquibase)
  • Lombok (optional but reduces the boilerplate code)

The resulting pom.xml should have at least the following dependencies:
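As a sketch, it should contain something like the following (versions are managed by the Spring Boot parent POM):

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.liquibase</groupId>
        <artifactId>liquibase-core</artifactId>
    </dependency>
    <!-- JDBC driver, used by Liquibase -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- R2DBC driver, used by Spring Data -->
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-postgresql</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>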

The Data Model

The figure below depicts the new data model:

Let’s take a look at the Item class.
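Here is a sketch of what it looks like (field names follow the JSON examples shown later; ItemStatus, Person and Tag are the application's other domain classes, and Lombok's @Data generates the boilerplate):

import java.time.LocalDateTime;
import java.util.List;

import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.LastModifiedDate;
import org.springframework.data.annotation.Transient;
import org.springframework.data.annotation.Version;
import org.springframework.data.relational.core.mapping.Table;

import lombok.Data;

@Data
@Table("item")
public class Item {

    @Id
    private Long id;                 // generated by PostgreSQL (SERIAL)

    @Version
    private Long version;            // used for optimistic locking

    private String description;
    private ItemStatus status = ItemStatus.TODO;

    private Long assigneeId;         // plain foreign key column

    @Transient
    private Person assignee;         // not mapped, loaded manually

    @Transient
    private List<Tag> tags;          // not mapped, loaded manually

    @CreatedDate
    private LocalDateTime createdDate;

    @LastModifiedDate
    private LocalDateTime lastModifiedDate;
}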

The first thing to notice is the use of the @Table annotation. Although it is not mandatory to annotate our domain objects, skipping it would impact performance: the mapping framework uses this annotation to pre-process the domain objects and extract the metadata needed to interact with the database.

Another important annotation is @Id. It is used to map a class field to the table's primary key. Note that, with Spring Data R2DBC, there is no automatic generation of unique identifiers. We can't even specify a generation strategy. To solve this problem, we can simply tell PostgreSQL to automatically generate the ID when a record is created by using the SERIAL datatype.

Moreover, Spring Data R2DBC doesn't support embedded ids; in other words, we can't define a composite primary key. This limitation is pretty annoying because it forces us to use a generated unique ID instead, which has a direct impact on the amount of code we need to write. You will see this impact in more detail when we save records in the ITEM_TAG table. This table, used to represent the n..n relationship between items and tags, has a technical key, whereas we could have used the item ID and the tag ID as a composite primary key.

The @Version annotation is fully supported and it comes with an optimistic locking mechanism. Each time a record is about to be saved, the actual version of the record is compared to the provided one and, if they are identical, the version is incremented and the record is saved. If they are different, the record is not saved and an error is returned.

Audit annotations such as @CreatedDate or @LastModifiedDate are also supported. To enable the auditing feature, we need to declare it explicitly with the @EnableR2dbcAuditing annotation. We can, for instance, add it on top of the application main class.
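Something like this (the class name is a placeholder):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.r2dbc.config.EnableR2dbcAuditing;

@EnableR2dbcAuditing   // enables @CreatedDate / @LastModifiedDate handling
@SpringBootApplication
public class TodoListApplication {

    public static void main(String[] args) {
        SpringApplication.run(TodoListApplication.class, args);
    }
}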

A very important thing to be aware of before deciding to use Spring Data R2DBC is the lack of support for relationships. Unlike Spring Data JPA, it is not possible to use an advanced ORM framework like Hibernate. Spring Data R2DBC is a simple and limited object mapper. As a consequence, many features usually provided by ORM frameworks are not available, such as caching or lazy loading. Because related objects can't be mapped automatically, the fields assignee and tags must be annotated with @Transient to tell the mapping framework to ignore them. In the next section, we will see how to map these objects.

Last but not least, the database schema can't be created automatically based on the domain objects. To overcome this problem, we can use Liquibase to create and maintain our schema. As you have access to the GitHub repository, I am not going to reproduce the complete Liquibase change log here. Nevertheless, here is how the ITEM table can be created.
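A sketch of the corresponding change set (column names and types are assumptions derived from the entity above; the real change log is in the repository):

<changeSet id="create-item-table" author="todo-list">
    <createTable tableName="item">
        <column name="id" type="SERIAL">
            <constraints primaryKey="true" nullable="false"/>
        </column>
        <column name="version" type="BIGINT">
            <constraints nullable="false"/>
        </column>
        <column name="description" type="VARCHAR(4000)">
            <constraints nullable="false"/>
        </column>
        <column name="status" type="VARCHAR(30)">
            <constraints nullable="false"/>
        </column>
        <column name="assignee_id" type="INTEGER">
            <constraints foreignKeyName="fk_item_person" references="person(id)"/>
        </column>
        <column name="created_date" type="TIMESTAMP"/>
        <column name="last_modified_date" type="TIMESTAMP"/>
    </createTable>
</changeSet>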

We also need to configure the following application properties to allow Liquibase to connect to the database using JDBC.
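For example (the database coordinates match the Docker Compose sketch above; the change log path is an assumption):

spring.liquibase.change-log=classpath:db/changelog/db.changelog-master.xml
spring.liquibase.url=jdbc:postgresql://localhost:5432/todolist
spring.liquibase.user=postgres
spring.liquibase.password=password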

The CRUD API

The communication between the front-end and the back-end will be done using a REST API. Here is an overview of the endpoints we are going to implement.

Swagger UI — The REST API

Repositories

Spring Data R2DBC allows us to create repositories by defining interfaces that extend the ReactiveCrudRepository interface. You can also extend the ReactiveSortingRepository interface, an extension of ReactiveCrudRepository which provides additional methods for sorting entities.

Here are examples of repositories.
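The following sketch gives an idea of what they look like (the query method names are assumptions):

import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.data.repository.reactive.ReactiveSortingRepository;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// findAll(Sort) is inherited from ReactiveSortingRepository
public interface ItemRepository extends ReactiveSortingRepository<Item, Long> {
}

public interface ItemTagRepository extends ReactiveCrudRepository<ItemTag, Long> {

    // the SQL is derived from the method name
    Flux<ItemTag> findAllByItemId(Long itemId);

    // returns the number of deleted rows rather than Mono<Void>;
    // this matters when the result is used with zipWith (see the delete section)
    Mono<Integer> deleteAllByItemId(Long itemId);
}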

The above example shows the creation of basic queries using Query Methods. Behind the scenes, the framework generates SQL queries directly from the method names, which have to follow a strict naming convention. For more information about the supported keywords, you can have a look at the documentation.

When your query is too complex to be generated with Query Methods, or when you have to deal with relationships, you have to write the query yourself. Unfortunately, JPQL is not supported and we need to write plain SQL queries. This has a direct impact on coding effort and on maintainability.

To allow our application to connect to the database, we need to configure the following application properties.
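For example, matching the local instance set up in Chapter 1:

spring.r2dbc.url=r2dbc:postgresql://localhost:5432/todolist
spring.r2dbc.username=postgres
spring.r2dbc.password=password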

For more information about repositories, you can have a look at the Spring Data R2DBC documentation.

Creating an item

Creating a new item consists of sending a description, an optional person identifier (the assignee) and an optional set of tag identifiers.

The REST controller delegates the item creation to the ItemService and returns a Mono<ResponseEntity>. Although it’s not mandatory to return a ResponseEntity, it allows us to alter the HTTP status code and to add response headers. In our example, when an item is successfully created, we return a 201 HTTP status code (created) and add the location response header using HATEOAS.
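A sketch of the endpoint (resource and mapper names are assumptions, and the Location header is built here with a plain URI instead of the HATEOAS link builder used in the repository):

@PostMapping
public Mono<ResponseEntity<Void>> createItem(@RequestBody NewItemResource resource) {
    return itemService.create(itemMapper.toModel(resource))
            .map(item -> ResponseEntity
                    .created(URI.create("/items/" + item.getId()))  // 201 + Location header
                    .<Void>build());
}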

Before calling the create method of the ItemService, the controller calls the ItemMapper in order to map our API contract to an instance of the Item entity. Like in the previous version of the web application, the mapping is done using MapStruct. We won't go into the details of the ItemMapper; what you need to know is that the related person and tags are not loaded from the database at this stage. Only the identifiers are set on the entity.

For example, this NewItemResource instance:

{
    description: "Read the article",
    assignee: 10,
    tagIds: [1, 2]
}

is mapped to the following Item instance:

{
    id: null,
    version: null,
    description: "Read the article",
    status: "TODO",
    assigneeId: 10,
    assignee: null,
    tags: [
        {
            id: 1,
            version: null,
            name: null,
            createdDate: null,
            lastModifiedDate: null
        },
        {
            id: 2,
            version: null,
            name: null,
            createdDate: null,
            lastModifiedDate: null
        }
    ],
    createdDate: null,
    lastModifiedDate: null
}

The service delegates the creation of the item to the ItemRepository. This creates the item and the link to the assignee. To link the item with the tags, we need to create instances of the ItemTag class, each instance representing a link to a tag. To persist these links, we have to call the saveAll(Collection<ItemTag>) method of the ItemTagRepository. This method returns a Flux<ItemTag> which has to be converted to a Mono<List<ItemTag>> by calling the collectList() method. Converting the Flux to Mono allows us to return the newly created item to the controller, only when all the links are saved.
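Roughly, the create method looks like this (assuming an ItemTag(itemId, tagId) constructor; null and empty tag collections are not handled in this sketch):

@Transactional
public Mono<Item> create(final Item item) {
    return itemRepository.save(item)
            .flatMap(savedItem -> itemTagRepository
                    // one ItemTag per tag linked to the item
                    .saveAll(item.getTags().stream()
                            .map(tag -> new ItemTag(savedItem.getId(), tag.getId()))
                            .collect(Collectors.toList()))
                    // wait until all the links are saved before emitting the item
                    .collectList()
                    .thenReturn(savedItem));
}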

An important thing to note is the use of the @Transactional annotation. It is used to initiate a transaction and to rollback or commit the performed changes depending on whether an error occurred. In imperative programming, the transactional state is associated with an execution using the ThreadLocal storage. In reactive programming, it is not possible to bind the transactional state to only one thread because multiple threads can be involved. As a consequence, ThreadLocal cannot be used. Instead, the transaction management system needs to use the Reactor Context. This Context is defined per Subscriber and is used by several features like tracing, security, scheduling, etc.

For more details about reactive transactions with Spring, you can have a look at the following blog.

Reading all the items

The controller delegates the search of the Item entities to the service and maps them to our API contract. As we are returning a Flux<ItemResource>, we need to specify the text/event-stream media type.

The ItemResource contains the information about the item but also the related elements, namely the assignee and the tags.

The ItemService calls the findAll(Sort) method of the ItemRepository and for each of the found elements, calls the loadRelations(Item) method.
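In code, this could look as follows (DEFAULT_SORT is an assumed constant, e.g. Sort.by("id")):

public Flux<Item> findAll() {
    return itemRepository.findAll(DEFAULT_SORT)
            .flatMap(this::loadRelations);
}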

The loadRelations(Item) method aims to load the objects related to the item. To do so, we need to use the zipWith(Mono) operator. This operator zips two Monos together and returns a Mono<Tuple2>. Using this operator, we can load the item, load the tags and set them on the found item.

When using zipWith, it’s really important to be aware of the following rule:

An error or empty completion of any source will cause the other source to be cancelled and the resulting Mono to immediately error or complete, respectively.

In other words, if any of the sources emits an error or completes empty (for example a Mono<Void> or an empty Mono), the subscription will end.

This is the reason why we need to ensure that the assignee identifier is not null before zipping the item with the person.
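A sketch of the method (findTagsByItemId is an assumed query method joining the ITEM_TAG and TAG tables):

private Mono<Item> loadRelations(final Item item) {
    // zip the item with its list of tags (collectList never completes empty)
    Mono<Item> mono = Mono.just(item)
            .zipWith(tagRepository.findTagsByItemId(item.getId()).collectList())
            .map(tuple -> {
                tuple.getT1().setTags(tuple.getT2());
                return tuple.getT1();
            });

    // only zip with the assignee when there is one:
    // an empty Mono would cancel the whole chain
    if (item.getAssigneeId() != null) {
        mono = mono.zipWith(personRepository.findById(item.getAssigneeId()))
                .map(tuple -> {
                    tuple.getT1().setAssignee(tuple.getT2());
                    return tuple.getT1();
                });
    }
    return mono;
}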

Reading an item

The controller delegates the search to the service.

The method is quite similar to the one we wrote in the previous version of our application. It takes a version as an optional parameter and ensures that the item has not been updated in the meantime by comparing the provided version with the current one. The difference lies in the loading of the related elements: the method accepts a boolean which states whether the related elements have to be loaded.

Updating an item

To update an item, we have to send the following data.
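It can be sketched as follows (field names are assumptions mirroring NewItemResource, plus the version used for optimistic locking):

{
    description: "Read the article",
    assigneeId: 10,
    tagIds: [1, 2],
    version: 0
}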

The controller finds the item, maps the modifications to the found item and calls the service in order to perform the update.

Updating an item consists of:

  • updating the record in the ITEM table
  • inserting records in the ITEM_TAG table for the tags newly associated with the item
  • deleting records from the ITEM_TAG table for the tags no longer associated with the item

The lack of support for embedded ids forces us to compare the list of currently associated tags with the new list provided by the API consumer, as shown in the sketch below. If embedded ids were supported, the primary key of the ITEM_TAG table would have been composed of the item ID and the tag ID. In this situation, we would have been able to remove all existing associations and add the provided ones without having to compute any delta. As embedded ids are not supported, the ITEM_TAG table has a generated technical key. We could still delete all links and insert new ones, but this would generate new primary keys and, if you have an auditing system, the recorded operations and timestamps would be wrong.
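Here is a sketch of how the delta can be computed (repository and accessor names are the assumptions used earlier; error handling omitted):

@Transactional
public Mono<Item> update(final Item item) {
    return itemRepository.save(item)
            .flatMap(savedItem -> itemTagRepository.findAllByItemId(savedItem.getId())
                    .collectList()
                    .flatMap(existingLinks -> {
                        Set<Long> requestedTagIds = item.getTags().stream()
                                .map(Tag::getId)
                                .collect(Collectors.toSet());

                        // links to remove: stored in ITEM_TAG but absent from the request
                        List<ItemTag> linksToDelete = existingLinks.stream()
                                .filter(link -> !requestedTagIds.contains(link.getTagId()))
                                .collect(Collectors.toList());

                        // links to add: requested but not stored yet
                        Set<Long> existingTagIds = existingLinks.stream()
                                .map(ItemTag::getTagId)
                                .collect(Collectors.toSet());
                        List<ItemTag> linksToInsert = requestedTagIds.stream()
                                .filter(tagId -> !existingTagIds.contains(tagId))
                                .map(tagId -> new ItemTag(savedItem.getId(), tagId))
                                .collect(Collectors.toList());

                        return itemTagRepository.deleteAll(linksToDelete)
                                .then(itemTagRepository.saveAll(linksToInsert).collectList())
                                .thenReturn(savedItem);
                    }));
}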

Patching an item

To patch an item, we use the following contract:
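A sketch of the payload (field names are assumptions; all fields are optional and only the provided ones are applied):

{
    description: "Read the article and clap",
    tagIds: [1]
}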

Patching an item is almost identical to updating an item. The difference lies in how the changes are mapped to the Item entity. When finding the item, the controller must set the loadRelations boolean to true in order to load all the related objects. This is needed because the patch might not contain all the fields of the item.

Deleting an item

To delete an item, we first need to remove the links between the item and the tags. Once done, we can delete the item.

As we are using the zipWith operator, it is really important that the repository method does not return a Mono<Void> but a Mono<Integer>. Spring repositories support both return types; when you specify Mono<Integer>, Spring returns the number of deleted elements. Although this information is not really useful in itself, keep in mind that the result must not be empty, otherwise the subscription will be cancelled and nothing will be deleted.
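A sketch of the delete method, using the deleteAllByItemId query method declared earlier:

@Transactional
public Mono<Void> deleteById(final Long id) {
    return itemRepository.findById(id)
            // Mono<Integer> is never empty, so the zip is not cancelled
            .zipWith(itemTagRepository.deleteAllByItemId(id))
            .map(tuple -> tuple.getT1())
            .flatMap(itemRepository::delete);
}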

Chapter 3: Listen to all data changes in real time

This chapter is not strictly related to Spring Data R2DBC but, as we are developing a collaborative web application, I think it is important to explain how to listen to all data changes with PostgreSQL.

In the previous version of our application, we used MongoDB Change Streams to detect all data changes. With PostgreSQL, we will use a notification system relying on two commands, namely Listen and Notify.

The NOTIFY command sends a notification event together with an optional payload string to each client application that has previously executed LISTEN channel for the specified channel name in the current database.

Notify

To send notifications each time an item is saved, we have to create the following trigger with its associated function.
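Here is a sketch of the function and trigger (object names are assumptions; the real scripts are part of the change log in the repository):

CREATE OR REPLACE FUNCTION notify_item_saved()
    RETURNS trigger AS $$
BEGIN
    -- publish the saved row, serialized to JSON, on the ITEM_SAVED topic
    PERFORM pg_notify('ITEM_SAVED', row_to_json(NEW)::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER item_saved_trigger
    AFTER INSERT OR UPDATE ON item
    FOR EACH ROW
EXECUTE PROCEDURE notify_item_saved();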

The trigger calls the notify_item_saved() function which sends a notification on a topic called ITEM_SAVED. This notification contains the updated item serialized to JSON.

It’s important to know that the payload sent along with the notification must be shorter than 8000 bytes. If you need to send a bigger payload, you should consider saving the record in a dedicated table and sending the key of this record.

Similarly, to notify all deletions, we need to create the following trigger with its associated function.
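A sketch, along the same lines (here the whole deleted row is sent; sending only its id would also work):

CREATE OR REPLACE FUNCTION notify_item_deleted()
    RETURNS trigger AS $$
BEGIN
    -- publish the deleted row on the ITEM_DELETED topic
    PERFORM pg_notify('ITEM_DELETED', row_to_json(OLD)::text);
    RETURN OLD;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER item_deleted_trigger
    AFTER DELETE ON item
    FOR EACH ROW
EXECUTE PROCEDURE notify_item_deleted();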

Let’s test the notifications using PgAdmin.
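For instance, run LISTEN in one query tool session, update a row from another one, and watch the notification arrive in the listening session (pgAdmin shows it in the query tool's Notifications tab). Note that the channel name must be quoted, otherwise PostgreSQL folds it to lower case:

LISTEN "ITEM_SAVED";

UPDATE item SET description = 'Read the article and clap' WHERE id = 1;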

Listen

In order to listen to all data changes, we need to create a NotificationService. This service needs to have a permanent connection to the database. This connection is created using a ConnectionFactory and is closed when the service gets destroyed. The service provides two public methods, one to listen to a topic and one to ‘unlisten’ from a topic. To keep track of the topics, the service holds a set of listened topics.

To listen to a topic, we need to call the listenNotification(NotificationTopic, Class) method. The first argument corresponds to the name of the topic we want to listen to.

The second argument is the class of the payload and it is used to deserialize the payload of the received notifications.
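Here is a sketch of the service, based on the r2dbc-postgresql driver's PostgresqlConnection API and its getNotifications() stream (the cast below assumes the plain PostgreSQL connection factory rather than a connection pool; NotificationTopic is the application's enum of topics):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.PreDestroy;

import org.springframework.stereotype.Service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.r2dbc.postgresql.api.PostgresqlConnection;
import io.r2dbc.spi.ConnectionFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class NotificationService {

    private final PostgresqlConnection connection;   // dedicated, long-lived connection
    private final ObjectMapper objectMapper;
    private final Set<NotificationTopic> listenedTopics = ConcurrentHashMap.newKeySet();

    public NotificationService(ConnectionFactory connectionFactory, ObjectMapper objectMapper) {
        this.connection = Mono.from(connectionFactory.create())
                .cast(PostgresqlConnection.class)
                .block();
        this.objectMapper = objectMapper;
    }

    public <T> Flux<T> listenNotification(NotificationTopic topic, Class<T> clazz) {
        if (listenedTopics.add(topic)) {
            // quote the channel name: LISTEN folds unquoted identifiers to lower case
            connection.createStatement("LISTEN \"" + topic.name() + "\"")
                    .execute()
                    .subscribe();
        }
        return connection.getNotifications()
                .filter(n -> topic.name().equals(n.getName()) && n.getParameter() != null)
                .handle((n, sink) -> {
                    try {
                        // deserialize the JSON payload sent by the trigger
                        sink.next(objectMapper.readValue(n.getParameter(), clazz));
                    } catch (JsonProcessingException e) {
                        sink.error(e);
                    }
                });
    }

    public void unlistenNotification(NotificationTopic topic) {
        if (listenedTopics.remove(topic)) {
            connection.createStatement("UNLISTEN \"" + topic.name() + "\"")
                    .execute()
                    .subscribe();
        }
    }

    @PreDestroy
    public void destroy() {
        connection.close().subscribe();
    }
}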

The controller exposes an endpoint to listen to all changes. Like in our previous application, it returns a Flux<ServerSentEvent<Event>>.
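A sketch of the endpoint (Event being the application's event type and listenToEvents() an assumed service method):

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Event>> getEventStream() {
    return itemService.listenToEvents()
            .map(event -> ServerSentEvent.<Event>builder()
                    .retry(Duration.ofSeconds(5))
                    .event(event.getClass().getSimpleName())  // event type for the client
                    .data(event)
                    .build());
}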

The ItemService is responsible for calling the NotificationService for the ITEM_SAVED and for the ITEM_DELETED topics and for returning the results to the controller.

Our collaborative web application is now working.

The collaborative app with two concurrent users

Conclusion

In this article, we have seen how to build a reactive web application using Spring Data R2DBC and how to listen to all data changes in real time using PostgreSQL.

Implementing the reactive API was not an easy path. Although Spring Data R2DBC supports repositories and transactions, the lack of some important features, such as relationship management, embedded ids and database schema generation, slows down its adoption.

Don’t get me wrong, Spring Data R2DBC is great but, in its current state, it’s maybe not the best fit for your application. Nevertheless, I am convinced that it can be used in some situations. You can, for instance, consider using imperative programming for all operations modifying data and use Spring Data R2DBC to access JSON documents already prepared in advance with all the needed information. Such an approach sounds quite similar to what you would do when applying the CQRS pattern.

Now, if you are interested in using a fully functional ORM, I suggest that you have a look at Hibernate Reactive which is intended to be used with reactive environments like Quarkus and Vert.x. However, be aware that it is still under development.

About me

Technical Lead at Pictet Technologies, I am passionate about software development. I have been building IT solutions for more than 20 years for various industries like finance, air cargo, government, banking, insurance and retail. I enjoy learning new things and my goal is to apply my skills and expertise in a team with the aim of delivering the best IT solutions. My super power? I saved the World in 1999 by contributing to fix the Y2K bug… in Cobol.
