Photo by Vincent Guth on Unsplash

Server-Sent Events (SSE) in Spring 5 with Web MVC and Web Flux

Introduction

There are no simple, general-purpose methods to implement asynchronous server-to-client communication in web applications with acceptable performance.

Overview

There are several technologies that allow a client to receive messages about asynchronous updates from a server. They can be divided into two categories: client pull and server push.

Client pull

In client pull technologies, a client periodically requests a server for updates. The server can respond with updates or with a special response that it has not yet been updated. There are two types of client pull: short polling and long polling.

Short polling

A client periodically sends requests to a server. If the server has updates, it sends a response to the client and closes the connection. If the server has no updates, it sends a special response to the client and also closes the connection.

Long polling

A client sends a request to a server. If the server has updates, it sends a response to the client and closes the connection. If the server has no updates, it holds the connection until updates become available. When updates are available, the server sends a response to the client and closes the connection. If updates are not available for some timeout, the server sends a special response to the client and also closes the connection.

Server push

In server push technologies, a server proactively sends messages to clients immediately after they are available. Among others, there are two types of server push: Server-Sent Events and WebSocket.

Server-Sent Events

Server-Sent Events is a technology to send text messages only from a server to clients in browser-based web applications. Server-Sent Events is based on persistent connections in the HTTP protocol. Server-Sent Events has the network protocol and the EventSource client interface standardized by W3C as part of HTML5 standards suite.

WebSocket

WebSocket is a technology to implement simultaneous, bi-directional, real-time communication in web applications. WebSocket is based on a protocol other than HTTP, so it can require additional setup of network infrastructure (proxy servers, NATs, firewalls, etc). However, WebSocket can provide performance that is difficult to achieve with HTTP-based technologies.

SSE network protocol

To subscribe to server events, a client should make a GET request with the headers:

  • Cache-Control: no-cache disables any events caching
  • Connection: keep-alive indicates that a persistent connection is being used
GET /sse HTTP/1.1
Accept: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
  • Transfer-Encoding: chunked indicates that the server streams dynamically generated content and therefore the content size is not known in advance
HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
data: The first event.data: The second event.
data: The third
data: event.
id: 1
data: The first event.
id: 2
data: The second event.
event: type1
data: An event of type1.
event: type2
data: An event of type2.
data: An event without any type.
retry: 1000
: ping

SSE client: EventSource interface

To open a connection, it should be created an EventSource object.

var eventSource = new EventSource('/sse);
var eventSource = new EventSource('/sse?event=type1); 
...
eventSource.close();
eventSource = new EventSource('/sse?event=type1&event=type2);
...
eventSource.close();
  • EventSource.OPEN = 1 - the client has an open connection and is handling events as it receives them
  • EventSource.CLOSED = 2- the connection is not open, and the client is not trying to reconnect either there was a fatal error or the close() method was called
eventSource.onopen = function () {
console.log('connection is established');
};
eventSource.onerror = function (event) {
console.log('connection state: ' + eventSource.readyState + ', error: ' + event);
};
eventSource.onmessage = function (event) {
console.log('id: ' + event.lastEventId + ', data: ' + event.data);
};
eventSource.addEventListener('type1', function (event) {
console.log('id: ' + event.lastEventId + ', data: ' + event.data);
}, false);

SSE Java server: Spring Web MVC

Introduction

Spring Web MVC framework 5.2.0 is based on Servlet 3.1 API and uses thread pools to implement asynchronous Java web applications. Such applications can be run on Servlet 3.1+ containers such as Tomcat 8.5 and Jetty 9.3.

Overview

To implement sending events with Spring Web MVC framework:

  1. to finish sending events exceptionally, call the SseEmitter.completeWithError() method
@RestController
public class SseWebMvcController
private SseEmitter emitter; @GetMapping(path="/sse", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter createConnection() {
emitter = new SseEmitter();
return emitter;
}
// in another thread
void sendEvents() {
try {
emitter.send("Alpha");
emitter.send("Omega");
emitter.complete();
} catch(Exception e) {
emitter.completeWithError(e);
}
}
}
class SseEmitters {    private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();    SseEmitter add(SseEmitter emitter) {
this.emitters.add(emitter);
emitter.onCompletion(() -> {
this.emitters.remove(emitter);
});
emitter.onTimeout(() -> {
emitter.complete();
this.emitters.remove(emitter);
});
return emitter;
}
void send(Object obj) {
List<SseEmitter> failedEmitters = new ArrayList<>();
this.emitters.forEach(emitter -> {
try {
emitter.send(obj);
} catch (Exception e) {
emitter.completeWithError(e);
failedEmitters.add(emitter);
}
});
this.emitters.removeAll(failedEmitters);
}
}

Handling short-lasting periodic events stream

In this example, a server sends a short-lasting periodic events stream — a finite stream of words (The quick brown fox jumps over the lazy dog pangram) every second, until the words are finished.

@Controller
@RequestMapping("/sse/mvc")
public class WordsController {
private static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" "); private final ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); @GetMapping(path = "/words", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter getWords() {
SseEmitter emitter = new SseEmitter();
cachedThreadPool.execute(() -> {
try {
for (int i = 0; i < WORDS.length; i++) {
emitter.send(WORDS[i]);
TimeUnit.SECONDS.sleep(1);
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
}
curl -v http://localhost:8080/sse/mvc/words
http://localhost:8080/sse/mvc/words
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Server-Sent Events client example with EventSource</title>
</head>
<body>
<script>
if (window.EventSource == null) {
alert('The browser does not support Server-Sent Events');
} else {
var eventSource = new EventSource('/sse/mvc/words');
eventSource.onopen = function () {
console.log('connection is established');
};
eventSource.onerror = function (error) {
console.log('connection state: ' + eventSource.readyState + ', error: ' + event);
};
eventSource.onmessage = function (event) {
console.log('id: ' + event.lastEventId + ', data: ' + event.data);
if (event.data.endsWith('.')) {
eventSource.close();
console.log('connection is closed');
}
};
}
</script>
</body>
</html>

Handling long-lasting periodic events

In this example, a server sends long-lasting periodic events stream — a potentially infinite stream of server performance information every second:

  • total swap space size
  • free swap space size
  • total physical memory size
  • free physical memory size
  • system CPU load
  • process CPU load
@RestController
@RequestMapping("/sse/mvc")
public class PerformanceController {
private final PerformanceService performanceService; PerformanceController(PerformanceService performanceService) {
this.performanceService = performanceService;
}
private final AtomicInteger id = new AtomicInteger(); private final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1); private final SseEmitters emitters = new SseEmitters(); @PostConstruct
void init() {
scheduledThreadPool.scheduleAtFixedRate(() -> {
emitters.send(performanceService.getPerformance());
}, 0, 1, TimeUnit.SECONDS);
}
@GetMapping(path = "/performance", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter getPerformance() {
return emitters.add();
}
}

Handling aperiodic events

In this example, a server sends aperiodic events stream about changes of files (create, modify, delete) in a folder being watched. As the folder is used the current user’s home folder available by the System.getProperty("user.home") property.

@RestController
@RequestMapping("/sse/mvc")
public class FolderWatchController implements ApplicationListener<FolderChangeEvent> {
private final FolderWatchService folderWatchService; FolderWatchController(FolderWatchService folderWatchService) {
this.folderWatchService = folderWatchService;
}
private final SseEmitters emitters = new SseEmitters(); @PostConstruct
void init() {
folderWatchService.start(System.getProperty("user.home"));
}
@GetMapping(path = "/folder-watch", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter getFolderWatch() {
return emitters.add(new SseEmitter());
}
@Override
public void onApplicationEvent(FolderChangeEvent event) {
emitters.send(event.getEvent());
}
}

SSE Java server: Spring Web Flux

Introduction

Spring Web Flux framework 5.2.0 is based on Reactive Streams API and uses the event-loop computing model to implement asynchronous Java web applications. Such applications can be run on non-blocking web servers such as Netty 4.1 and Undertow 1.4 and on Servlet 3.1+ containers such as Tomcat 8.5 and Jetty 9.3.

Overview

To implement sending events with Spring Web Flux framework:

@RestController
public class ExampleController
@GetMapping(path="/sse", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> createConnectionAndSendEvents() {
return Flux.just("Alpha", "Omega");
}
}

Handling short-lasting periodic events stream

In this example, a server sends a short-lasting periodic events stream — a finite stream of words (The quick brown fox jumps over the lazy dog pangram) every second, until the words are finished.

  • create a Flux that emits incrementing long values every second Flux.interval(Duration.ofSeconds(1)) of type Flux<Long>
  • combine them together by zip method to type Flux<Tuple2<String,Long>>
  • extract the first element of the tuple by map(Tuple2::getT1) of type Flux<String>
@RestController
@RequestMapping("/sse/flux")
public class WordsController {
private static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" "); @GetMapping(path = "/words", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<String> getWords() {
return Flux
.zip(Flux.just(WORDS), Flux.interval(Duration.ofSeconds(1)))
.map(Tuple2::getT1);
}
}

Handling long-lasting periodic events

In this example, a server sends long-lasting periodic events stream — a potentially infinite stream of server performance information every second.

  • convert it by map(sequence -> performanceService.getPerformance()) method to type Flux<Performance>
@RestController
@RequestMapping("/sse/flux")
public class PerformanceController {
private final PerformanceService performanceService; PerformanceController(PerformanceService performanceService) {
this.performanceService = performanceService;
}
@GetMapping(path = "/performance", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Performance> getPerformance() {
return Flux
.interval(Duration.ofSeconds(1))
.map(sequence -> performanceService.getPerformance());
}
}

Handling aperiodic events

In this example, a server sends aperiodic events stream about changes of files (create, modify, delete) in a folder being watched. As the folder is used the current user’s home folder available by the System.getProperty("user.home") property.

@RestController
@RequestMapping("/sse/flux")
public class FolderWatchController implements ApplicationListener<FolderChangeEvent> {
private final FolderWatchService folderWatchService; FolderWatchController(FolderWatchService folderWatchService) {
this.folderWatchService = folderWatchService;
}
private final SubscribableChannel subscribableChannel = MessageChannels.publishSubscribe().get(); @PostConstruct
void init() {
folderWatchService.start(System.getProperty("user.home"));
}
@GetMapping(path = "/folder-watch", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<FolderChangeEvent.Event> getFolderWatch() {
return Flux.create(sink -> {
MessageHandler handler = message -> sink.next(FolderChangeEvent.class.cast(message.getPayload()).getEvent());
sink.onCancel(() -> subscribableChannel.unsubscribe(handler));
subscribableChannel.subscribe(handler);
}, FluxSink.OverflowStrategy.LATEST);
}
@Override
public void onApplicationEvent(FolderChangeEvent event) {
subscribableChannel.send(new GenericMessage<>(event));
}
}

SSE limitations

There are limitations of SSE by design:

  • it’s possible to send only text messages; despite it’s possible to use Base64 encoding and gzip compression to send binary messages, it can be inefficient.
  • many browsers allow opening a very limited number of SSE connections (up to 6 connections per browser for Chrome, Firefox)

Conclusion

Complete code examples are available in the GitHub repository.

Senior Software Engineer

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store