Skip to content

Commit

Permalink
Merge #2979 into 2.0.0-M4
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Nov 21, 2023
2 parents 361cbdc + 707047e commit c03afa9
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.netty5.channel.ChannelHandler;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.channel.ChannelOption;
import io.netty5.handler.codec.compression.ZlibCodecFactory;
import io.netty5.handler.codec.http.DefaultFullHttpRequest;
import io.netty5.handler.codec.http.DefaultHttpContent;
import io.netty5.handler.codec.http.DefaultHttpRequest;
Expand All @@ -59,7 +60,9 @@
import io.netty.contrib.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.contrib.handler.codec.http.multipart.HttpDataFactory;
import io.netty.contrib.handler.codec.http.multipart.HttpPostRequestEncoder;
import io.netty5.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty5.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import io.netty5.handler.codec.http.websocketx.extensions.compression.DeflateFrameClientExtensionHandshaker;
import io.netty5.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateClientExtensionHandshaker;
import io.netty5.handler.codec.http2.Http2StreamChannel;
import io.netty5.handler.stream.ChunkedWriteHandler;
import io.netty5.handler.timeout.ReadTimeoutHandler;
Expand Down Expand Up @@ -91,6 +94,7 @@
import reactor.util.annotation.Nullable;
import reactor.util.context.ContextView;

import static io.netty5.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateServerExtensionHandshaker.MAX_WINDOW_SIZE;
import static reactor.netty5.ReactorNetty.format;

/**
Expand Down Expand Up @@ -839,7 +843,15 @@ final void withWebsocketSupport(WebsocketClientSpec websocketClientSpec) {
if (websocketClientSpec.compress()) {
requestHeaders().remove(HttpHeaderNames.ACCEPT_ENCODING);
removeHandler(NettyPipeline.HttpDecompressor);
addHandlerFirst(NettyPipeline.WsCompressionHandler, WebSocketClientCompressionHandler.INSTANCE);
PerMessageDeflateClientExtensionHandshaker perMessageDeflateClientExtensionHandshaker =
new PerMessageDeflateClientExtensionHandshaker(6, ZlibCodecFactory.isSupportingWindowSizeAndMemLevel(),
MAX_WINDOW_SIZE, websocketClientSpec.compressionAllowClientNoContext(),
websocketClientSpec.compressionRequestedServerNoContext());
addHandlerFirst(NettyPipeline.WsCompressionHandler,
new WebSocketClientExtensionHandler(
perMessageDeflateClientExtensionHandshaker,
new DeflateFrameClientExtensionHandshaker(false),
new DeflateFrameClientExtensionHandshaker(true)));
}

if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package reactor.netty5.http.client;

import io.netty5.handler.codec.http.websocketx.WebSocketVersion;
import io.netty5.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateClientExtensionHandshaker;
import reactor.netty5.http.websocket.WebsocketSpec;

import java.util.Objects;
Expand All @@ -28,6 +29,22 @@
*/
public interface WebsocketClientSpec extends WebsocketSpec {

/**
* Returns whether the server is allowed to activate {@code client_no_context_takeover}.
*
* @return whether the server is allowed to activate {@code client_no_context_takeover}
* @since 1.1.14
*/
boolean compressionAllowClientNoContext();

/**
* Returns whether the client needs to activate {@code server_no_context_takeover}.
*
* @return whether the client needs to activate {@code server_no_context_takeover}
* @since 1.1.14
*/
boolean compressionRequestedServerNoContext();

/**
* Returns the configured WebSocket version.
*
Expand All @@ -47,6 +64,10 @@ public interface WebsocketClientSpec extends WebsocketSpec {
* handlePing = false
* <br>
* compress = false
* <br>
* compressionAllowClientNoContext = false
* <br>
* compressionRequestedServerNoContext = false
*
* @return {@link Builder}
*/
Expand All @@ -56,11 +77,42 @@ static Builder builder() {

final class Builder extends WebsocketSpec.Builder<Builder> {

boolean allowClientNoContext;
boolean requestedServerNoContext;
WebSocketVersion version = WebSocketVersion.V13;

private Builder() {
}

/**
* Allows the server to activate {@code client_no_context_takeover}
* Default to false.
*
* @param allowClientNoContext allows the server to activate {@code client_no_context_takeover}
* @return {@literal this}
* @since 1.1.14
* @see PerMessageDeflateClientExtensionHandshaker
*/
public final Builder compressionAllowClientNoContext(boolean allowClientNoContext) {
this.allowClientNoContext = allowClientNoContext;
return this;
}

/**
* Indicates if the client needs to activate {@code server_no_context_takeover} if the server is compatible with.
* Default to false.
*
* @param requestedServerNoContext indicates if the client needs to activate
* {@code server_no_context_takeover} if the server is compatible with
* @return {@literal this}
* @since 1.1.14
* @see PerMessageDeflateClientExtensionHandshaker
*/
public final Builder compressionRequestedServerNoContext(boolean requestedServerNoContext) {
this.requestedServerNoContext = requestedServerNoContext;
return this;
}

/**
* Sets websocket version to use.
* Set to {@link io.netty5.handler.codec.http.websocketx.WebSocketVersion#V13} by default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,29 @@
*/
final class WebsocketClientSpecImpl extends WebsocketSpecImpl implements WebsocketClientSpec {

private final WebSocketVersion version;
@Override
public boolean compressionAllowClientNoContext() {
return allowClientNoContext;
}

@Override
public boolean compressionRequestedServerNoContext() {
return requestedServerNoContext;
}

@Override
public WebSocketVersion version() {
return version;
}

private final boolean allowClientNoContext;
private final boolean requestedServerNoContext;
private final WebSocketVersion version;

WebsocketClientSpecImpl(WebsocketClientSpec.Builder builder) {
super(builder);
this.allowClientNoContext = builder.allowClientNoContext;
this.requestedServerNoContext = builder.requestedServerNoContext;
this.version = builder.version;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty5.channel.Channel;
import io.netty5.channel.ChannelFutureListeners;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.handler.codec.compression.ZlibCodecFactory;
import io.netty5.handler.codec.http.DefaultFullHttpRequest;
import io.netty5.handler.codec.http.EmptyLastHttpContent;
import io.netty5.handler.codec.http.HttpHeaderNames;
Expand All @@ -32,7 +33,9 @@
import io.netty5.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty5.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty5.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty5.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty5.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler;
import io.netty5.handler.codec.http.websocketx.extensions.compression.DeflateFrameServerExtensionHandshaker;
import io.netty5.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateServerExtensionHandshaker;
import io.netty5.util.concurrent.Future;
import io.netty5.util.concurrent.FutureListener;
import org.reactivestreams.Publisher;
Expand All @@ -48,6 +51,7 @@
import reactor.netty5.http.websocket.WebsocketOutbound;
import reactor.util.annotation.Nullable;

import static io.netty5.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateServerExtensionHandshaker.MAX_WINDOW_SIZE;
import static reactor.netty5.ReactorNetty.format;

/**
Expand Down Expand Up @@ -97,14 +101,20 @@ final class WebsocketServerOperations extends HttpServerOperations
if (websocketServerSpec.compress()) {
removeHandler(NettyPipeline.CompressionHandler);

WebSocketServerCompressionHandler wsServerCompressionHandler =
new WebSocketServerCompressionHandler();
PerMessageDeflateServerExtensionHandshaker perMessageDeflateServerExtensionHandshaker =
new PerMessageDeflateServerExtensionHandshaker(6, ZlibCodecFactory.isSupportingWindowSizeAndMemLevel(),
MAX_WINDOW_SIZE, websocketServerSpec.compressionAllowServerNoContext(),
websocketServerSpec.compressionPreferredClientNoContext());
WebSocketServerExtensionHandler wsServerExtensionHandler =
new WebSocketServerExtensionHandler(
perMessageDeflateServerExtensionHandshaker,
new DeflateFrameServerExtensionHandshaker());
try {
wsServerCompressionHandler.channelRead(channel.pipeline()
.context(NettyPipeline.ReactiveBridge),
wsServerExtensionHandler.channelRead(channel.pipeline()
.context(NettyPipeline.ReactiveBridge),
request);

addHandlerFirst(NettyPipeline.WsCompressionHandler, wsServerCompressionHandler);
addHandlerFirst(NettyPipeline.WsCompressionHandler, wsServerExtensionHandler);
}
catch (Throwable e) {
log.error(format(channel(), ""), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package reactor.netty5.http.server;

import io.netty5.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateServerExtensionHandshaker;
import reactor.netty5.http.websocket.WebsocketSpec;

/**
Expand All @@ -25,6 +26,22 @@
*/
public interface WebsocketServerSpec extends WebsocketSpec {

/**
* Returns whether the client is allowed to activate {@code server_no_context_takeover}.
*
* @return whether the client is allowed to activate {@code server_no_context_takeover}
* @since 1.1.14
*/
boolean compressionAllowServerNoContext();

/**
* Returns whether the server prefers to activate {@code client_no_context_takeover}.
*
* @return whether the server prefers to activate {@code client_no_context_takeover}
* @since 1.1.14
*/
boolean compressionPreferredClientNoContext();

/**
* Create builder with default properties.<br>
* protocols = null
Expand All @@ -34,6 +51,10 @@ public interface WebsocketServerSpec extends WebsocketSpec {
* handlePing = false
* <br>
* compress = false
* <br>
* compressionAllowServerNoContext = false
* <br>
* compressionPreferredClientNoContext = false
*
* @return {@link WebsocketServerSpec.Builder}
*/
Expand All @@ -43,9 +64,41 @@ static Builder builder() {

final class Builder extends WebsocketSpec.Builder<Builder> {

boolean allowServerNoContext;
boolean preferredClientNoContext;

private Builder() {
}

/**
* Allows the client to activate {@code server_no_context_takeover}.
* Default to false.
*
* @param allowServerNoContext allows the client to activate {@code server_no_context_takeover}
* @return {@literal this}
* @since 1.1.14
* @see PerMessageDeflateServerExtensionHandshaker
*/
public final Builder compressionAllowServerNoContext(boolean allowServerNoContext) {
this.allowServerNoContext = allowServerNoContext;
return this;
}

/**
* Indicates if the server prefers to activate {@code client_no_context_takeover} if client is compatible with.
* Default to false.
*
* @param preferredClientNoContext indicates if the server prefers to activate
* {@code client_no_context_takeover} if client is compatible with
* @return {@literal this}
* @since 1.1.14
* @see PerMessageDeflateServerExtensionHandshaker
*/
public final Builder compressionPreferredClientNoContext(boolean preferredClientNoContext) {
this.preferredClientNoContext = preferredClientNoContext;
return this;
}

/**
* Builds new {@link WebsocketServerSpec}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,22 @@
*/
final class WebsocketServerSpecImpl extends WebsocketSpecImpl implements WebsocketServerSpec {

@Override
public boolean compressionAllowServerNoContext() {
return allowServerNoContext;
}

@Override
public boolean compressionPreferredClientNoContext() {
return preferredClientNoContext;
}

private final boolean allowServerNoContext;
private final boolean preferredClientNoContext;

WebsocketServerSpecImpl(WebsocketServerSpec.Builder builder) {
super(builder);
this.allowServerNoContext = builder.allowServerNoContext;
this.preferredClientNoContext = builder.preferredClientNoContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1218,21 +1218,32 @@ void testIssue967() throws Exception {

@Test
void testIssue970_WithCompress() {
doTestIssue970(true);
doTestWebsocketCompression(true);
}

@Test
void testIssue970_NoCompress() {
doTestIssue970(false);
doTestWebsocketCompression(false);
}

private void doTestIssue970(boolean compress) {
@Test
void testIssue2973() {
doTestWebsocketCompression(true, true);
}

private void doTestWebsocketCompression(boolean compress) {
doTestWebsocketCompression(compress, false);
}

private void doTestWebsocketCompression(boolean compress, boolean clientServerNoContextTakeover) {
WebsocketServerSpec.Builder serverBuilder = WebsocketServerSpec.builder().compress(compress);
WebsocketServerSpec websocketServerSpec = clientServerNoContextTakeover ?
serverBuilder.compressionAllowServerNoContext(true).compressionPreferredClientNoContext(true).build() :
serverBuilder.build();
disposableServer =
createServer()
.handle((req, res) ->
res.sendWebsocket(
(in, out) -> out.sendString(Mono.just("test")),
WebsocketServerSpec.builder().compress(compress).build()))
res.sendWebsocket((in, out) -> out.sendString(Mono.just("test")), websocketServerSpec))
.bindNow();

AtomicBoolean clientHandler = new AtomicBoolean();
Expand Down Expand Up @@ -1266,8 +1277,15 @@ private void doTestIssue970(boolean compress) {

if (compress) {
predicate = t -> "test".equals(t.getT1()) && !"null".equals(t.getT2());
predicate = clientServerNoContextTakeover ?
predicate.and(t -> t.getT2().contains("client_no_context_takeover") && t.getT2().contains("server_no_context_takeover")) :
predicate.and(t -> !t.getT2().contains("client_no_context_takeover") && !t.getT2().contains("server_no_context_takeover"));
}
StepVerifier.create(client.websocket(WebsocketClientSpec.builder().compress(compress).build())
WebsocketClientSpec.Builder clientBuilder = WebsocketClientSpec.builder().compress(compress);
WebsocketClientSpec websocketClientSpec = clientServerNoContextTakeover ?
clientBuilder.compressionAllowClientNoContext(true).compressionRequestedServerNoContext(true).build() :
clientBuilder.build();
StepVerifier.create(client.websocket(websocketClientSpec)
.uri("/")
.handle(receiver))
.expectNextMatches(predicate)
Expand Down

0 comments on commit c03afa9

Please sign in to comment.