Skip to content

Resolve drift in Netty http/2 apis. #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/netty
Submodule netty updated 260 files
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package com.google.net.stubby.transport.netty;

import com.google.common.base.Preconditions;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;

Expand All @@ -48,10 +49,8 @@
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2InboundFlowController;
import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.handler.codec.http2.Http2StreamException;

import java.util.ArrayDeque;
import java.util.Deque;
Expand Down Expand Up @@ -81,15 +80,17 @@ public PendingStream(CreateStreamCommand command, ChannelPromise promise) {
}

private final Deque<PendingStream> pendingStreams = new ArrayDeque<PendingStream>();
private final Http2LocalFlowController inboundFlow;
private Throwable connectionError;
private ChannelHandlerContext ctx;

public NettyClientHandler(Http2Connection connection,
Http2FrameReader frameReader,
Http2FrameWriter frameWriter,
Http2InboundFlowController inboundFlow,
Http2OutboundFlowController outboundFlow) {
super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, new LazyFrameListener());
Http2LocalFlowController inboundFlow) {
super(connection, frameReader, frameWriter, new LazyFrameListener());
this.inboundFlow = Preconditions.checkNotNull(inboundFlow, "inboundFlow");

initListener();

// Disallow stream creation by the server.
Expand Down Expand Up @@ -148,7 +149,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
void returnProcessedBytes(int streamId, int bytes) {
try {
Http2Stream http2Stream = connection().requireStream(streamId);
http2Stream.garbageCollector().returnProcessedBytes(ctx, bytes);
inboundFlow.consumeBytes(ctx, http2Stream, bytes);
} catch (Http2Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -215,7 +216,7 @@ protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause,

@Override
protected void onStreamError(ChannelHandlerContext ctx, Throwable cause,
Http2StreamException http2Ex) {
Http2Exception.StreamException http2Ex) {
// Close the stream with a status that contains the cause.
Http2Stream stream = connection().stream(http2Ex.streamId());
if (stream != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,14 @@
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
import io.netty.handler.codec.http2.DefaultHttp2StreamRemovalPolicy;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.handler.ssl.SslContext;
import io.netty.util.internal.logging.InternalLogLevel;
Expand Down Expand Up @@ -250,10 +248,8 @@ private static NettyClientHandler newHandler() {
frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);

DefaultHttp2InboundFlowController inboundFlow =
new DefaultHttp2InboundFlowController(connection, frameWriter);
Http2OutboundFlowController outboundFlow =
new DefaultHttp2OutboundFlowController(connection, frameWriter);
return new NettyClientHandler(connection, frameReader, frameWriter, inboundFlow, outboundFlow);
DefaultHttp2LocalFlowController inboundFlow =
new DefaultHttp2LocalFlowController(connection, frameWriter);
return new NettyClientHandler(connection, frameReader, frameWriter, inboundFlow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,13 @@
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2InboundFlowController;
import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.handler.codec.http2.Http2StreamException;
import io.netty.util.ReferenceCountUtil;

import java.util.logging.Level;
Expand All @@ -82,6 +81,7 @@ class NettyServerHandler extends Http2ConnectionHandler {
private static final Status GOAWAY_STATUS = Status.UNAVAILABLE;

private final ServerTransportListener transportListener;
private final Http2LocalFlowController inboundFlow;
private Throwable connectionError;
private ChannelHandlerContext ctx;
private boolean teWarningLogged;
Expand All @@ -90,10 +90,10 @@ class NettyServerHandler extends Http2ConnectionHandler {
Http2Connection connection,
Http2FrameReader frameReader,
Http2FrameWriter frameWriter,
Http2InboundFlowController inboundFlow,
Http2OutboundFlowController outboundFlow) {
super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, new LazyFrameListener());
Http2LocalFlowController inboundFlow) {
super(connection, frameReader, frameWriter, new LazyFrameListener());
this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener");
this.inboundFlow = Preconditions.checkNotNull(inboundFlow, "inboundFlow");
initListener();
connection.local().allowPushTo(false);
}
Expand Down Expand Up @@ -189,9 +189,9 @@ protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause,

@Override
protected void onStreamError(ChannelHandlerContext ctx, Throwable cause,
Http2StreamException http2Ex) {
StreamException http2Ex) {
logger.log(Level.WARNING, "Stream Error", cause);
Http2Stream stream = connection().stream(http2Ex.streamId());
Http2Stream stream = connection().stream(http2Ex.streamId(http2Ex));
if (stream != null) {
// Abort the stream with a status to help the client with debugging.
// Don't need to send a RST_STREAM since the end-of-stream flag will
Expand Down Expand Up @@ -240,7 +240,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
void returnProcessedBytes(int streamId, int bytes) {
try {
Http2Stream http2Stream = connection().requireStream(streamId);
http2Stream.garbageCollector().returnProcessedBytes(ctx, bytes);
inboundFlow.consumeBytes(ctx, http2Stream, bytes);
} catch (Http2Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -305,25 +305,25 @@ public void operationComplete(ChannelFuture future) throws Exception {
});
}

private String determineMethod(int streamId, Http2Headers headers) throws Http2StreamException {
private String determineMethod(int streamId, Http2Headers headers) throws Http2Exception {
if (!HTTP_METHOD.equals(headers.method())) {
throw new Http2StreamException(streamId, Http2Error.REFUSED_STREAM,
String.format("Method '%s' is not supported", headers.method()));
throw Http2Exception.streamError(streamId, Http2Error.REFUSED_STREAM,
"Method '%s' is not supported", headers.method());
}
checkHeader(streamId, headers, CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC);
String methodName = TransportFrameUtil.getFullMethodNameFromPath(headers.path().toString());
if (methodName == null) {
throw new Http2StreamException(streamId, Http2Error.REFUSED_STREAM,
String.format("Malformatted path: %s", headers.path()));
throw Http2Exception.streamError(streamId, Http2Error.REFUSED_STREAM,
"Malformatted path: %s", headers.path());
}
return methodName;
}

private static void checkHeader(int streamId, Http2Headers headers,
AsciiString header, AsciiString expectedValue) throws Http2StreamException {
AsciiString header, AsciiString expectedValue) throws Http2Exception {
if (!expectedValue.equals(headers.get(header))) {
throw new Http2StreamException(streamId, Http2Error.REFUSED_STREAM, String.format(
"Header '%s'='%s', while '%s' is expected", header, headers.get(header), expectedValue));
throw Http2Exception.streamError(streamId, Http2Error.REFUSED_STREAM,
"Header '%s'='%s', while '%s' is expected", header, headers.get(header), expectedValue);
}
}

Expand All @@ -334,8 +334,8 @@ private NettyServerStream serverStream(Http2Stream stream) {
return stream.getProperty(NettyServerStream.class);
}

private Http2StreamException newStreamException(int streamId, Throwable cause) {
return new Http2StreamException(streamId, Http2Error.INTERNAL_ERROR, cause.getMessage(), cause);
private Http2Exception newStreamException(int streamId, Throwable cause) {
return Http2Exception.streamError(streamId, Http2Error.INTERNAL_ERROR, cause.getMessage(), cause);
}

private static class LazyFrameListener extends Http2FrameAdapter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,13 @@
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
import io.netty.handler.codec.http2.DefaultHttp2StreamRemovalPolicy;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.handler.ssl.SslContext;
import io.netty.util.internal.logging.InternalLogLevel;
Expand Down Expand Up @@ -129,15 +127,12 @@ private NettyServerHandler createHandler(ServerTransportListener transportListen
Http2FrameWriter frameWriter =
new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);

DefaultHttp2InboundFlowController inboundFlow =
new DefaultHttp2InboundFlowController(connection, frameWriter);
Http2OutboundFlowController outboundFlow =
new DefaultHttp2OutboundFlowController(connection, frameWriter);
DefaultHttp2LocalFlowController inboundFlow =
new DefaultHttp2LocalFlowController(connection, frameWriter);
return new NettyServerHandler(transportListener,
connection,
frameReader,
frameWriter,
inboundFlow,
outboundFlow);
inboundFlow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,13 @@
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2Settings;

import org.junit.Before;
Expand Down Expand Up @@ -314,15 +312,12 @@ private static NettyClientHandler newHandler() {
Http2Connection connection = new DefaultHttp2Connection(false);
Http2FrameReader frameReader = new DefaultHttp2FrameReader();
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
DefaultHttp2InboundFlowController inboundFlow =
new DefaultHttp2InboundFlowController(connection, frameWriter);
Http2OutboundFlowController outboundFlow =
new DefaultHttp2OutboundFlowController(connection, frameWriter);
DefaultHttp2LocalFlowController inboundFlow =
new DefaultHttp2LocalFlowController(connection, frameWriter);
return new NettyClientHandler(connection,
frameReader,
frameWriter,
inboundFlow,
outboundFlow);
inboundFlow);
}

private AsciiString as(String string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static com.google.net.stubby.transport.netty.Utils.TE_HEADER;
import static com.google.net.stubby.transport.netty.Utils.TE_TRAILERS;
import static io.netty.handler.codec.http2.Http2CodecUtil.toByteBuf;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
Expand Down Expand Up @@ -68,15 +68,14 @@
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2Settings;

import org.junit.Before;
Expand Down Expand Up @@ -233,9 +232,10 @@ public void connectionErrorShouldCloseChannel() throws Exception {
handler.channelRead(ctx, badFrame());

// Verify the expected GO_AWAY frame was written.
Exception e = protocolError("Frame length 0 incorrect size for ping.");
Exception e = connectionError(Http2Error.PROTOCOL_ERROR,
"Frame length 0 incorrect size for ping.");
ByteBuf expected =
goAwayFrame(STREAM_ID, (int) Http2Error.PROTOCOL_ERROR.code(), toByteBuf(ctx, e));
goAwayFrame(STREAM_ID, (int) Http2Error.FRAME_SIZE_ERROR.code(), toByteBuf(ctx, e));
ByteBuf actual = captureWrite(ctx);
assertEquals(expected, actual);

Expand Down Expand Up @@ -264,6 +264,7 @@ private void createStream() throws Exception {
.path(new AsciiString("/foo.bar"));
ByteBuf headersFrame = headersFrame(STREAM_ID, headers);
handler.channelRead(ctx, headersFrame);

ArgumentCaptor<NettyServerStream> streamCaptor =
ArgumentCaptor.forClass(NettyServerStream.class);
@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -309,15 +310,12 @@ private static NettyServerHandler newHandler(ServerTransportListener transportLi
Http2Connection connection = new DefaultHttp2Connection(true);
Http2FrameReader frameReader = new DefaultHttp2FrameReader();
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
DefaultHttp2InboundFlowController inboundFlow =
new DefaultHttp2InboundFlowController(connection, frameWriter);
Http2OutboundFlowController outboundFlow =
new DefaultHttp2OutboundFlowController(connection, frameWriter);
DefaultHttp2LocalFlowController inboundFlow =
new DefaultHttp2LocalFlowController(connection, frameWriter);
return new NettyServerHandler(transportListener,
connection,
frameReader,
frameWriter,
inboundFlow,
outboundFlow);
inboundFlow);
}
}