Skip to content

Commit a999f40

Browse files
committed
Polish + minor refactoring of SSE reader and writer
Instead of accepting List<Encoder|Decoder> and then look for the first to support JSON, always expect a single JSON [Encoder|Decoder] and use that unconditionally. When writing use the nested ResolvableType instead of the Class of the actual value which should better support generics. Remove the SSE hint and pass "text/event-stream" as the media type instead to serve as a hint. We are expecting a JSON encoder and using it unconditionally in any case so this should be good enough.
1 parent d0e0b6c commit a999f40

File tree

9 files changed

+153
-196
lines changed

9 files changed

+153
-196
lines changed

spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import reactor.core.publisher.Mono;
3030

3131
import org.springframework.core.ResolvableType;
32-
import org.springframework.core.codec.CodecException;
3332
import org.springframework.core.codec.Decoder;
3433
import org.springframework.core.codec.StringDecoder;
3534
import org.springframework.core.io.buffer.DataBuffer;
@@ -39,7 +38,6 @@
3938
import org.springframework.http.MediaType;
4039
import org.springframework.http.ReactiveHttpInputMessage;
4140
import org.springframework.util.Assert;
42-
import org.springframework.util.MimeTypeUtils;
4341

4442
import static java.util.stream.Collectors.joining;
4543

@@ -61,37 +59,36 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec
6159
private static final StringDecoder stringDecoder = new StringDecoder(false);
6260

6361

64-
private final List<Decoder<?>> dataDecoders;
62+
private final Decoder<?> decoder;
6563

6664

67-
public ServerSentEventHttpMessageReader() {
68-
this.dataDecoders = Collections.emptyList();
65+
/**
66+
* Constructor with JSON {@code Encoder} for encoding objects.
67+
*/
68+
public ServerSentEventHttpMessageReader(Decoder<?> decoder) {
69+
Assert.notNull(decoder, "Decoder must not be null");
70+
this.decoder = decoder;
6971
}
7072

71-
public ServerSentEventHttpMessageReader(List<Decoder<?>> dataDecoders) {
72-
Assert.notNull(dataDecoders, "'dataDecoders' must not be null");
73-
this.dataDecoders = new ArrayList<>(dataDecoders);
74-
}
7573

74+
@Override
75+
public List<MediaType> getReadableMediaTypes() {
76+
return Collections.singletonList(MediaType.TEXT_EVENT_STREAM);
77+
}
7678

7779
@Override
7880
public boolean canRead(ResolvableType elementType, MediaType mediaType) {
7981
return MediaType.TEXT_EVENT_STREAM.isCompatibleWith(mediaType) ||
8082
ServerSentEvent.class.isAssignableFrom(elementType.getRawClass());
8183
}
8284

83-
@Override
84-
public List<MediaType> getReadableMediaTypes() {
85-
return Collections.singletonList(MediaType.TEXT_EVENT_STREAM);
86-
}
87-
8885

8986
@Override
9087
public Flux<Object> read(ResolvableType elementType, ReactiveHttpInputMessage message,
9188
Map<String, Object> hints) {
9289

93-
boolean hasSseWrapper = ServerSentEvent.class.isAssignableFrom(elementType.getRawClass());
94-
ResolvableType dataType = (hasSseWrapper ? elementType.getGeneric(0) : elementType);
90+
boolean shouldWrap = ServerSentEvent.class.isAssignableFrom(elementType.getRawClass());
91+
ResolvableType valueType = shouldWrap ? elementType.getGeneric(0) : elementType;
9592

9693
return Flux.from(message.getBody())
9794
.concatMap(ServerSentEventHttpMessageReader::splitOnNewline)
@@ -103,8 +100,8 @@ public Flux<Object> read(ResolvableType elementType, ReactiveHttpInputMessage me
103100
.bufferUntil(line -> line.equals("\n"))
104101
.concatMap(rawLines -> {
105102
String[] lines = rawLines.stream().collect(joining()).split("\\r?\\n");
106-
ServerSentEvent<Object> event = buildEvent(lines, dataType, hints);
107-
return (hasSseWrapper ? Mono.just(event) : Mono.justOrEmpty(event.data()));
103+
ServerSentEvent<Object> event = buildEvent(lines, valueType, hints);
104+
return (shouldWrap ? Mono.just(event) : Mono.justOrEmpty(event.data()));
108105
})
109106
.cast(Object.class);
110107
}
@@ -126,7 +123,8 @@ private static Flux<DataBuffer> splitOnNewline(DataBuffer dataBuffer) {
126123
return Flux.fromIterable(results);
127124
}
128125

129-
private ServerSentEvent<Object> buildEvent(String[] lines, ResolvableType dataType, Map<String, Object> hints) {
126+
private ServerSentEvent<Object> buildEvent(String[] lines, ResolvableType valueType,
127+
Map<String, Object> hints) {
130128

131129
ServerSentEvent.Builder<Object> sseBuilder = ServerSentEvent.builder();
132130
StringBuilder mutableData = new StringBuilder();
@@ -151,7 +149,7 @@ else if (line.startsWith(":")) {
151149
}
152150
if (mutableData.length() > 0) {
153151
String data = mutableData.toString();
154-
sseBuilder.data(decodeData(data, dataType, hints));
152+
sseBuilder.data(decodeData(data, valueType, hints));
155153
}
156154
if (mutableComment.length() > 0) {
157155
String comment = mutableComment.toString();
@@ -169,19 +167,16 @@ private Object decodeData(String data, ResolvableType dataType, Map<String, Obje
169167
byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
170168
Mono<DataBuffer> input = Mono.just(bufferFactory.wrap(bytes));
171169

172-
return this.dataDecoders.stream()
173-
.filter(e -> e.canDecode(dataType, MimeTypeUtils.APPLICATION_JSON))
174-
.findFirst()
175-
.orElseThrow(() -> new CodecException("No suitable decoder found!"))
176-
.decodeToMono(input, dataType, MimeTypeUtils.APPLICATION_JSON, hints)
170+
return this.decoder
171+
.decodeToMono(input, dataType, MediaType.TEXT_EVENT_STREAM, hints)
177172
.block(Duration.ZERO);
178173
}
179174

180175
@Override
181176
public Mono<Object> readMono(ResolvableType elementType, ReactiveHttpInputMessage message,
182177
Map<String, Object> hints) {
183178

184-
// Let's give StringDecoder a chance since SSE is ordered ahead of it
179+
// For single String give StringDecoder a chance which comes after SSE in the order
185180

186181
if (String.class.equals(elementType.getRawClass())) {
187182
Flux<DataBuffer> body = message.getBody();

spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java

Lines changed: 69 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,16 @@
1717
package org.springframework.http.codec;
1818

1919
import java.nio.charset.StandardCharsets;
20-
import java.util.ArrayList;
2120
import java.util.Collections;
2221
import java.util.HashMap;
2322
import java.util.List;
2423
import java.util.Map;
25-
import java.util.Optional;
2624

2725
import org.reactivestreams.Publisher;
2826
import reactor.core.publisher.Flux;
2927
import reactor.core.publisher.Mono;
3028

3129
import org.springframework.core.ResolvableType;
32-
import org.springframework.core.codec.CodecException;
3330
import org.springframework.core.codec.Encoder;
3431
import org.springframework.core.io.buffer.DataBuffer;
3532
import org.springframework.core.io.buffer.DataBufferFactory;
@@ -38,39 +35,37 @@
3835
import org.springframework.http.server.reactive.ServerHttpRequest;
3936
import org.springframework.http.server.reactive.ServerHttpResponse;
4037
import org.springframework.util.Assert;
41-
import org.springframework.util.MimeTypeUtils;
4238

4339
/**
44-
* Writer that supports a stream of {@link ServerSentEvent}s and also plain
45-
* {@link Object}s which is the same as an {@link ServerSentEvent} with data
46-
* only.
40+
* {@code ServerHttpMessageWriter} for {@code "text/event-stream"} responses.
4741
*
4842
* @author Sebastien Deleuze
4943
* @author Arjen Poutsma
44+
* @author Rossen Stoyanchev
5045
* @since 5.0
5146
*/
5247
public class ServerSentEventHttpMessageWriter implements ServerHttpMessageWriter<Object> {
5348

54-
/**
55-
* Server-Sent Events hint key expecting a {@link Boolean} value which when set to true
56-
* will adapt the content in order to comply with Server-Sent Events recommendation.
57-
* For example, it will append "data:" after each line break with data encoders
58-
* supporting it.
59-
* @see <a href="https://www.w3.org/TR/eventsource/">Server-Sent Events W3C recommendation</a>
60-
*/
61-
public static final String SSE_CONTENT_HINT = ServerSentEventHttpMessageWriter.class.getName() + ".sseContent";
49+
private static final List<MediaType> WRITABLE_MEDIA_TYPES =
50+
Collections.singletonList(MediaType.TEXT_EVENT_STREAM);
6251

6352

64-
private final List<Encoder<?>> dataEncoders;
53+
private final Encoder<?> encoder;
6554

6655

67-
public ServerSentEventHttpMessageWriter() {
68-
this.dataEncoders = Collections.emptyList();
56+
/**
57+
* Constructor with JSON {@code Encoder} for encoding objects. Support for
58+
* {@code String} event data is built-in.
59+
*/
60+
public ServerSentEventHttpMessageWriter(Encoder<?> encoder) {
61+
Assert.notNull(encoder, "'encoder' must not be null");
62+
this.encoder = encoder;
6963
}
7064

71-
public ServerSentEventHttpMessageWriter(List<Encoder<?>> dataEncoders) {
72-
Assert.notNull(dataEncoders, "'dataEncoders' must not be null");
73-
this.dataEncoders = new ArrayList<>(dataEncoders);
65+
66+
@Override
67+
public List<MediaType> getWritableMediaTypes() {
68+
return WRITABLE_MEDIA_TYPES;
7469
}
7570

7671

@@ -81,61 +76,35 @@ public boolean canWrite(ResolvableType elementType, MediaType mediaType) {
8176
}
8277

8378
@Override
84-
public List<MediaType> getWritableMediaTypes() {
85-
return Collections.singletonList(MediaType.TEXT_EVENT_STREAM);
86-
}
87-
88-
@Override
89-
public Mono<Void> write(Publisher<?> inputStream, ResolvableType elementType, MediaType mediaType,
79+
public Mono<Void> write(Publisher<?> input, ResolvableType elementType, MediaType mediaType,
9080
ReactiveHttpOutputMessage message, Map<String, Object> hints) {
9181

9282
message.getHeaders().setContentType(MediaType.TEXT_EVENT_STREAM);
83+
return message.writeAndFlushWith(encode(input, message.bufferFactory(), elementType, hints));
84+
}
9385

94-
DataBufferFactory bufferFactory = message.bufferFactory();
95-
Flux<Publisher<DataBuffer>> body = encode(inputStream, bufferFactory, elementType, hints);
86+
private Flux<Publisher<DataBuffer>> encode(Publisher<?> input, DataBufferFactory factory,
87+
ResolvableType elementType, Map<String, Object> hints) {
9688

97-
return message.writeAndFlushWith(body);
98-
}
89+
ResolvableType valueType = ServerSentEvent.class.isAssignableFrom(elementType.getRawClass()) ?
90+
elementType.getGeneric(0) : elementType;
9991

100-
private Flux<Publisher<DataBuffer>> encode(Publisher<?> inputStream, DataBufferFactory bufferFactory,
101-
ResolvableType type, Map<String, Object> hints) {
102-
103-
Map<String, Object> hintsWithSse = new HashMap<>(hints);
104-
hintsWithSse.put(SSE_CONTENT_HINT, true);
105-
return Flux.from(inputStream)
106-
.map(o -> toSseEvent(o, type))
107-
.map(sse -> {
108-
StringBuilder sb = new StringBuilder();
109-
sse.id().ifPresent(id -> writeField("id", id, sb));
110-
sse.event().ifPresent(event -> writeField("event", event, sb));
111-
sse.retry().ifPresent(retry -> writeField("retry", retry.toMillis(), sb));
112-
sse.comment().ifPresent(comment -> {
113-
comment = comment.replaceAll("\\n", "\n:");
114-
sb.append(':').append(comment).append("\n");
115-
});
116-
Flux<DataBuffer> dataBuffer = sse.data()
117-
.<Flux<DataBuffer>>map(data -> {
118-
sb.append("data:");
119-
if (data instanceof String) {
120-
String stringData = ((String) data).replaceAll("\\n", "\ndata:");
121-
sb.append(stringData).append('\n');
122-
return Flux.empty();
123-
}
124-
else {
125-
return applyEncoder(data, bufferFactory, hintsWithSse);
126-
}
127-
}).orElse(Flux.empty());
128-
129-
return Flux.concat(encodeString(sb.toString(), bufferFactory), dataBuffer,
130-
encodeString("\n", bufferFactory));
131-
});
92+
return Flux.from(input).map(element -> {
13293

133-
}
94+
ServerSentEvent<?> sse = element instanceof ServerSentEvent ?
95+
(ServerSentEvent<?>) element : ServerSentEvent.builder().data(element).build();
96+
97+
StringBuilder sb = new StringBuilder();
98+
sse.id().ifPresent(v -> writeField("id", v, sb));
99+
sse.event().ifPresent(v -> writeField("event", v, sb));
100+
sse.retry().ifPresent(v -> writeField("retry", v.toMillis(), sb));
101+
sse.comment().ifPresent(v -> sb.append(':').append(v.replaceAll("\\n", "\n:")).append("\n"));
102+
sse.data().ifPresent(v -> sb.append("data:"));
134103

135-
private ServerSentEvent<?> toSseEvent(Object data, ResolvableType type) {
136-
return ServerSentEvent.class.isAssignableFrom(type.getRawClass())
137-
? (ServerSentEvent<?>) data
138-
: ServerSentEvent.builder().data(data).build();
104+
return Flux.concat(encodeText(sb, factory),
105+
encodeData(sse, valueType, factory, hints),
106+
encodeText("\n", factory));
107+
});
139108
}
140109

141110
private void writeField(String fieldName, Object fieldValue, StringBuilder stringBuilder) {
@@ -146,40 +115,50 @@ private void writeField(String fieldName, Object fieldValue, StringBuilder strin
146115
}
147116

148117
@SuppressWarnings("unchecked")
149-
private <T> Flux<DataBuffer> applyEncoder(Object data, DataBufferFactory bufferFactory, Map<String, Object> hints) {
150-
ResolvableType elementType = ResolvableType.forClass(data.getClass());
151-
Optional<Encoder<?>> encoder = dataEncoders
152-
.stream()
153-
.filter(e -> e.canEncode(elementType, MimeTypeUtils.APPLICATION_JSON))
154-
.findFirst();
155-
return ((Encoder<T>) encoder.orElseThrow(() -> new CodecException("No suitable encoder found!")))
156-
.encode(Mono.just((T) data), bufferFactory, elementType, MimeTypeUtils.APPLICATION_JSON, hints)
157-
.concatWith(encodeString("\n", bufferFactory));
118+
private <T> Flux<DataBuffer> encodeData(ServerSentEvent<?> event, ResolvableType valueType,
119+
DataBufferFactory factory, Map<String, Object> hints) {
120+
121+
Object data = event.data().orElse(null);
122+
if (data == null) {
123+
return Flux.empty();
124+
}
125+
126+
if (data instanceof String) {
127+
String text = (String) data;
128+
return Flux.from(encodeText(text.replaceAll("\\n", "\ndata:") + "\n", factory));
129+
}
130+
131+
return ((Encoder<T>) this.encoder)
132+
.encode(Mono.just((T) data), factory, valueType, MediaType.TEXT_EVENT_STREAM, hints)
133+
.concatWith(encodeText("\n", factory));
158134
}
159135

160-
private Mono<DataBuffer> encodeString(String str, DataBufferFactory bufferFactory) {
161-
byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
136+
private Mono<DataBuffer> encodeText(CharSequence text, DataBufferFactory bufferFactory) {
137+
byte[] bytes = text.toString().getBytes(StandardCharsets.UTF_8);
162138
DataBuffer buffer = bufferFactory.allocateBuffer(bytes.length).write(bytes);
163139
return Mono.just(buffer);
164140
}
165141

166142
@Override
167-
public Mono<Void> write(Publisher<?> inputStream, ResolvableType actualType, ResolvableType elementType,
143+
public Mono<Void> write(Publisher<?> input, ResolvableType actualType, ResolvableType elementType,
168144
MediaType mediaType, ServerHttpRequest request, ServerHttpResponse response,
169145
Map<String, Object> hints) {
170146

171-
Map<String, Object> allHints = this.dataEncoders.stream()
172-
.filter(encoder -> encoder instanceof ServerHttpEncoder)
173-
.map(encoder -> (ServerHttpEncoder<?>) encoder)
174-
.map(encoder -> encoder.getEncodeHints(actualType, elementType, mediaType, request, response))
175-
.reduce(new HashMap<>(), (t, u) -> {
176-
t.putAll(u);
177-
return t;
178-
});
179-
147+
Map<String, Object> allHints = new HashMap<>();
148+
allHints.putAll(getEncodeHints(actualType, elementType, mediaType, request, response));
180149
allHints.putAll(hints);
181150

182-
return write(inputStream, elementType, mediaType, response, allHints);
151+
return write(input, elementType, mediaType, response, allHints);
152+
}
153+
154+
private Map<String, Object> getEncodeHints(ResolvableType actualType, ResolvableType elementType,
155+
MediaType mediaType, ServerHttpRequest request, ServerHttpResponse response) {
156+
157+
if (this.encoder instanceof ServerHttpEncoder) {
158+
ServerHttpEncoder<?> httpEncoder = (ServerHttpEncoder<?>) this.encoder;
159+
return httpEncoder.getEncodeHints(actualType, elementType, mediaType, request, response);
160+
}
161+
return Collections.emptyMap();
183162
}
184163

185164
}

0 commit comments

Comments
 (0)