Skip to content

Commit da98ff7

Browse files
committed
Use StringDecoder to split SSE stream
ServerSentEventHttpMessageReader had logic to split on new lines and buffer until an empty new line (start of a new event). To account for random data chunking, it later re-assembled the lines for each event and split again on new lines. However bufferUntil was still unreliable a chunk may contain nothing but a newline, which doesn't necessarily mean an empty newline in the overall SSE stream. This commit simplifies the above by delegating the splitting of the stream along newlines to StringDecoder. Issue: SPR-16744
1 parent 30c98c8 commit da98ff7

File tree

2 files changed

+62
-95
lines changed

2 files changed

+62
-95
lines changed

spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java

Lines changed: 32 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,11 @@
1616

1717
package org.springframework.http.codec;
1818

19-
import java.nio.CharBuffer;
2019
import java.nio.charset.StandardCharsets;
2120
import java.time.Duration;
22-
import java.util.ArrayList;
2321
import java.util.Collections;
2422
import java.util.List;
2523
import java.util.Map;
26-
import java.util.function.IntPredicate;
27-
import java.util.stream.Collectors;
2824

2925
import reactor.core.publisher.Flux;
3026
import reactor.core.publisher.Mono;
@@ -35,7 +31,6 @@
3531
import org.springframework.core.codec.StringDecoder;
3632
import org.springframework.core.io.buffer.DataBuffer;
3733
import org.springframework.core.io.buffer.DataBufferFactory;
38-
import org.springframework.core.io.buffer.DataBufferUtils;
3934
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
4035
import org.springframework.http.MediaType;
4136
import org.springframework.http.ReactiveHttpInputMessage;
@@ -51,12 +46,12 @@
5146
*/
5247
public class ServerSentEventHttpMessageReader implements HttpMessageReader<Object> {
5348

54-
private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r';
55-
5649
private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
5750

5851
private static final StringDecoder stringDecoder = StringDecoder.textPlainOnly();
5952

53+
private static final ResolvableType STRING_TYPE = ResolvableType.forClass(String.class);
54+
6055

6156
@Nullable
6257
private final Decoder<?> decoder;
@@ -110,77 +105,53 @@ public Flux<Object> read(ResolvableType elementType, ReactiveHttpInputMessage me
110105
boolean shouldWrap = isServerSentEvent(elementType);
111106
ResolvableType valueType = (shouldWrap ? elementType.getGeneric(0) : elementType);
112107

113-
return Flux.from(message.getBody())
114-
.concatMap(ServerSentEventHttpMessageReader::splitOnNewline)
115-
.map(buffer -> {
116-
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());
117-
DataBufferUtils.release(buffer);
118-
return charBuffer.toString();
119-
})
120-
.bufferUntil(line -> line.equals("\n"))
121-
.concatMap(rawLines -> {
122-
String[] lines = rawLines.stream().collect(Collectors.joining()).split("\\r?\\n");
123-
return buildEvent(lines, valueType, hints)
124-
.filter(event -> shouldWrap || event.data() != null)
125-
.map(event -> shouldWrap ? event : event.data());
126-
})
127-
.cast(Object.class);
128-
}
129-
130-
private static Flux<DataBuffer> splitOnNewline(DataBuffer dataBuffer) {
131-
List<DataBuffer> results = new ArrayList<>();
132-
int startIdx = 0;
133-
int endIdx;
134-
final int limit = dataBuffer.readableByteCount();
135-
do {
136-
endIdx = dataBuffer.indexOf(NEWLINE_DELIMITER, startIdx);
137-
int length = endIdx != -1 ? endIdx - startIdx + 1 : limit - startIdx;
138-
DataBuffer token = dataBuffer.slice(startIdx, length);
139-
results.add(DataBufferUtils.retain(token));
140-
startIdx = endIdx + 1;
141-
}
142-
while (startIdx < limit && endIdx != -1);
143-
DataBufferUtils.release(dataBuffer);
144-
return Flux.fromIterable(results);
108+
return stringDecoder.decode(message.getBody(), STRING_TYPE, null, Collections.emptyMap())
109+
.bufferUntil(line -> line.equals(""))
110+
.concatMap(lines -> buildEvent(lines, valueType, shouldWrap, hints));
145111
}
146112

147-
private Mono<ServerSentEvent<Object>> buildEvent(String[] lines, ResolvableType valueType,
113+
private Mono<?> buildEvent(List<String> lines, ResolvableType valueType, boolean shouldWrap,
148114
Map<String, Object> hints) {
149115

150-
ServerSentEvent.Builder<Object> sseBuilder = ServerSentEvent.builder();
116+
ServerSentEvent.Builder<Object> sseBuilder = shouldWrap ? ServerSentEvent.builder() : null;
151117
StringBuilder data = null;
152118
StringBuilder comment = null;
153119

154120
for (String line : lines) {
155-
if (line.startsWith("id:")) {
156-
sseBuilder.id(line.substring(3));
157-
}
158-
else if (line.startsWith("event:")) {
159-
sseBuilder.event(line.substring(6));
160-
}
161-
else if (line.startsWith("data:")) {
121+
if (line.startsWith("data:")) {
162122
data = (data != null ? data : new StringBuilder());
163123
data.append(line.substring(5)).append("\n");
164124
}
165-
else if (line.startsWith("retry:")) {
166-
sseBuilder.retry(Duration.ofMillis(Long.valueOf(line.substring(6))));
167-
}
168-
else if (line.startsWith(":")) {
169-
comment = (comment != null ? comment : new StringBuilder());
170-
comment.append(line.substring(1)).append("\n");
125+
if (shouldWrap) {
126+
if (line.startsWith("id:")) {
127+
sseBuilder.id(line.substring(3));
128+
}
129+
else if (line.startsWith("event:")) {
130+
sseBuilder.event(line.substring(6));
131+
}
132+
else if (line.startsWith("retry:")) {
133+
sseBuilder.retry(Duration.ofMillis(Long.valueOf(line.substring(6))));
134+
}
135+
else if (line.startsWith(":")) {
136+
comment = (comment != null ? comment : new StringBuilder());
137+
comment.append(line.substring(1)).append("\n");
138+
}
171139
}
172140
}
173-
if (comment != null) {
174-
sseBuilder.comment(comment.toString().substring(0, comment.length() - 1));
175-
}
176-
if (data != null) {
177-
return decodeData(data.toString(), valueType, hints).map(decodedData -> {
178-
sseBuilder.data(decodedData);
141+
142+
Mono<?> decodedData = (data != null ? decodeData(data.toString(), valueType, hints) : Mono.empty());
143+
144+
if (shouldWrap) {
145+
if (comment != null) {
146+
sseBuilder.comment(comment.toString().substring(0, comment.length() - 1));
147+
}
148+
return decodedData.map(o -> {
149+
sseBuilder.data(o);
179150
return sseBuilder.build();
180151
});
181152
}
182153
else {
183-
return Mono.just(sseBuilder.build());
154+
return decodedData;
184155
}
185156
}
186157

spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.time.Duration;
2020

21+
import org.junit.Assume;
2122
import org.junit.Before;
2223
import org.junit.Ignore;
2324
import org.junit.Test;
@@ -32,8 +33,10 @@
3233
import org.springframework.http.codec.ServerSentEvent;
3334
import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests;
3435
import org.springframework.http.server.reactive.HttpHandler;
36+
import org.springframework.http.server.reactive.bootstrap.JettyHttpServer;
3537
import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer;
3638
import org.springframework.web.bind.annotation.GetMapping;
39+
import org.springframework.web.bind.annotation.RequestMapping;
3740
import org.springframework.web.bind.annotation.RestController;
3841
import org.springframework.web.reactive.DispatcherHandler;
3942
import org.springframework.web.reactive.config.EnableWebFlux;
@@ -103,51 +106,42 @@ public void sseAsPerson() {
103106

104107
@Test
105108
public void sseAsEvent() {
106-
Flux<ServerSentEvent<String>> result = this.webClient.get()
109+
110+
Assume.assumeTrue(server instanceof JettyHttpServer);
111+
112+
Flux<ServerSentEvent<Person>> result = this.webClient.get()
107113
.uri("/event")
108114
.accept(TEXT_EVENT_STREAM)
109115
.retrieve()
110-
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {});
116+
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<Person>>() {});
111117

112-
StepVerifier.create(result)
113-
.consumeNextWith( event -> {
114-
assertEquals("0", event.id());
115-
assertEquals("foo", event.data());
116-
assertEquals("bar", event.comment());
117-
assertNull(event.event());
118-
assertNull(event.retry());
119-
})
120-
.consumeNextWith( event -> {
121-
assertEquals("1", event.id());
122-
assertEquals("foo", event.data());
123-
assertEquals("bar", event.comment());
124-
assertNull(event.event());
125-
assertNull(event.retry());
126-
})
127-
.thenCancel()
128-
.verify(Duration.ofSeconds(5L));
118+
verifyPersonEvents(result);
129119
}
130120

131121
@Test
132122
public void sseAsEventWithoutAcceptHeader() {
133-
Flux<ServerSentEvent<String>> result = this.webClient.get()
123+
Flux<ServerSentEvent<Person>> result = this.webClient.get()
134124
.uri("/event")
135125
.accept(TEXT_EVENT_STREAM)
136126
.retrieve()
137-
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {});
127+
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<Person>>() {});
128+
129+
verifyPersonEvents(result);
130+
}
138131

132+
private void verifyPersonEvents(Flux<ServerSentEvent<Person>> result) {
139133
StepVerifier.create(result)
140134
.consumeNextWith( event -> {
141135
assertEquals("0", event.id());
142-
assertEquals("foo", event.data());
143-
assertEquals("bar", event.comment());
136+
assertEquals(new Person("foo 0"), event.data());
137+
assertEquals("bar 0", event.comment());
144138
assertNull(event.event());
145139
assertNull(event.retry());
146140
})
147141
.consumeNextWith( event -> {
148142
assertEquals("1", event.id());
149-
assertEquals("foo", event.data());
150-
assertEquals("bar", event.comment());
143+
assertEquals(new Person("foo 1"), event.data());
144+
assertEquals("bar 1", event.comment());
151145
assertNull(event.event());
152146
assertNull(event.retry());
153147
})
@@ -180,32 +174,34 @@ public void serverDetectsClientDisconnect() {
180174

181175
@RestController
182176
@SuppressWarnings("unused")
177+
@RequestMapping("/sse")
183178
static class SseController {
184179

185180
private static final Flux<Long> INTERVAL = interval(Duration.ofMillis(100), 50);
186181

187182
private MonoProcessor<Void> cancellation = MonoProcessor.create();
188183

189184

190-
@GetMapping("/sse/string")
185+
@GetMapping("/string")
191186
Flux<String> string() {
192187
return INTERVAL.map(l -> "foo " + l);
193188
}
194189

195-
@GetMapping("/sse/person")
190+
@GetMapping("/person")
196191
Flux<Person> person() {
197192
return INTERVAL.map(l -> new Person("foo " + l));
198193
}
199194

200-
@GetMapping("/sse/event")
201-
Flux<ServerSentEvent<String>> sse() {
202-
return INTERVAL.map(l -> ServerSentEvent.builder("foo")
203-
.id(Long.toString(l))
204-
.comment("bar")
205-
.build());
195+
@GetMapping("/event")
196+
Flux<ServerSentEvent<Person>> sse() {
197+
return INTERVAL.take(2).map(l ->
198+
ServerSentEvent.builder(new Person("foo " + l))
199+
.id(Long.toString(l))
200+
.comment("bar " + l)
201+
.build());
206202
}
207203

208-
@GetMapping("/sse/infinite")
204+
@GetMapping("/infinite")
209205
Flux<String> infinite() {
210206
return Flux.just(0, 1).map(l -> "foo " + l)
211207
.mergeWith(Flux.never())

0 commit comments

Comments
 (0)