SSE (Server-sent events) in Spring Boot

Sergey Savrukhin
5 min read · Dec 25, 2022

Prerequisites

Java 8+

Spring Boot 2

Spring Web MVC

Overview

Hello everyone. Today I want to tell you how to implement SSE (Server Sent Events) in Java.

What are Server-Sent Events?

SSE is a technology that lets the server push data to the client in one direction over a single long-lived HTTP connection. Let’s create a Spring Boot application using Spring Initializr.
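To make "data over a single HTTP connection" a little more concrete, here is a minimal sketch of what one event looks like on the wire. The response has the content type text/event-stream, and each event is a few "field: value" lines terminated by an empty line. The class SseFrame and its format method are hypothetical names used only for illustration; in the rest of the article Spring does this formatting for us.

```java
// Hypothetical helper, not part of Spring: renders one event in the SSE text format.
final class SseFrame {
    private SseFrame() {
    }

    /** Builds one event: an optional "event:" line, one "data:" line per payload line, then a blank line. */
    static String format(String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null && !eventName.isEmpty()) {
            sb.append("event: ").append(eventName).append('\n');
        }
        // A payload that contains line breaks has to be split into several data: lines.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // The empty line marks the end of the event.
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(format(null, "{\"price\":0.42}"));
    }
}
```

The space after the colon is optional in the format, so a server may also write data:{"price":0.42} and clients treat both forms the same way.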

Go to https://start.spring.io/

Select the project parameters:

Project Type: Maven

Language: Java

Spring Boot version: the latest stable version (3.0.1 at the time of writing). Note that Spring Boot 3 requires Java 17 or later; if you are on Java 8, choose a 2.x version instead.

Select the Spring Web dependency.

Click Generate. Save the generated project template and open it in your IDE. In my case, it is IntelliJ IDEA.

Let’s imagine that we have a system that displays stock quotes. When the quotes change, we should send the updated information to clients. Let’s create a simple class that represents a stock. It contains only one field, the stock price.

import java.math.BigDecimal;

public class Stock {
    private final BigDecimal price;

    Stock(BigDecimal price) {
        this.price = price;
    }

    public BigDecimal getPrice() {
        return price;
    }
}

Now let’s create a class that will simulate a change in the price of a stock.

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

@Component
public class StockPriceChanger {
    private final ApplicationEventPublisher publisher; //1.1

    private final Random random = new Random(); //1.2

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); //1.3

    public StockPriceChanger(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
        // Schedule the first price change one second after the bean is created.
        this.executor.schedule(this::changePrice, 1, SECONDS); //1.4
    }

    private void changePrice() {
        // nextGaussian() is centered on 0, so the simulated price can be negative.
        BigDecimal price = BigDecimal.valueOf(random.nextGaussian()); //1.5
        publisher.publishEvent(new Stock(price)); //1.6
        // Reschedule itself with a random delay of 0 to 9999 ms.
        executor.schedule(this::changePrice, random.nextInt(10000), MILLISECONDS); //1.7
    }
}

StockPriceChanger depends on ApplicationEventPublisher, which is injected through the constructor (1.1). An instance of Random generates the random share price (1.2).

The price changes are generated in a separate thread at random intervals. To do this, we create a single-threaded executor that can schedule commands to run after a given delay or to execute periodically (1.3).

In the constructor, we schedule the first price change to run after one second (1.4). In the changePrice method, we get a random price value (1.5), publish a Stock event for all subscribers (1.6), and schedule the next change with a random delay of up to ten seconds (1.7).

As you may have noticed, the class depends on ApplicationEventPublisher. Out of the box, Spring provides a simple event-handling mechanism that reduces coupling between system components. An event published in one part of the application can be received and handled in any other part, thanks to publishers and event listeners.
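The "task that schedules its own next run" pattern can be tried in isolation, without Spring. The following is only a sketch under a few assumptions: the class SelfReschedulingTask, its fixed number of runs and the awaitCompletion helper are hypothetical and exist just to make the example finite. Unlike the bean above, it also shuts the executor down when it is done.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone demo of a task that reschedules itself with a random delay.
final class SelfReschedulingTask {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final CountDownLatch remainingRuns;
    private final int maxDelayMillis; // must be positive

    SelfReschedulingTask(int runs, int maxDelayMillis) {
        this.remainingRuns = new CountDownLatch(runs);
        this.maxDelayMillis = maxDelayMillis;
    }

    void start() {
        executor.schedule(this::tick, 0, TimeUnit.MILLISECONDS);
    }

    private void tick() {
        remainingRuns.countDown();
        if (remainingRuns.getCount() > 0) {
            // Same idea as step 1.7: pick a random delay and schedule the next run.
            int delay = ThreadLocalRandom.current().nextInt(maxDelayMillis);
            executor.schedule(this::tick, delay, TimeUnit.MILLISECONDS);
        }
    }

    /** Waits until all runs have happened, then stops the executor; returns false on timeout or interrupt. */
    boolean awaitCompletion(long timeoutMillis) {
        try {
            return remainingRuns.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        SelfReschedulingTask task = new SelfReschedulingTask(5, 200);
        task.start();
        System.out.println("all runs finished: " + task.awaitCompletion(5000));
    }
}
```

Rescheduling from inside the task, rather than using scheduleAtFixedRate, is what allows a different delay before every run.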
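The idea behind the publisher and listener pair can be illustrated with a few lines of plain Java. This is not how Spring implements it; TinyEventBus and its methods are hypothetical names, and the sketch only shows the principle that the publisher knows nothing about who handles the event and that listeners are selected by the event type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical miniature event bus; it only illustrates dispatch by event type.
final class TinyEventBus {

    private static final class Subscription<T> {
        private final Class<T> type;
        private final Consumer<? super T> handler;

        Subscription(Class<T> type, Consumer<? super T> handler) {
            this.type = type;
            this.handler = handler;
        }

        void deliverIfMatches(Object event) {
            if (type.isInstance(event)) {
                handler.accept(type.cast(event));
            }
        }
    }

    private final List<Subscription<?>> subscriptions = new CopyOnWriteArrayList<>();

    /** Registers a handler that receives every published event of the given type. */
    <T> void subscribe(Class<T> type, Consumer<? super T> handler) {
        subscriptions.add(new Subscription<>(type, handler));
    }

    /** Delivers the event synchronously to all handlers whose type matches. */
    void publish(Object event) {
        for (Subscription<?> subscription : subscriptions) {
            subscription.deliverIfMatches(event);
        }
    }

    public static void main(String[] args) {
        TinyEventBus bus = new TinyEventBus();
        List<String> received = new ArrayList<>();
        bus.subscribe(String.class, received::add);
        bus.publish("price changed");
        bus.publish(42); // no Integer handler is registered, so this is ignored
        System.out.println(received);
    }
}
```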

Next, create a controller to handle requests from clients.

import org.springframework.context.event.EventListener;
import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

@RestController
public class StockController {
    private final Set<SseEmitter> clients = new CopyOnWriteArraySet<>(); //2.1

    @GetMapping("/stocks-stream") //2.2
    public SseEmitter stocksStream() {
        SseEmitter sseEmitter = new SseEmitter();
        clients.add(sseEmitter);

        sseEmitter.onTimeout(() -> clients.remove(sseEmitter));
        sseEmitter.onError(throwable -> clients.remove(sseEmitter));

        return sseEmitter;
    }

    @Async
    @EventListener
    public void stockMessageHandler(Stock stock) { //2.3
        List<SseEmitter> errorEmitters = new ArrayList<>();

        clients.forEach(emitter -> {
            try {
                emitter.send(stock, MediaType.APPLICATION_JSON); //2.4
            } catch (Exception e) {
                errorEmitters.add(emitter);
            }
        });

        errorEmitters.forEach(clients::remove); //2.5
    }
}

Spring 4.2 introduced a new class, ResponseBodyEmitter, which is used as a return type in Spring Web MVC controllers for asynchronous requests. ResponseBodyEmitter can be used to send multiple objects where each object is written with a compatible HttpMessageConverter.
SseEmitter extends ResponseBodyEmitter and allows you to send many messages in response to a single request, as required by the SSE protocol.

Let’s take a closer look at the controller:

First, we create a set in which we store the connected clients (2.1). The /stocks-stream endpoint registers a client to receive notifications of price changes (2.2). In this method, we create a new SseEmitter and save it so that we can send notifications to it later. We also register two handlers, onTimeout and onError: when an error or timeout occurs, we remove the client from the set of registered clients.

Next comes the message handler (2.3). This method has two annotations, @Async and @EventListener. The @EventListener annotation marks the method as an event handler. The type of events it handles is determined by the type of its argument, in this case Stock. By default, a listener is invoked synchronously in the publisher’s thread, but we can easily make it asynchronous by adding the @Async annotation. Annotating a bean method with @Async makes it execute in a separate thread, so the caller does not wait for the called method to complete. Our method takes a new event with the share price and asynchronously sends it to all clients in JSON format (2.4). If an error occurs during sending, we remember the failed emitter and afterwards remove it from the set of active clients (2.5).
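The "send to everyone, then drop the clients that failed" logic from steps 2.4 and 2.5 can be sketched and tested without a web server. In the sketch below, Broadcaster and its nested Client interface are hypothetical stand-ins; Client plays the role of SseEmitter and is assumed to throw when the connection is gone.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical stand-alone version of the broadcast loop; Client stands in for SseEmitter.
final class Broadcaster<T> {

    /** Anything that can receive a message, or fail while doing so. */
    interface Client<T> {
        void send(T message) throws IOException;
    }

    private final Set<Client<T>> clients = new CopyOnWriteArraySet<>();

    void register(Client<T> client) {
        clients.add(client);
    }

    int size() {
        return clients.size();
    }

    /** Sends the message to every client and returns how many clients were dropped. */
    int broadcast(T message) {
        List<Client<T>> failed = new ArrayList<>();
        for (Client<T> client : clients) {
            try {
                client.send(message);
            } catch (IOException | RuntimeException e) {
                // Remember the broken client; it is removed after the loop.
                failed.add(client);
            }
        }
        clients.removeAll(failed);
        return failed.size();
    }
}
```

A CopyOnWriteArraySet iterates over a snapshot, so a new client can register while a broadcast is running. Failed clients are collected first and removed in one step afterwards, which mirrors the errorEmitters list in the controller.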

For Spring to recognize methods marked with the @Async annotation and run them in a background thread pool, the @EnableAsync annotation must be added to a configuration class (3.1).

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@EnableAsync //3.1
@SpringBootApplication
public class SsewebmvcApplication {

    public static void main(String[] args) {
        SpringApplication.run(SsewebmvcApplication.class, args);
    }
}

Now let’s create a simple user interface to see the result. Add an index.html file to the project under src/main/resources/static and put the following code in it to display changes in stock quotes:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE example</title>
</head>
<body>
<ul id="stockChanges"></ul>
<script type="application/javascript">
    function add(message) {
        const li = document.createElement("li");
        li.innerHTML = message;
        document.getElementById("stockChanges").appendChild(li);
    }

    const eventSource = new EventSource("/stocks-stream"); //4.1

    eventSource.onmessage = e => {
        const response = JSON.parse(e.data);

        add('Stock price was changed, new price: ' + response.price + ' $'); //4.2
    }
    eventSource.onopen = e => add('Connection opened');
    eventSource.onerror = e => add('Connection closed');
</script>
</body>
</html>

To start receiving data, we create a new EventSource("/stocks-stream") (4.1). The browser opens a connection to /stocks-stream and keeps it open while waiting for events.

By default the EventSource object generates 3 events:

  • message — message received, available as e.data.
  • open — connection is opened.
  • error — connection failed, e.g. server returned status 500.

In the message event handler, we read the received stock quote and create a new list item (4.2).
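To show roughly what EventSource does before it hands e.data to the message handler, here is a simplified parser written in Java. It is a sketch only: SseDataParser is a hypothetical name, it works on a complete string instead of a live connection, and it ignores every field except data.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified reader for the SSE text format; only "data:" fields are handled.
final class SseDataParser {
    private SseDataParser() {
    }

    /** Returns the data payload of every complete event found in the given stream text. */
    static List<String> parse(String stream) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        try (BufferedReader reader = new BufferedReader(new StringReader(stream))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    // An empty line ends the current event.
                    if (hasData) {
                        events.add(data.toString());
                    }
                    data.setLength(0);
                    hasData = false;
                } else if (line.startsWith("data:")) {
                    String value = line.substring("data:".length());
                    if (value.startsWith(" ")) {
                        value = value.substring(1); // one optional space after the colon
                    }
                    if (hasData) {
                        data.append('\n'); // several data: lines form one multi-line payload
                    }
                    data.append(value);
                    hasData = true;
                }
                // Other fields (event, id, retry) and comment lines are ignored in this sketch.
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return events;
    }
}
```

An event that is not yet terminated by an empty line is not delivered, which is why the server has to finish every message with a blank line.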

Finally, let’s run our application and see what happens. After launching the application, open http://localhost:8080. The page shows "Connection opened", followed by a list that gains a new line each time the stock price changes.

Thank you for your attention.
You can get the full example code on GitHub.
