-
Notifications
You must be signed in to change notification settings - Fork 38.5k
Add support for Jetty Reactive Streams HTTP client #1783
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
spring-web/spring-web.gradle
Outdated
@@ -36,6 +36,7 @@ dependencies { | |||
optional("io.netty:netty-all") | |||
optional("io.projectreactor.ipc:reactor-netty") | |||
optional("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}") | |||
optional("org.eclipse.jetty:jetty-reactive-httpclient:0.9.2") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Version 1.0.0 with RS TCK implemented is available in Maven Central.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated thanks
08ae9e3
to
f12fbf1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall. Some things that need to change and some things I need more info on during our meeting.
* @author Sebastien Deleuze | ||
* @since 5.1 | ||
*/ | ||
public class JettyClientHttpConnector implements ClientHttpConnector { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this class should implement Spring's SmartLifecycle
instead of lazily starting the HttpClient
in connect
. This way, we make sure the started is started on app context refresh. We should also call httpClient.stop()
as part of implementing Lifecycle
, because it will clean up the resources that are currently leaking.
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri, | ||
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) { | ||
|
||
if (!httpClient.isStarted()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Though I suggest to implement SmartLifecycle
above, I think we should keep this check. Not everybody will configure their WebClient in an app context, so the client might not yet be started when connect
is called.
|
||
private final Request jettyRequest; | ||
|
||
private final DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the data buffer factory should be exposed via a setter in the JettyClientHttpConnector
, and passed along in the constructor. That way, people can configure data buffer usage (i.e. direct vs heap-based, etc.)
String contentType = jettyRequest.getHeaders().contains(HttpHeader.CONTENT_TYPE) ? | ||
jettyRequest.getHeaders().getField(HttpHeader.CONTENT_TYPE).getValue(): | ||
MediaType.APPLICATION_OCTET_STREAM_VALUE; | ||
Publisher<ContentChunk> chunks = Flux.from(publisher).map(DataBuffer::asByteBuffer).map(ContentChunk::new); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even though this class is using DefaultDataBuffer
, which does not require releasing, I still think it would be a good idea to release them here anyway, especially if we make the data buffer factory configurable.
I think the release can be implemented as the ContentChunk
callback (I am assuming that the callback would be called after the data has been written, not sure about that). I see that below you said that releasing in the callback does not work for the response, but perhaps it does for the request?
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(Flux | ||
.from(body) | ||
.flatMap(publisher -> publisher) | ||
.map(buffer -> new ContentChunk(buffer.asByteBuffer())), contentType); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Release needs to happen here as well.
@Override | ||
public HttpMethod getMethod() { | ||
HttpMethod method = HttpMethod.resolve(this.jettyRequest.getMethod()); | ||
Assert.notNull(method, "Method must not be null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like Assert.state
is more suitable here, because notNull
results in an IllegalArgumentException
.
* @author Sebastien Deleuze | ||
* @since 5.1 | ||
*/ | ||
public class JettyClientHttpRequest extends AbstractClientHttpRequest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be package-protected instead, no need to expose this class.
* @author Sebastien Deleuze | ||
* @since 5.1 | ||
*/ | ||
public class JettyClientHttpResponse implements ClientHttpResponse { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be package-protected instead, no need to expose it.
private final Flux<DataBuffer> content; | ||
|
||
|
||
public JettyClientHttpResponse(ReactiveResponse reactiveResponse, Publisher<DataBuffer> content) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Publisher<DataBuffer>
parameter should be Flux<DataBuffer>
, since the only place you're creating them is internally, from JettyClientHttpRequest
.
Assert.notNull(reactiveResponse, "reactiveResponse should not be null"); | ||
Assert.notNull(content, "content should not be null"); | ||
this.reactiveResponse = reactiveResponse; | ||
this.content = Flux.from(content); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above, drop the Flux.from
.
8cae9a1
to
82481a9
Compare
I have taken in account your feedback @poutsma thanks. |
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri, | ||
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) { | ||
|
||
if (!httpClient.isStarted()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really minor, but perhaps delegate to isRunning()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so, isStarted()
is different than isRunning()
.
public int getPhase() { | ||
return Integer.MAX_VALUE; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For ordering of SmartLifecycle
methods, in the very least:
- stop(Runnable) should be after stop()
- getPhase and isAutoStartup up ahead among accessors
As lifecycle methods, personally I like to see start/stop/isRunning ahead of runtime methods like connect
, and it's something I've been following in a number of classes, but I'm not sure if there is a broader precedent.
@Override | ||
public void start() { | ||
try { | ||
this.httpClient.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A code level comment that explains HttpClient is internally synchronized and protected with state checks around here could be useful.
|
||
if (!uri.isAbsolute()) { | ||
return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No practical difference but maybe this check first (before starting the client above) ?
headers.entrySet().forEach(e -> e.getValue().forEach(v -> this.jettyRequest.header(e.getKey(), v))); | ||
if (!headers.containsKey(HttpHeaders.ACCEPT)) { | ||
this.jettyRequest.header(HttpHeaders.ACCEPT, "*/*"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it necessary to add "*/*"
explicitly? It should be the same as not having it.
public Mono<ClientHttpResponse> getResponse() { | ||
return this.response; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The overall method ordering should be: accessors, public contract methods, protected contract (base class) methods (e.g. see ReactorHttpRequest
).
public Mono<Void> writeWith(Publisher<? extends DataBuffer> publisher) { | ||
String contentType = jettyRequest.getHeaders().contains(HttpHeader.CONTENT_TYPE) ? | ||
jettyRequest.getHeaders().getField(HttpHeader.CONTENT_TYPE).getValue(): | ||
MediaType.APPLICATION_OCTET_STREAM_VALUE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it not enough to look up via getHeaders().getContentType()
?
DataBufferUtils.release(buffer); | ||
throw Exceptions.propagate(x); | ||
} | ||
})); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be extracted:
private ContentChunk toContentChunk(DataBuffer buffer) {
return new ContentChunk(buffer.asByteBuffer(), new Callback() {
@Override
public void succeeded() {
DataBufferUtils.release(buffer);
}
@Override
public void failed(Throwable x) {
DataBufferUtils.release(buffer);
throw Exceptions.propagate(x);
}
});
}
Then it becomes:
Publisher<ContentChunk> chunks = Flux.from(publisher).map(this::toContentChunk);
And the writeAndFlushWith method below does the same AFAICS.
|
||
public Mono<ClientHttpResponse> getResponse() { | ||
return this.response; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not obvious at first why a request object should have a method that returns the response, and a public method for that. It must be called at the right time too, or else the response field may not have been initialized, but I can see why the method is here, based on the way the Jetty API works.
I think it would help clarity if the method called completes
simply stored ReactiveRequest
, returned Mono.empty()
, and nothing more. The method already returns Mono.empty()
anyway, so the part about creating the response is completely independent and could be moved out of the request. In other words, the connector would access the ReactiveRequest
(via package private method) and then create the response. Then would make it a little more instantly obvious what's going on.
The other point is that the Mono<Void>
from writeWith is supposed to reflect the completion of the writing of the request body, per contract, but probably not a practical issue at this time. If we keep all this package private we have the option to evolve it in the future.
Sorry for the delayed response, but my comments above. Let's get this ready for master, and continue there as needed. |
151e245
to
de8c1e2
Compare
Implemented with buffer copy instead of optimized buffer wrapping because the latter hangs since Callback#succeeded doesn't allow releasing the buffer and requesting more data at different times (required for Mono<DataBuffer> for example). See jetty/jetty.project#2429. Issue: SPR-15092
@rstoyanchev Thanks for your feedback, I think I have taken your remarks in account. I have not yet merged it in |
Merged via a87764f. |
Ready to be reviewed before merging in
master
since I would like to begin having feedback with real applications using Spring Framework snapshots.Notice that this PR is implemented with buffer copy instead of optimized buffer wrapping because the latter hangs since
Callback#succeeded
doesn't allow releasing the buffer and requesting more data at different times (required forMono<DataBuffer>
for example).Simone is aware of that issue and will see if it can be changed on Jetty side (not sure yet). That could be changed with an additional commit if that becomes possible in the future.
Issue: SPR-15092