-
Notifications
You must be signed in to change notification settings - Fork 26.6k
Support Tri backpress #15957
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Tri backpress #15957
Changes from all commits
a9d5295
37d6693
f2a427d
75f85f8
72a6bed
1edffe2
00be4d8
b4f943f
5e2e7c9
72462bb
3c8a958
0006218
b74c57a
16893d1
576beef
2796b5e
f959c8a
38dc682
36c16f4
38a01bf
3f18893
780cce7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -184,7 +184,20 @@ private void skipOffset(InputStream inputStream, int lengthFieldOffset) throws I | |||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| private void processBody() throws IOException { | ||||||||||||||||||||||||||||||
| byte[] rawMessage = readRawMessage(accumulate, requiredLength); | ||||||||||||||||||||||||||||||
| // Calculate total bytes read: header (offset + length field) + payload | ||||||||||||||||||||||||||||||
| int totalBytesRead = lengthFieldOffset + lengthFieldLength + requiredLength; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| byte[] rawMessage; | ||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||
| rawMessage = readRawMessage(accumulate, requiredLength); | ||||||||||||||||||||||||||||||
| } finally { | ||||||||||||||||||||||||||||||
| // Notify listener about bytes read for flow control immediately after reading bytes | ||||||||||||||||||||||||||||||
| // This must be in finally block to ensure flow control works even if reading fails | ||||||||||||||||||||||||||||||
| // Following gRPC's pattern: bytesRead is called as soon as bytes are consumed from input | ||||||||||||||||||||||||||||||
| listener.bytesRead(totalBytesRead); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
|
Comment on lines
+190
to
+199
|
||||||||||||||||||||||||||||||
| byte[] rawMessage; | |
| try { | |
| rawMessage = readRawMessage(accumulate, requiredLength); | |
| } finally { | |
| // Notify listener about bytes read for flow control immediately after reading bytes | |
| // This must be in finally block to ensure flow control works even if reading fails | |
| // Following gRPC's pattern: bytesRead is called as soon as bytes are consumed from input | |
| listener.bytesRead(totalBytesRead); | |
| } | |
| byte[] rawMessage = readRawMessage(accumulate, requiredLength); | |
| // Notify listener about bytes read for flow control immediately after successfully reading bytes | |
| // Following gRPC's pattern: bytesRead is called as soon as bytes are consumed from input | |
| listener.bytesRead(totalBytesRead); |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -16,6 +16,8 @@ | |||||||||||||||||||||
| */ | ||||||||||||||||||||||
| package org.apache.dubbo.remoting.http12.netty4.h2; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; | ||||||||||||||||||||||
| import org.apache.dubbo.common.logger.LoggerFactory; | ||||||||||||||||||||||
| import org.apache.dubbo.config.nested.TripleConfig; | ||||||||||||||||||||||
| import org.apache.dubbo.remoting.http12.HttpMetadata; | ||||||||||||||||||||||
| import org.apache.dubbo.remoting.http12.HttpOutputMessage; | ||||||||||||||||||||||
|
|
@@ -31,17 +33,29 @@ | |||||||||||||||||||||
| import io.netty.buffer.ByteBuf; | ||||||||||||||||||||||
| import io.netty.buffer.ByteBufOutputStream; | ||||||||||||||||||||||
| import io.netty.handler.codec.http2.DefaultHttp2ResetFrame; | ||||||||||||||||||||||
| import io.netty.handler.codec.http2.Http2Connection; | ||||||||||||||||||||||
| import io.netty.handler.codec.http2.Http2LocalFlowController; | ||||||||||||||||||||||
| import io.netty.handler.codec.http2.Http2Stream; | ||||||||||||||||||||||
| import io.netty.handler.codec.http2.Http2StreamChannel; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_RESPONSE; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| public class NettyH2StreamChannel implements H2StreamChannel { | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| private static final ErrorTypeAwareLogger LOGGER = | ||||||||||||||||||||||
| LoggerFactory.getErrorTypeAwareLogger(NettyH2StreamChannel.class); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| private final Http2StreamChannel http2StreamChannel; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| private final TripleConfig tripleConfig; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| public NettyH2StreamChannel(Http2StreamChannel http2StreamChannel, TripleConfig tripleConfig) { | ||||||||||||||||||||||
| private final Http2Connection http2Connection; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| public NettyH2StreamChannel( | ||||||||||||||||||||||
| Http2StreamChannel http2StreamChannel, TripleConfig tripleConfig, Http2Connection http2Connection) { | ||||||||||||||||||||||
| this.http2StreamChannel = http2StreamChannel; | ||||||||||||||||||||||
| this.tripleConfig = tripleConfig; | ||||||||||||||||||||||
| this.http2Connection = http2Connection; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| @Override | ||||||||||||||||||||||
|
|
@@ -89,4 +103,45 @@ public CompletableFuture<Void> writeResetFrame(long errorCode) { | |||||||||||||||||||||
| http2StreamChannel.write(resetFrame).addListener(nettyHttpChannelFutureListener); | ||||||||||||||||||||||
| return nettyHttpChannelFutureListener; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| @Override | ||||||||||||||||||||||
| public void consumeBytes(int numBytes) throws Exception { | ||||||||||||||||||||||
| if (numBytes <= 0) { | ||||||||||||||||||||||
| return; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if (http2Connection == null) { | ||||||||||||||||||||||
| LOGGER.debug("Http2Connection not available, skip consumeBytes"); | ||||||||||||||||||||||
| return; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| Http2LocalFlowController localFlowController = http2Connection.local().flowController(); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Get the stream from connection using stream id | ||||||||||||||||||||||
| int streamId = http2StreamChannel.stream().id(); | ||||||||||||||||||||||
|
Comment on lines
+120
to
+121
|
||||||||||||||||||||||
| // Get the stream from connection using stream id | |
| int streamId = http2StreamChannel.stream().id(); | |
| // Get the stream from Http2StreamChannel and then from connection using stream id | |
| Http2Stream channelStream = http2StreamChannel.stream(); | |
| if (channelStream == null) { | |
| LOGGER.debug("Http2StreamChannel.stream() is null, skip consumeBytes"); | |
| return; | |
| } | |
| int streamId = channelStream.id(); |
Copilot
AI
Dec 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent error handling: When in the event loop (line 131), exceptions from consumeBytes are propagated (declared in the method signature), but when executed asynchronously (line 135), exceptions are caught and logged. This inconsistency could lead to different behavior depending on which thread calls the method. Consider catching the exception in both cases or propagating it in both cases for consistency.
| http2StreamChannel.eventLoop().execute(() -> { | |
| try { | |
| localFlowController.consumeBytes(stream, numBytes); | |
| } catch (Exception e) { | |
| LOGGER.warn(PROTOCOL_FAILED_RESPONSE, "", "", "Failed to consumeBytes for stream " + streamId, e); | |
| } | |
| }); | |
| http2StreamChannel.eventLoop().execute(() -> localFlowController.consumeBytes(stream, numBytes)); |
Uh oh!
There was an error while loading. Please reload this page.