Image for post
Image for post
Photo by Vincent Guth on Unsplash

Server-Sent Events (SSE) in Spring 5 with Web MVC and Web Flux

Introduction

Overview

Client pull

Short polling

Long polling

Server push

Server-Sent Events

WebSocket

SSE network protocol

GET /sse HTTP/1.1
Accept: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
data: The first event.data: The second event.
data: The third
data: event.
id: 1
data: The first event.
id: 2
data: The second event.
event: type1
data: An event of type1.
event: type2
data: An event of type2.
data: An event without any type.
retry: 1000
: ping

SSE client: EventSource interface

var eventSource = new EventSource('/sse);
var eventSource = new EventSource('/sse?event=type1); 
...
eventSource.close();
eventSource = new EventSource('/sse?event=type1&event=type2);
...
eventSource.close();
eventSource.onopen = function () {
console.log('connection is established');
};
eventSource.onerror = function (event) {
console.log('connection state: ' + eventSource.readyState + ', error: ' + event);
};
eventSource.onmessage = function (event) {
console.log('id: ' + event.lastEventId + ', data: ' + event.data);
};
eventSource.addEventListener('type1', function (event) {
console.log('id: ' + event.lastEventId + ', data: ' + event.data);
}, false);
Image for post
Image for post

SSE Java server: Spring Web MVC

Introduction

Overview

@RestController
public class SseWebMvcController
private SseEmitter emitter; @GetMapping(path="/sse", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter createConnection() {
emitter = new SseEmitter();
return emitter;
}
// in another thread
void sendEvents() {
try {
emitter.send("Alpha");
emitter.send("Omega");
emitter.complete();
} catch(Exception e) {
emitter.completeWithError(e);
}
}
}
class SseEmitters {    private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();    SseEmitter add(SseEmitter emitter) {
this.emitters.add(emitter);
emitter.onCompletion(() -> {
this.emitters.remove(emitter);
});
emitter.onTimeout(() -> {
emitter.complete();
this.emitters.remove(emitter);
});
return emitter;
}
void send(Object obj) {
List<SseEmitter> failedEmitters = new ArrayList<>();
this.emitters.forEach(emitter -> {
try {
emitter.send(obj);
} catch (Exception e) {
emitter.completeWithError(e);
failedEmitters.add(emitter);
}
});
this.emitters.removeAll(failedEmitters);
}
}

Handling short-lasting periodic events stream

@Controller
@RequestMapping("/sse/mvc")
public class WordsController {
private static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" "); private final ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); @GetMapping(path = "/words", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter getWords() {
SseEmitter emitter = new SseEmitter();
cachedThreadPool.execute(() -> {
try {
for (int i = 0; i < WORDS.length; i++) {
emitter.send(WORDS[i]);
TimeUnit.SECONDS.sleep(1);
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
}
curl -v http://localhost:8080/sse/mvc/words
Image for post
Image for post
http://localhost:8080/sse/mvc/words
Image for post
Image for post
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Server-Sent Events client example with EventSource</title>
</head>
<body>
<script>
if (window.EventSource == null) {
alert('The browser does not support Server-Sent Events');
} else {
var eventSource = new EventSource('/sse/mvc/words');
eventSource.onopen = function () {
console.log('connection is established');
};
eventSource.onerror = function (error) {
console.log('connection state: ' + eventSource.readyState + ', error: ' + event);
};
eventSource.onmessage = function (event) {
console.log('id: ' + event.lastEventId + ', data: ' + event.data);
if (event.data.endsWith('.')) {
eventSource.close();
console.log('connection is closed');
}
};
}
</script>
</body>
</html>
Image for post
Image for post

Handling long-lasting periodic events

@RestController
@RequestMapping("/sse/mvc")
public class PerformanceController {
private final PerformanceService performanceService; PerformanceController(PerformanceService performanceService) {
this.performanceService = performanceService;
}
private final AtomicInteger id = new AtomicInteger(); private final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1); private final SseEmitters emitters = new SseEmitters(); @PostConstruct
void init() {
scheduledThreadPool.scheduleAtFixedRate(() -> {
emitters.send(performanceService.getPerformance());
}, 0, 1, TimeUnit.SECONDS);
}
@GetMapping(path = "/performance", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter getPerformance() {
return emitters.add();
}
}
Image for post
Image for post

Handling aperiodic events

@RestController
@RequestMapping("/sse/mvc")
public class FolderWatchController implements ApplicationListener<FolderChangeEvent> {
private final FolderWatchService folderWatchService; FolderWatchController(FolderWatchService folderWatchService) {
this.folderWatchService = folderWatchService;
}
private final SseEmitters emitters = new SseEmitters(); @PostConstruct
void init() {
folderWatchService.start(System.getProperty("user.home"));
}
@GetMapping(path = "/folder-watch", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter getFolderWatch() {
return emitters.add(new SseEmitter());
}
@Override
public void onApplicationEvent(FolderChangeEvent event) {
emitters.send(event.getEvent());
}
}
Image for post
Image for post

SSE Java server: Spring Web Flux

Introduction

Overview

@RestController
public class ExampleController
@GetMapping(path="/sse", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> createConnectionAndSendEvents() {
return Flux.just("Alpha", "Omega");
}
}

Handling short-lasting periodic events stream

@RestController
@RequestMapping("/sse/flux")
public class WordsController {
private static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" "); @GetMapping(path = "/words", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<String> getWords() {
return Flux
.zip(Flux.just(WORDS), Flux.interval(Duration.ofSeconds(1)))
.map(Tuple2::getT1);
}
}

Handling long-lasting periodic events

@RestController
@RequestMapping("/sse/flux")
public class PerformanceController {
private final PerformanceService performanceService; PerformanceController(PerformanceService performanceService) {
this.performanceService = performanceService;
}
@GetMapping(path = "/performance", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Performance> getPerformance() {
return Flux
.interval(Duration.ofSeconds(1))
.map(sequence -> performanceService.getPerformance());
}
}

Handling aperiodic events

@RestController
@RequestMapping("/sse/flux")
public class FolderWatchController implements ApplicationListener<FolderChangeEvent> {
private final FolderWatchService folderWatchService; FolderWatchController(FolderWatchService folderWatchService) {
this.folderWatchService = folderWatchService;
}
private final SubscribableChannel subscribableChannel = MessageChannels.publishSubscribe().get(); @PostConstruct
void init() {
folderWatchService.start(System.getProperty("user.home"));
}
@GetMapping(path = "/folder-watch", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<FolderChangeEvent.Event> getFolderWatch() {
return Flux.create(sink -> {
MessageHandler handler = message -> sink.next(FolderChangeEvent.class.cast(message.getPayload()).getEvent());
sink.onCancel(() -> subscribableChannel.unsubscribe(handler));
subscribableChannel.subscribe(handler);
}, FluxSink.OverflowStrategy.LATEST);
}
@Override
public void onApplicationEvent(FolderChangeEvent event) {
subscribableChannel.send(new GenericMessage<>(event));
}
}

SSE limitations

Conclusion

Written by

Senior Software Engineer

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store