Skip to content

Generate SseEmitter for non-reactive Spring SSE endpoints #20675

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/generators/java-camel.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ These options may be applied as additional-properties (cli) or configOptions (pl
|scmDeveloperConnection|SCM developer connection in generated pom.xml| |scm:git:git@github.com:openapitools/openapi-generator.git|
|scmUrl|SCM URL in generated pom.xml| |https://github.com/openapitools/openapi-generator|
|serializableModel|boolean - toggle "implements Serializable" for generated models| |false|
|serverSentEvents|enable server sent events support| |false|
|singleContentTypes|Whether to select only one produces/consumes content-type by operation.| |false|
|skipDefaultInterface|Whether to skip generation of default implementations for java8 interfaces| |false|
|snapshotVersion|Uses a SNAPSHOT version.|<dl><dt>**true**</dt><dd>Use a SnapShot Version</dd><dt>**false**</dt><dd>Use a Release Version</dd></dl>|null|
Expand Down
1 change: 1 addition & 0 deletions docs/generators/spring.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ These options may be applied as additional-properties (cli) or configOptions (pl
|scmDeveloperConnection|SCM developer connection in generated pom.xml| |scm:git:git@github.com:openapitools/openapi-generator.git|
|scmUrl|SCM URL in generated pom.xml| |https://github.com/openapitools/openapi-generator|
|serializableModel|boolean - toggle &quot;implements Serializable&quot; for generated models| |false|
|serverSentEvents|enable server sent events support| |false|
|singleContentTypes|Whether to select only one produces/consumes content-type by operation.| |false|
|skipDefaultInterface|Whether to skip generation of default implementations for java8 interfaces| |false|
|snapshotVersion|Uses a SNAPSHOT version.|<dl><dt>**true**</dt><dd>Use a SnapShot Version</dd><dt>**false**</dt><dd>Use a Release Version</dd></dl>|null|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ public SpringCodegen() {
cliOptions.add(CliOption.newBoolean(ASYNC, "use async Callable controllers", async));
cliOptions.add(CliOption.newBoolean(REACTIVE, "wrap responses in Mono/Flux Reactor types (spring-boot only)",
reactive));
cliOptions.add(CliOption.newBoolean(SSE, "enable server sent events support", sse));
cliOptions.add(new CliOption(RESPONSE_WRAPPER,
"wrap the responses in given type (Future, Callable, CompletableFuture,ListenableFuture, DeferredResult, RxObservable, RxSingle or fully qualified type)"));
cliOptions.add(CliOption.newBoolean(VIRTUAL_SERVICE,
Expand Down Expand Up @@ -406,9 +407,11 @@ public void processOpts() {
convertPropertyToBooleanAndWriteBack(ASYNC, this::setAsync);
if (additionalProperties.containsKey(REACTIVE)) {
if (SPRING_CLOUD_LIBRARY.equals(library)) {
throw new IllegalArgumentException("Currently, reactive option doesn't supported by Spring Cloud");
throw new IllegalArgumentException("Currently, reactive option isn't supported by Spring Cloud");
}
convertPropertyToBooleanAndWriteBack(REACTIVE, this::setReactive);
}
if (additionalProperties.containsKey(SSE)) {
convertPropertyToBooleanAndWriteBack(SSE, this::setSse);
}

Expand Down Expand Up @@ -1026,53 +1029,59 @@ public CodegenOperation fromOperation(String path, String httpMethod, Operation
codegenOperation.imports.add("ApiIgnore");
}
if (sse) {
var MEDIA_EVENT_STREAM = "text/event-stream";
// inspecting used streaming media types
/*
expected definition:
content:
text/event-stream:
schema:
type: array
format: event-stream
items:
type: <type> or
$ref: <typeRef>
*/
Map<String, List<Schema>> schemaTypes = operation.getResponses().entrySet().stream()
.map(e -> Pair.of(e.getValue(), fromResponse(e.getKey(), e.getValue())))
.filter(p -> p.getRight().is2xx) // consider only success
.map(p -> p.getLeft().getContent().get(MEDIA_EVENT_STREAM))
.map(MediaType::getSchema)
.collect(Collectors.toList()).stream()
.collect(Collectors.groupingBy(Schema::getType));
if(schemaTypes.containsKey("array")) {
// we have a match with SSE pattern
// double check potential conflicting, multiple specs
if(schemaTypes.keySet().size() > 1) {
throw new RuntimeException("only 1 response media type supported, when SSE is detected");
}
// double check schema format
List<Schema> eventTypes = schemaTypes.get("array");
if( eventTypes.stream().anyMatch(schema -> !"event-stream".equalsIgnoreCase(schema.getFormat()))) {
throw new RuntimeException("schema format 'event-stream' is required, when SSE is detected");
}
// double check item types
Set<String> itemTypes = eventTypes.stream()
.map(schema -> schema.getItems().getType() != null
? schema.getItems().getType()
: schema.getItems().get$ref())
.collect(Collectors.toSet());
if(itemTypes.size() > 1) {
throw new RuntimeException("only single item type is supported, when SSE is detected");
}
codegenOperation.vendorExtensions.put("x-sse", true);
} // Not an SSE compliant definition
handleSseConfiguration(operation, codegenOperation);
}
} else if (sse) {
handleSseConfiguration(operation, codegenOperation);
}

return codegenOperation;
}

// inspecting used streaming media types
/*
expected definition:
content:
text/event-stream:
schema:
type: array
format: event-stream
items:
type: <type> or
$ref: <typeRef>
*/
private void handleSseConfiguration(Operation operation, CodegenOperation codegenOperation) {
Map<String, List<Schema>> schemaTypes = operation.getResponses().entrySet().stream()
.map(e -> Pair.of(e.getValue(), fromResponse(e.getKey(), e.getValue())))
.filter(p -> p.getRight().is2xx) // consider only success
.map(p -> p.getLeft().getContent().get("text/event-stream"))
.map(MediaType::getSchema)
.collect(Collectors.toList()).stream()
.collect(Collectors.groupingBy(Schema::getType));
if (schemaTypes.containsKey("array")) {
// we have a match with SSE pattern
// double check potential conflicting, multiple specs
if (schemaTypes.keySet().size() > 1) {
throw new RuntimeException("only 1 response media type supported, when SSE is detected");
}
// double check schema format
List<Schema> eventTypes = schemaTypes.get("array");
if (eventTypes.stream().anyMatch(schema -> !"event-stream".equalsIgnoreCase(schema.getFormat()))) {
throw new RuntimeException("schema format 'event-stream' is required, when SSE is detected");
}
// double check item types
Set<String> itemTypes = eventTypes.stream()
.map(schema -> schema.getItems().getType() != null
? schema.getItems().getType()
: schema.getItems().get$ref())
.collect(Collectors.toSet());
if (itemTypes.size() > 1) {
throw new RuntimeException("only single item type is supported, when SSE is detected");
}
codegenOperation.vendorExtensions.put("x-sse", true);
}
}

private Set<String> reformatProvideArgsParams(Operation operation) {
Set<String> provideArgsClassSet = new HashSet<>();
Object argObj = operation.getExtensions().get("x-spring-provide-args");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ import org.springframework.web.context.request.NativeWebRequest;
{{/reactive}}
{{/jdk8-no-delegate}}
import org.springframework.web.multipart.MultipartFile;
{{^reactive}}
{{#serverSentEvents}}
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
{{/serverSentEvents}}
{{/reactive}}
{{#reactive}}
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -246,7 +251,9 @@ public interface {{classname}} {
{{#vendorExtensions.x-operation-extra-annotation}}
{{{.}}}
{{/vendorExtensions.x-operation-extra-annotation}}
{{#vendorExtensions.x-sse}}@ResponseBody{{/vendorExtensions.x-sse}}
{{#reactive}}{{#vendorExtensions.x-sse}}
@ResponseBody
{{/vendorExtensions.x-sse}}{{/reactive}}
{{#jdk8-default-interface}}default {{/jdk8-default-interface}}{{>responseType}} {{#delegate-method}}_{{/delegate-method}}{{operationId}}(
{{#allParams}}{{>queryParams}}{{>pathParams}}{{>headerParams}}{{>bodyParams}}{{>formParams}}{{>cookieParams}}{{^-last}},
{{/-last}}{{/allParams}}{{#reactive}}{{#hasParams}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ import org.springframework.http.ResponseEntity;
{{/useResponseEntity}}
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.multipart.MultipartFile;
{{^reactive}}
{{#serverSentEvents}}
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
{{/serverSentEvents}}
{{/reactive}}
{{#reactive}}
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{{^reactive}}
{{#examples}}
{{^vendorExtensions.x-sse}}
{{#-first}}
{{#async}}
return CompletableFuture.supplyAsync(()-> {
Expand All @@ -22,12 +23,27 @@ return CompletableFuture.supplyAsync(()-> {
}, Runnable::run);
{{/async}}
{{/-last}}
{{/vendorExtensions.x-sse}}
{{#vendorExtensions.x-sse}}
{{#useResponseEntity}}return new SseEmitter<>();
{{/useResponseEntity}}
{{^useResponseEntity}}throw new IllegalArgumentException("Not implemented");
{{/useResponseEntity}}
{{/vendorExtensions.x-sse}}
{{/examples}}
{{^examples}}
{{^vendorExtensions.x-sse}}
{{#useResponseEntity}}return {{#async}}CompletableFuture.completedFuture({{/async}}new ResponseEntity<>({{#returnSuccessCode}}HttpStatus.OK{{/returnSuccessCode}}{{^returnSuccessCode}}HttpStatus.NOT_IMPLEMENTED{{/returnSuccessCode}}){{#async}}){{/async}};
{{/useResponseEntity}}
{{^useResponseEntity}}throw new IllegalArgumentException("Not implemented");
{{/useResponseEntity}}
{{/vendorExtensions.x-sse}}
{{#vendorExtensions.x-sse}}
{{#useResponseEntity}}return new SseEmitter<>();
{{/useResponseEntity}}
{{^useResponseEntity}}throw new IllegalArgumentException("Not implemented");
{{/useResponseEntity}}
{{/vendorExtensions.x-sse}}
{{/examples}}
{{/reactive}}
{{#reactive}}
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{{^vendorExtensions.x-sse}}{{#reactive}}{{#useResponseEntity}}Mono<ResponseEntity<{{#isArray}}Flux<{{/isArray}}{{>returnTypes}}{{#isArray}}>{{/isArray}}>>{{/useResponseEntity}}{{^useResponseEntity}}{{#isArray}}Flux{{/isArray}}{{^isArray}}Mono{{/isArray}}<{{>returnTypes}}>{{/useResponseEntity}}{{/reactive}}{{^reactive}}{{#responseWrapper}}{{.}}<{{/responseWrapper}}{{#useResponseEntity}}ResponseEntity<{{/useResponseEntity}}{{>returnTypes}}{{#useResponseEntity}}>{{/useResponseEntity}}{{#responseWrapper}}>{{/responseWrapper}}{{/reactive}}{{/vendorExtensions.x-sse}}{{#vendorExtensions.x-sse}}{{#isArray}}Flux{{/isArray}}{{^isArray}}Mono{{/isArray}}<{{>returnTypes}}>{{/vendorExtensions.x-sse}}
{{^vendorExtensions.x-sse}}{{#reactive}}{{#useResponseEntity}}Mono<ResponseEntity<{{#isArray}}Flux<{{/isArray}}{{>returnTypes}}{{#isArray}}>{{/isArray}}>>{{/useResponseEntity}}{{^useResponseEntity}}{{#isArray}}Flux{{/isArray}}{{^isArray}}Mono{{/isArray}}<{{>returnTypes}}>{{/useResponseEntity}}{{/reactive}}{{^reactive}}{{#responseWrapper}}{{.}}<{{/responseWrapper}}{{#useResponseEntity}}ResponseEntity<{{/useResponseEntity}}{{>returnTypes}}{{#useResponseEntity}}>{{/useResponseEntity}}{{#responseWrapper}}>{{/responseWrapper}}{{/reactive}}{{/vendorExtensions.x-sse}}{{#vendorExtensions.x-sse}}{{#reactive}}{{#isArray}}Flux{{/isArray}}{{^isArray}}Mono{{/isArray}}<{{>returnTypes}}>{{/reactive}}{{^reactive}}SseEmitter{{/reactive}}{{/vendorExtensions.x-sse}}
Original file line number Diff line number Diff line change
Expand Up @@ -4475,8 +4475,7 @@ public void multiLineTagDescription() throws IOException {
}

@Test
public void testSSEOperationSupport() throws Exception {

public void testSSEOperationSupportReactive() throws Exception {
File output = Files.createTempDirectory("test").toFile().getCanonicalFile();
output.deleteOnExit();

Expand Down Expand Up @@ -4532,7 +4531,66 @@ public void testSSEOperationSupport() throws Exception {
.assertMethod("nonSSE", "ServerWebExchange")
.isNotNull()
.hasReturnType("Mono<ResponseEntity<String>>")
.bodyContainsLines("return result.then(Mono.empty());")
.bodyContainsLines("return result.then(Mono.empty());");
}

@Test
public void testSSEOperationSupportBlocking() throws Exception {
File output = Files.createTempDirectory("test").toFile().getCanonicalFile();
output.deleteOnExit();

final OpenAPI openAPI = TestUtils.parseFlattenSpec("src/test/resources/3_0/sse.yaml");
final SpringCodegen codegen = new SpringCodegen();
codegen.setOpenAPI(openAPI);
codegen.setOutputDir(output.getAbsolutePath());

codegen.additionalProperties().put(SSE, "true");
codegen.additionalProperties().put(INTERFACE_ONLY, "false");
codegen.additionalProperties().put(DELEGATE_PATTERN, "true");

ClientOptInput input = new ClientOptInput();
input.openAPI(openAPI);
input.config(codegen);

DefaultGenerator generator = new DefaultGenerator();
generator.setGeneratorPropertyDefault(CodegenConstants.APIS, "true");
generator.setGenerateMetadata(false);

Map<String, File> files = generator.opts(input).generate().stream()
.collect(Collectors.toMap(File::getName, Function.identity()));

MapAssert.assertThatMap(files).isNotEmpty();
File api = files.get("PathApi.java");
File delegate = files.get("PathApiDelegate.java");

JavaFileAssert.assertThat(api)
.assertMethod("sseVariant1")
.isNotNull()
.hasReturnType("SseEmitter")
.toFileAssert()
.assertMethod("sseVariant2")
.isNotNull()
.hasReturnType("SseEmitter")
.toFileAssert()
.assertMethod("nonSSE")
.isNotNull()
.hasReturnType("ResponseEntity<String>");

JavaFileAssert.assertThat(delegate)
.assertMethod("sseVariant1")
.isNotNull()
.hasReturnType("SseEmitter")
.bodyContainsLines("return new SseEmitter<>();")
.toFileAssert()
.assertMethod("sseVariant2")
.isNotNull()
.hasReturnType("SseEmitter")
.bodyContainsLines("return new SseEmitter<>();")
.toFileAssert()
.assertMethod("nonSSE")
.isNotNull()
.hasReturnType("ResponseEntity<String>")
.bodyContainsLines("return new ResponseEntity<>(HttpStatus.NOT_IMPLEMENTED);")
;

}
Expand Down
Loading