Take Reactive programming with Spring to the infinity and beyond

Fede Lopez
5 min read · Sep 20, 2017

Spring 5 ships a new reactive web framework (Spring WebFlux) that changes the way we develop applications. With this release you can build non-blocking, event-driven services on top of Project Reactor, the open-source reactive library.

In this blog post I am going to show you how to develop a Spring Boot app that serves infinite streams of Bitcoin prices and how to display them on a web browser as they get updated every second.

Building the backend

Go to https://start.spring.io and generate a Gradle project for Kotlin and Spring Boot version 2.0.0 M4. Add the dependency Reactive Web. Name the application web-flux-reactive.

Open the project in your IDE and navigate to the main application class generated by the initializr: WebFluxReactiveApplication.

Turn the application into a RESTful service by adding the @RestController annotation:

@SpringBootApplication
@RestController
class WebFluxReactiveApplication
...

Create a data class representing a Bitcoin price. This class holds two properties: the update time and the rate (the currency does not matter for this example). Instances of this class will be sent to the web client:

data class BitCoinPrice(val updated: Date, val rate: BigDecimal)

Create a function annotated with @GetMapping that returns a Flux stream of Bitcoin prices. Leave the body of the function blank for the moment:

@CrossOrigin
@GetMapping(value = "/bitcoinprices", produces = arrayOf(MediaType.TEXT_EVENT_STREAM_VALUE))
fun getBitcoinPrices(): Flux<BitCoinPrice> {
    // body added in the next steps
}

Notice the following:

  • The GET endpoint produces a response of MIME type text/event-stream. This is required when you want to send server-sent events to the client.
  • The body of the response is a Flux<BitCoinPrice>. Flux is a Reactive Streams publisher that emits 0 to N elements and then completes. In our case the stream never completes: it keeps emitting events indefinitely.
  • We have enabled cross-origin requests on the endpoint so that our web app can call the service directly from a different origin.

Now let’s implement the body of the function getBitcoinPrices. We want to emit a Bitcoin price every second. To do that, we will create two Flux objects and combine them.

The first one is an interval that emits an event every second:

val interval = Flux.interval(Duration.ofSeconds(1))

The second one creates a stream of random Bitcoin prices:

val bitCoinPrice = Flux.fromStream<BitCoinPrice>(Stream.generate({
    BitCoinPrice(Date(), BigDecimal.valueOf(Math.abs(Random().nextLong())))
}))

Now we zip them together so that we can generate a random Bitcoin price every second:

val bitCoinEvents = Flux.zip(interval, bitCoinPrice).map { it.t2 }
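To make the zip step less abstract, here is a rough plain-JavaScript sketch of the same idea using generators. It is not Reactor's API and it has no timing: in the Kotlin code the interval paces the output, while here items are simply pulled on demand. The names ticks, prices, zip and take are made up for this illustration.

```javascript
// Plain-JavaScript illustration of zip semantics (not Reactor's API).

// Infinite source of tick numbers, standing in for Flux.interval.
function* ticks() {
  let n = 0;
  while (true) yield n++;
}

// Infinite source of random prices, standing in for the generated stream.
function* prices() {
  while (true) {
    yield { updated: Date.now(), rate: Math.floor(Math.random() * 10000) };
  }
}

// Pair up the next item of each source; stops when either source ends.
function* zip(a, b) {
  const itA = a[Symbol.iterator]();
  const itB = b[Symbol.iterator]();
  while (true) {
    const x = itA.next();
    const y = itB.next();
    if (x.done || y.done) return;
    yield [x.value, y.value];
  }
}

// Collect the first `count` items of a (possibly infinite) iterable.
function take(iterable, count) {
  const out = [];
  if (count <= 0) return out;
  for (const item of iterable) {
    out.push(item);
    if (out.length >= count) break;
  }
  return out;
}

// Counterpart of `.map { it.t2 }`: keep only the price from each pair.
const firstThree = take(zip(ticks(), prices()), 3).map(([, price]) => price);
console.log(firstThree.length);
```

Each pair holds one tick and one price, and discarding the tick leaves exactly one price per tick. That is why zipping with a one-second interval yields one price per second in the Reactor version.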

The resulting REST endpoint looks like this:

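import java.math.BigDecimal
import java.time.Duration
import java.util.Date
import java.util.Random
import java.util.stream.Stream
import org.springframework.http.MediaType
import org.springframework.web.bind.annotation.CrossOrigin
import org.springframework.web.bind.annotation.GetMapping
import reactor.core.publisher.Flux
// Inside the WebFluxReactiveApplication class:
@CrossOrigin
@GetMapping(value = "/bitcoinprices", produces = arrayOf(MediaType.TEXT_EVENT_STREAM_VALUE))
fun getBitcoinPrices(): Flux<BitCoinPrice> {
    val interval = Flux.interval(Duration.ofSeconds(1))
    val bitCoinPrice = Flux.fromStream<BitCoinPrice>(Stream.generate({
        BitCoinPrice(Date(), BigDecimal.valueOf(Math.abs(Random().nextLong())))
    }))
    val bitCoinEvents = Flux.zip(interval, bitCoinPrice).map { it.t2 }
    return bitCoinEvents
}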

To test that this works, start the app:

./gradlew bootRun

Now call the bitcoinprices GET endpoint from the command line using curl:

curl http://localhost:8080/bitcoinprices -H "Accept:text/event-stream"

You should see a response that produces random Bitcoin prices every second:

data:{"updated":1505872373787,"rate":4031582811327568277}

data:{"updated":1505872374793,"rate":6041308289908491586}

data:{"updated":1505872375791,"rate":3622194130843937061}

data:{"updated":1505872376789,"rate":8362185520184618113}
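In the text/event-stream format, each event is a group of lines terminated by a blank line, and the payload travels in lines that start with data:. As a rough sketch of how a client could split such a response by hand, here is a small parser. It only handles data: lines (a real client such as the browser's EventSource also handles fields like event, id and retry), and the function name parseEventStream and the sample rates are made up for this illustration.

```javascript
// Minimal parser for the subset of text/event-stream used in this post:
// events separated by blank lines, payload carried in "data:" lines.
function parseEventStream(text) {
  return text
    .split(/\r?\n\r?\n/) // a blank line ends an event
    .map(function (block) {
      return block
        .split(/\r?\n/)
        .filter(function (line) { return line.startsWith("data:"); })
        // drop the "data:" prefix and one optional leading space
        .map(function (line) { return line.slice(5).replace(/^ /, ""); })
        .join("\n"); // several data lines in one event are joined with "\n"
    })
    .filter(function (data) { return data.length > 0; });
}

// Two events in the same shape the endpoint produces (rates shortened).
const sample =
  'data:{"updated":1505872373787,"rate":42}\n\n' +
  'data:{"updated":1505872374793,"rate":43}\n\n';

const events = parseEventStream(sample).map(function (d) {
  return JSON.parse(d);
});
console.log(events.length, events[1].rate);
```

In the browser you do not need any of this, because EventSource does the framing for you and hands each payload to your listener as e.data.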

Building the frontend

Consuming server-sent events from a web app is fairly straightforward. For this example we are going to build a very simple web page that uses an EventSource object to receive the Bitcoin prices. This class allows us to connect to the server over HTTP and receive events in text/event-stream format, which is the MIME type that our endpoint uses.

One interesting feature of EventSource is that it keeps the connection open and reconnects automatically if the connection to the server drops.
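If you want to observe that reconnection behavior, one option is to listen for the error event and look at the readyState property, which the EventSource interface defines as 0 (connecting), 1 (open) or 2 (closed). The sketch below is only an illustration: describeState, watchConnection and the report callback are hypothetical names, and a fake object stands in for a real EventSource so the snippet also runs outside a browser.

```javascript
// readyState values defined by the EventSource interface.
const CONNECTING = 0;
const OPEN = 1;
const CLOSED = 2;

// Map a readyState value to a human-readable status.
function describeState(readyState) {
  switch (readyState) {
    case CONNECTING: return "connecting";
    case OPEN: return "open";
    case CLOSED: return "closed";
    default: return "unknown";
  }
}

// Report the connection status whenever the source signals an error.
// `source` needs addEventListener and readyState; `report` is any callback.
function watchConnection(source, report) {
  source.addEventListener("error", function () {
    report(describeState(source.readyState));
  });
}

// Stand-in for a real EventSource that has just lost its connection.
const fakeSource = {
  readyState: CONNECTING,
  listeners: {},
  addEventListener: function (type, fn) { this.listeners[type] = fn; }
};

const messages = [];
watchConnection(fakeSource, function (status) { messages.push(status); });
fakeSource.listeners.error(); // simulate the browser firing "error"
console.log(messages[0]);
```

With a real EventSource, a status of "connecting" inside the error handler means the browser is retrying, while "closed" means it has given up or close() was called.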

Now create a blank index.html page anywhere on your file system and have it load jQuery from a CDN:

<!DOCTYPE html>
<html>
<head>
    <title>Event Source Client</title>
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.10.2/jquery.min.js"></script>
    <script src="app.js"></script>
</head>
<body>
</body>
</html>

Add the following div to the body tag:

...
<body>
    <div>
        <h1>Real time Bitcoin prices</h1>
        <p id="updated">Updated: </p>
        <p id="rate">Rate: </p>
        <button id="start">Start</button>
        <button id="cancel">Cancel</button>
    </div>
</body>
...

This div has four elements that are worth mentioning:

  • a paragraph that will show the update time of the Bitcoin price
  • another paragraph that will show the Bitcoin rate
  • a start button to instruct when to start receiving Bitcoin prices
  • a cancel button to stop receiving Bitcoin prices

Now create a file named app.js in the same folder as the index.html. This file will be used to add the logic needed to update the web page index.html.

Add a ready callback so that we can safely manipulate the page using JavaScript:

$(document).ready(function () {

});

Add a function named createEventSource inside the callback. This function returns an EventSource object that establishes a connection with our RESTful endpoint:

function createEventSource() {
    var eventSource = new EventSource("http://localhost:8080/bitcoinprices");

    return eventSource;
}

Now attach a callback to the EventSource object before returning it so that whenever a message is sent by the server we update the HTML components with the latest information:

eventSource.addEventListener('message', function (e) {
    const body = JSON.parse(e.data);
    $("#updated").text("Updated: " + body.updated);
    $("#rate").text("Rate: " + body.rate);
}, false);
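The updated field arrives as milliseconds since the Unix epoch, so the page shows a raw number. If you prefer a readable timestamp, a small helper like the one below could do the conversion before the text is written to the page. This is only a sketch: formatPrice is a hypothetical name and the sample rate is shortened for readability.

```javascript
// Turn one event payload (the string in e.data) into the two display strings.
function formatPrice(data) {
  const body = JSON.parse(data);
  return {
    // epoch milliseconds -> ISO 8601 timestamp in UTC
    updated: "Updated: " + new Date(body.updated).toISOString(),
    rate: "Rate: " + body.rate
  };
}

const view = formatPrice('{"updated":1505872373787,"rate":4031}');
console.log(view.updated);
console.log(view.rate);

// Rates as large as the ones our endpoint generates do not fit in a
// JavaScript number without rounding.
const hugeRate = JSON.parse("4031582811327568277");
console.log(Number.isSafeInteger(hugeRate));
```

The last lines point at a limitation worth knowing about: the random rates generated by the backend are larger than Number.MAX_SAFE_INTEGER, so JSON.parse rounds them. That is harmless for this demo, but a real service would normally send such values as strings or keep them within a safe range.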

Create an EventSource variable inside the ready callback:

var eventSource;

Now add a jQuery click handler to the start button. It closes the previous connection (if any) and establishes a new one to receive Bitcoin prices:

$("#start").click(function() {
    if (eventSource) {
        eventSource.close();
    }
    eventSource = createEventSource();
});

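Similarly, we want to stop listening for server-sent Bitcoin prices whenever the user clicks the cancel button. The check guards against the user clicking Cancel before Start, when there is no connection to close yet: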

$("#cancel").click(function() {
    if (eventSource) {
        eventSource.close();
    }
});
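If you would rather keep the start and cancel logic in one place, one possible arrangement is a small controller object that owns the current connection. The sketch below is an assumption-laden illustration, not part of the original app: createController and isActive are hypothetical names, and connect stands for any factory that returns an object with a close() method, such as createEventSource. A fake factory is used here so the snippet runs without a browser.

```javascript
// Owns at most one open connection at a time.
// `connect` is a factory returning an object with close().
function createController(connect) {
  let current = null;
  return {
    start: function () {
      if (current) current.close(); // drop any previous connection
      current = connect();
    },
    cancel: function () {
      if (current) { // safe even if start was never called
        current.close();
        current = null;
      }
    },
    isActive: function () { return current !== null; }
  };
}

// Fake connection factory that just counts opens and closes.
let opened = 0;
let closed = 0;
const controller = createController(function () {
  opened++;
  return { close: function () { closed++; } };
});

controller.cancel(); // nothing to close yet, so this is a no-op
controller.start();
controller.start();  // closes the first connection, opens a second
controller.cancel();
console.log(opened, closed, controller.isActive());
```

In the page, the two click handlers would then simply call controller.start() and controller.cancel().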

The final app.js should look like this:

$(document).ready(function () {

    function createEventSource() {
        var eventSource = new EventSource("http://localhost:8080/bitcoinprices");
        eventSource.addEventListener('message', function (e) {
            const body = JSON.parse(e.data);
            $("#updated").text("Updated: " + body.updated);
            $("#rate").text("Rate: " + body.rate);
        }, false);
        return eventSource;
    }

    var eventSource;

    $("#start").click(function() {
        if (eventSource) {
            eventSource.close();
        }
        eventSource = createEventSource();
    });

    $("#cancel").click(function() {
        if (eventSource) {
            eventSource.close();
        }
    });
});

Testing the client

Now that we have an index.html and an app.js ready to be served, let’s spin up a local server so that the web app can be accessed from your browser.

Install local-web-server using npm:

npm install -g local-web-server

Go to the folder that contains the index.html and app.js files.

Run the following command:

ws --spa index.html

Open your browser and navigate to http://127.0.0.1:8000.

Click the Start button and you should see the Bitcoin price update every second.

Takeaways

Today we have learned that:

  • Spring 5 gives you the tools to create asynchronous, non-blocking RESTful endpoints that can serve streams of data over HTTP connections.
  • Your web client can easily consume these streams of data and be notified whenever there are new updates.
  • All this can be achieved with just a few lines of code thanks to the operators provided by Project Reactor and available in Spring 5.

Hopefully this will help you embark on the reactive journey and take your apps to infinity and beyond!
