Skip to content

Add support for Jetty Reactive Streams HTTP client #1783

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from

Conversation

sdeleuze
Copy link
Contributor

Ready to be reviewed before merging in master since I would like to begin having feedback with real applications using Spring Framework snapshots.

Notice that this PR is implemented with buffer copy instead of optimized buffer wrapping because the latter hangs since Callback#succeeded doesn't allow releasing the buffer and requesting more data at different times (required for Mono<DataBuffer> for example).

Simone is aware of that issue and will see if it can be changed on Jetty side (not sure yet). That could be changed with an additional commit if that becomes possible in the future.

Issue: SPR-15092

@@ -36,6 +36,7 @@ dependencies {
optional("io.netty:netty-all")
optional("io.projectreactor.ipc:reactor-netty")
optional("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
optional("org.eclipse.jetty:jetty-reactive-httpclient:0.9.2")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Version 1.0.0 with RS TCK implemented is available in Maven Central.

Copy link
Contributor Author

@sdeleuze sdeleuze Apr 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated thanks

@sdeleuze sdeleuze force-pushed the SPR-15092 branch 4 times, most recently from 08ae9e3 to f12fbf1 Compare April 11, 2018 08:42
Copy link
Contributor

@poutsma poutsma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall. Some things that need to change and some things I need more info on during our meeting.

* @author Sebastien Deleuze
* @since 5.1
*/
public class JettyClientHttpConnector implements ClientHttpConnector {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this class should implement Spring's SmartLifecycle instead of lazily starting the HttpClient in connect. This way, we make sure the started is started on app context refresh. We should also call httpClient.stop() as part of implementing Lifecycle, because it will clean up the resources that are currently leaking.

public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {

if (!httpClient.isStarted()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though I suggest to implement SmartLifecycle above, I think we should keep this check. Not everybody will configure their WebClient in an app context, so the client might not yet be started when connect is called.


private final Request jettyRequest;

private final DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the data buffer factory should be exposed via a setter in the JettyClientHttpConnector, and passed along in the constructor. That way, people can configure data buffer usage (i.e. direct vs heap-based, etc.)

String contentType = jettyRequest.getHeaders().contains(HttpHeader.CONTENT_TYPE) ?
jettyRequest.getHeaders().getField(HttpHeader.CONTENT_TYPE).getValue():
MediaType.APPLICATION_OCTET_STREAM_VALUE;
Publisher<ContentChunk> chunks = Flux.from(publisher).map(DataBuffer::asByteBuffer).map(ContentChunk::new);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though this class is using DefaultDataBuffer, which does not require releasing, I still think it would be a good idea to release them here anyway, especially if we make the data buffer factory configurable.

I think the release can be implemented as the ContentChunk callback (I am assuming that the callback would be called after the data has been written, not sure about that). I see that below you said that releasing in the callback does not work for the response, but perhaps it does for the request?

ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(Flux
.from(body)
.flatMap(publisher -> publisher)
.map(buffer -> new ContentChunk(buffer.asByteBuffer())), contentType);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Release needs to happen here as well.

@Override
public HttpMethod getMethod() {
HttpMethod method = HttpMethod.resolve(this.jettyRequest.getMethod());
Assert.notNull(method, "Method must not be null");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like Assert.state is more suitable here, because notNull results in an IllegalArgumentException.

* @author Sebastien Deleuze
* @since 5.1
*/
public class JettyClientHttpRequest extends AbstractClientHttpRequest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be package-protected instead, no need to expose this class.

* @author Sebastien Deleuze
* @since 5.1
*/
public class JettyClientHttpResponse implements ClientHttpResponse {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be package-protected instead, no need to expose it.

private final Flux<DataBuffer> content;


public JettyClientHttpResponse(ReactiveResponse reactiveResponse, Publisher<DataBuffer> content) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Publisher<DataBuffer> parameter should be Flux<DataBuffer>, since the only place you're creating them is internally, from JettyClientHttpRequest.

Assert.notNull(reactiveResponse, "reactiveResponse should not be null");
Assert.notNull(content, "content should not be null");
this.reactiveResponse = reactiveResponse;
this.content = Flux.from(content);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above, drop the Flux.from.

@sdeleuze sdeleuze force-pushed the SPR-15092 branch 2 times, most recently from 8cae9a1 to 82481a9 Compare April 12, 2018 12:19
@sdeleuze
Copy link
Contributor Author

I have taken in account your feedback @poutsma thanks.

public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {

if (!httpClient.isStarted()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really minor, but perhaps delegate to isRunning()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, isStarted() is different than isRunning().

public int getPhase() {
return Integer.MAX_VALUE;
}
}
Copy link
Contributor

@rstoyanchev rstoyanchev Jul 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For ordering of SmartLifecycle methods, in the very least:

  • stop(Runnable) should be after stop()
  • getPhase and isAutoStartup up ahead among accessors

As lifecycle methods, personally I like to see start/stop/isRunning ahead of runtime methods like connect, and it's something I've been following in a number of classes, but I'm not sure if there is a broader precedent.

@Override
public void start() {
try {
this.httpClient.start();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A code level comment that explains HttpClient is internally synchronized and protected with state checks around here could be useful.


if (!uri.isAbsolute()) {
return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No practical difference but maybe this check first (before starting the client above) ?

headers.entrySet().forEach(e -> e.getValue().forEach(v -> this.jettyRequest.header(e.getKey(), v)));
if (!headers.containsKey(HttpHeaders.ACCEPT)) {
this.jettyRequest.header(HttpHeaders.ACCEPT, "*/*");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to add "*/*" explicitly? It should be the same as not having it.

public Mono<ClientHttpResponse> getResponse() {
return this.response;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The overall method ordering should be: accessors, public contract methods, protected contract (base class) methods (e.g. see ReactorHttpRequest).

public Mono<Void> writeWith(Publisher<? extends DataBuffer> publisher) {
String contentType = jettyRequest.getHeaders().contains(HttpHeader.CONTENT_TYPE) ?
jettyRequest.getHeaders().getField(HttpHeader.CONTENT_TYPE).getValue():
MediaType.APPLICATION_OCTET_STREAM_VALUE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it not enough to look up via getHeaders().getContentType() ?

DataBufferUtils.release(buffer);
throw Exceptions.propagate(x);
}
}));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be extracted:

private ContentChunk toContentChunk(DataBuffer buffer) {
	return new ContentChunk(buffer.asByteBuffer(), new Callback() {
		@Override
		public void succeeded() {
			DataBufferUtils.release(buffer);
		}

		@Override
		public void failed(Throwable x) {
			DataBufferUtils.release(buffer);
			throw Exceptions.propagate(x);
		}
	});
}

Then it becomes:

Publisher<ContentChunk> chunks = Flux.from(publisher).map(this::toContentChunk);

And the writeAndFlushWith method below does the same AFAICS.


public Mono<ClientHttpResponse> getResponse() {
return this.response;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not obvious at first why a request object should have a method that returns the response, and a public method for that. It must be called at the right time too, or else the response field may not have been initialized, but I can see why the method is here, based on the way the Jetty API works.

I think it would help clarity if the method called completes simply stored ReactiveRequest, returned Mono.empty(), and nothing more. The method already returns Mono.empty() anyway, so the part about creating the response is completely independent and could be moved out of the request. In other words, the connector would access the ReactiveRequest (via package private method) and then create the response. Then would make it a little more instantly obvious what's going on.

The other point is that the Mono<Void> from writeWith is supposed to reflect the completion of the writing of the request body, per contract, but probably not a practical issue at this time. If we keep all this package private we have the option to evolve it in the future.

@rstoyanchev
Copy link
Contributor

Sorry for the delayed response, but my comments above. Let's get this ready for master, and continue there as needed.

@sdeleuze sdeleuze force-pushed the SPR-15092 branch 2 times, most recently from 151e245 to de8c1e2 Compare July 7, 2018 16:28
Implemented with buffer copy instead of optimized buffer wrapping
because the latter hangs since Callback#succeeded doesn't allow
releasing the buffer and requesting more data at different times
(required for Mono<DataBuffer> for example).
See jetty/jetty.project#2429.

Issue: SPR-15092
@sdeleuze
Copy link
Contributor Author

sdeleuze commented Jul 7, 2018

@rstoyanchev Thanks for your feedback, I think I have taken your remarks in account. I have not yet merged it in master because I have a strange remaining issue: SseIntegrationTests and WebClientIntegrationTests fail on command line with ./gradlew clean test (AssertionError, likely timeouts) but succeed in IntelliJ IDEA. Any idea?

@sdeleuze
Copy link
Contributor Author

Merged via a87764f.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants