Skip to content

WIP: Update to the new Reactor Netty API #1831

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spring-messaging/spring-messaging.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ dependencies {
compile(project(":spring-core"))
optional(project(":spring-context"))
optional(project(":spring-oxm"))
optional("io.projectreactor.ipc:reactor-netty")
optional("io.projectreactor.ipc:reactor-netty:0.8.0.BUILD-SNAPSHOT")
optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") {
exclude group: "javax.servlet", module: "javax.servlet-api"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.ipc.netty.Connection;
import reactor.ipc.netty.FutureMono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.options.ClientOptions;
import reactor.ipc.netty.resources.LoopResources;
import reactor.ipc.netty.resources.PoolResources;
import reactor.ipc.netty.tcp.TcpClient;
Expand Down Expand Up @@ -98,57 +97,18 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec) {
this(builder -> builder.host(host).port(port), codec);
}

/**
* Constructor with a {@link ClientOptions.Builder} that can be used to
* customize Reactor Netty client options.
*
* <p><strong>Note: </strong> this constructor manages the lifecycle of the
* {@link TcpClient} and its underlying resources. Please do not customize
* any of the following options:
* {@link ClientOptions.Builder#channelGroup(ChannelGroup) ChannelGroup},
* {@link ClientOptions.Builder#loopResources(LoopResources) LoopResources}, and
* {@link ClientOptions.Builder#poolResources(PoolResources) PoolResources}.
* You may set the {@link ClientOptions.Builder#disablePool() disablePool}
* option if you simply want to turn off pooling.
*
* <p>For full control over the initialization and lifecycle of the TcpClient,
* see {@link #ReactorNettyTcpClient(TcpClient, ReactorNettyCodec)}.
*
* @param optionsConsumer consumer to customize client options
* @param codec the code to use
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
public ReactorNettyTcpClient(Consumer<ClientOptions.Builder<?>> optionsConsumer,
ReactorNettyCodec<P> codec) {

Assert.notNull(optionsConsumer, "Consumer<ClientOptions.Builder<?> is required");
Assert.notNull(host, "host is required");
Assert.notNull(port, "port is required");
Assert.notNull(codec, "ReactorNettyCodec is required");

this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);

Consumer<ClientOptions.Builder<?>> builtInConsumer = builder -> {

Assert.isTrue(!builder.isLoopAvailable() && !builder.isPoolAvailable(),
"The provided ClientOptions.Builder contains LoopResources and/or PoolResources. " +
"Please, use the constructor that accepts a TcpClient instance " +
"for full control over initialization and lifecycle.");

builder.channelGroup(this.channelGroup);
builder.preferNative(false);

this.loopResources = LoopResources.create("tcp-client-loop");
builder.loopResources(this.loopResources);

if (!builder.isPoolDisabled()) {
this.poolResources = PoolResources.elastic("tcp-client-pool");
builder.poolResources(this.poolResources);
}
};

this.tcpClient = TcpClient.create(optionsConsumer.andThen(builtInConsumer));
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = PoolResources.elastic("tcp-client-pool");
this.tcpClient = TcpClient.create(poolResources)
.host(host)
.port(port)
.runOn(loopResources, false)
.doOnConnected(c -> channelGroup.add(c.channel()));
this.codec = codec;
}

Expand Down Expand Up @@ -181,7 +141,8 @@ public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
}

Mono<Void> connectMono = this.tcpClient
.newHandler(new ReactorNettyHandler(handler))
.handle(new ReactorNettyHandler(handler))
.connect()
.doOnError(handler::afterConnectFailure)
.then();

Expand All @@ -201,11 +162,12 @@ public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, Reconnect
MonoProcessor<Void> connectMono = MonoProcessor.create();

this.tcpClient
.newHandler(new ReactorNettyHandler(handler))
.handle(new ReactorNettyHandler(handler))
.connect()
.doOnNext(updateConnectMono(connectMono))
.doOnError(updateConnectMono(connectMono))
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
.flatMap(NettyContext::onClose) // post-connect issues
.flatMap(Connection::onDispose) // post-connect issues
.retryWhen(reconnectFunction(strategy))
.repeatWhen(reconnectFunction(strategy))
.subscribe();
Expand Down Expand Up @@ -302,14 +264,16 @@ private class ReactorNettyHandler implements BiFunction<NettyInbound, NettyOutbo
@Override
@SuppressWarnings("unchecked")
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
if (logger.isDebugEnabled()) {
logger.debug("Connected to " + inbound.remoteAddress());
}
inbound.withConnection(c -> {
if (logger.isDebugEnabled()) {
logger.debug("Connected to " + c.address());
}
});
DirectProcessor<Void> completion = DirectProcessor.create();
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
scheduler.schedule(() -> connectionHandler.afterConnected(connection));

inbound.context().addHandler(new StompMessageDecoder<>(codec));
inbound.withConnection(c -> c.addHandler(new StompMessageDecoder<>(codec)));

inbound.receiveObject()
.cast(Message.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
package org.springframework.messaging.tcp.reactor;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelPipeline;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.NettyPipeline;

import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpConnection;
Expand Down Expand Up @@ -66,20 +64,13 @@ public ListenableFuture<Void> send(Message<P> message) {
@Override
@SuppressWarnings("deprecation")
public void onReadInactivity(Runnable runnable, long inactivityDuration) {
// TODO: workaround for https://github.com/reactor/reactor-netty/issues/22
ChannelPipeline pipeline = this.inbound.context().channel().pipeline();
String name = NettyPipeline.OnChannelReadIdle;
if (pipeline.context(name) != null) {
pipeline.remove(name);
}

this.inbound.onReadIdle(inactivityDuration, runnable);
this.inbound.withConnection(c -> c.onReadIdle(inactivityDuration, runnable));
}

@Override
@SuppressWarnings("deprecation")
public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
this.outbound.onWriteIdle(inactivityDuration, runnable);
this.inbound.withConnection(c -> c.onWriteIdle(inactivityDuration, runnable));
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion spring-test/spring-test.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ dependencies {
testCompile("org.apache.httpcomponents:httpclient:4.5.5") {
exclude group: "commons-logging", module: "commons-logging"
}
testCompile('io.projectreactor.ipc:reactor-netty')
testCompile('io.projectreactor.ipc:reactor-netty:0.8.0.BUILD-SNAPSHOT')
testCompile('de.bechte.junit:junit-hierarchicalcontextrunner:4.12.1')
// Pull in the latest JUnit 5 Launcher API and the Vintage engine as well
// so that we can run JUnit 4 tests in IntelliJ IDEA.
Expand Down
2 changes: 1 addition & 1 deletion spring-web/spring-web.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies {
optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
optional("io.reactivex.rxjava2:rxjava:${rxjava2Version}")
optional("io.netty:netty-all")
optional("io.projectreactor.ipc:reactor-netty")
optional("io.projectreactor.ipc:reactor-netty:0.8.0.BUILD-SNAPSHOT")
optional("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
optional("org.eclipse.jetty:jetty-server:${jettyVersion}") {
exclude group: "javax.servlet", module: "javax.servlet-api"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@
package org.springframework.http.client.reactive;

import java.net.URI;
import java.util.function.Consumer;
import java.util.function.Function;

import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.http.client.HttpClient;
import reactor.ipc.netty.http.client.HttpClientOptions;
import reactor.ipc.netty.http.client.HttpClientRequest;
import reactor.ipc.netty.http.client.HttpClientResponse;
import reactor.ipc.netty.options.ClientOptions;

import org.springframework.http.HttpMethod;

import io.netty.buffer.ByteBufAllocator;

/**
* Reactor-Netty implementation of {@link ClientHttpConnector}.
*
Expand All @@ -43,20 +44,19 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {

/**
* Create a Reactor Netty {@link ClientHttpConnector}
* with default {@link ClientOptions} and HTTP compression support enabled.
* with a default configuration and HTTP compression support enabled.
*/
public ReactorClientHttpConnector() {
this.httpClient = HttpClient.builder()
.options(options -> options.compression(true))
.build();
this.httpClient = HttpClient.prepare()
.compress();
}

/**
* Create a Reactor Netty {@link ClientHttpConnector} with the given
* {@link HttpClientOptions.Builder}
* {@link HttpClient}
*/
public ReactorClientHttpConnector(Consumer<? super HttpClientOptions.Builder> clientOptions) {
this.httpClient = HttpClient.create(clientOptions);
public ReactorClientHttpConnector(HttpClient httpClient) {
this.httpClient = httpClient;
}


Expand All @@ -69,22 +69,24 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
}

return this.httpClient
.request(adaptHttpMethod(method),
uri.toString(),
request -> requestCallback.apply(adaptRequest(method, uri, request)))
.map(this::adaptResponse);
.request(adaptHttpMethod(method))
.uri(uri.toString())
.send((req, out) -> requestCallback.apply(adaptRequest(method, uri, req, out)))
.responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc())))
.next();
}

private io.netty.handler.codec.http.HttpMethod adaptHttpMethod(HttpMethod method) {
return io.netty.handler.codec.http.HttpMethod.valueOf(method.name());
}

private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request) {
return new ReactorClientHttpRequest(method, uri, request);
private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound out) {
return new ReactorClientHttpRequest(method, uri, request, out);
}

private ClientHttpResponse adaptResponse(HttpClientResponse response) {
return new ReactorClientHttpResponse(response);
private ClientHttpResponse adaptResponse(HttpClientResponse response, NettyInbound nettyInbound,
ByteBufAllocator alloc) {
return new ReactorClientHttpResponse(response, nettyInbound, alloc);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.http.client.HttpClientRequest;

import org.springframework.core.io.buffer.DataBuffer;
Expand All @@ -48,15 +49,18 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero

private final HttpClientRequest httpRequest;

private final NettyOutbound out;

private final NettyDataBufferFactory bufferFactory;


public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri,
HttpClientRequest httpRequest) {
HttpClientRequest httpRequest, NettyOutbound out) {
this.httpMethod = httpMethod;
this.uri = uri;
this.httpRequest = httpRequest.failOnClientError(false).failOnServerError(false);
this.bufferFactory = new NettyDataBufferFactory(httpRequest.alloc());
this.httpRequest = httpRequest;
this.out = out;
this.bufferFactory = new NettyDataBufferFactory(out.alloc());
}


Expand All @@ -77,14 +81,14 @@ public URI getURI() {

@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
return doCommit(() -> this.httpRequest
return doCommit(() -> this.out
.send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf)).then());
}

@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
Publisher<Publisher<ByteBuf>> byteBufs = Flux.from(body).map(ReactorClientHttpRequest::toByteBufs);
return doCommit(() -> this.httpRequest.sendGroups(byteBufs).then());
return doCommit(() -> this.out.sendGroups(byteBufs).then());
}

private static Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dataBuffers) {
Expand All @@ -93,12 +97,12 @@ private static Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dat

@Override
public Mono<Void> writeWith(File file, long position, long count) {
return doCommit(() -> this.httpRequest.sendFile(file.toPath(), position, count).then());
return doCommit(() -> this.out.sendFile(file.toPath(), position, count).then());
}

@Override
public Mono<Void> setComplete() {
return doCommit(() -> httpRequest.sendHeaders().then());
return doCommit(() -> out.then());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Collection;

import reactor.core.publisher.Flux;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.http.client.HttpClientResponse;

import org.springframework.core.io.buffer.DataBuffer;
Expand All @@ -30,6 +31,8 @@
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

import io.netty.buffer.ByteBufAllocator;

/**
* {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client.
*
Expand All @@ -43,16 +46,21 @@ class ReactorClientHttpResponse implements ClientHttpResponse {

private final HttpClientResponse response;

private final NettyInbound nettyInbound;


public ReactorClientHttpResponse(HttpClientResponse response) {
public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound nettyInbound,
ByteBufAllocator alloc) {
this.response = response;
this.dataBufferFactory = new NettyDataBufferFactory(response.channel().alloc());
this.nettyInbound = nettyInbound;
this.dataBufferFactory = new NettyDataBufferFactory(alloc);
}


@Override
public Flux<DataBuffer> getBody() {
return response.receive()
return nettyInbound
.receive()
.map(buf -> {
buf.retain();
return dataBufferFactory.wrap(buf);
Expand Down
Loading