Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class TripleConfig implements Serializable {
public static final int DEFAULT_MAX_FRAME_SIZE = 8_388_608;
public static final int DEFAULT_MAX_HEADER_LIST_SIZE = 32_768;
public static final int DEFAULT_MAX_MESSAGE_SIZE = 50 * 1024 * 1024;
public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;

public static final String H2_SETTINGS_MAX_MESSAGE_SIZE_KEY = "dubbo.protocol.triple.max-message-size";

Expand Down Expand Up @@ -151,6 +152,17 @@ public class TripleConfig implements Serializable {
*/
private Integer maxMessageSize;

/**
* Window update ratio for HTTP/2 flow control.
* Determines when to send WINDOW_UPDATE frames based on the ratio of consumed bytes
* to the initial window size. For example, 0.5 means WINDOW_UPDATE is sent when
* 50% of the initial window has been consumed.
* <p>Valid range: 0.0 to 1.0 (exclusive of 0, as 0 would disable window updates)
* <p>The default value is 0.5.
* <p>For HTTP/2
*/
private Float windowUpdateRatio;

@Nested
private RestConfig rest;

Expand Down Expand Up @@ -355,6 +367,22 @@ public void setMaxMessageSize(Integer maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}

public Float getWindowUpdateRatio() {
return windowUpdateRatio;
}

@Parameter(excluded = true)
public float getWindowUpdateRatioOrDefault() {
return windowUpdateRatio == null ? DEFAULT_WINDOW_UPDATE_RATIO : windowUpdateRatio;
}

public void setWindowUpdateRatio(Float windowUpdateRatio) {
if (windowUpdateRatio != null && (windowUpdateRatio <= 0.0f || windowUpdateRatio > 1.0f)) {
throw new IllegalArgumentException("windowUpdateRatio must be > 0 and <= 1, but was: " + windowUpdateRatio);
}
this.windowUpdateRatio = windowUpdateRatio;
}

public RestConfig getRest() {
return rest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ public void setCompression(String compression) {}
@Override
public void disableAutoFlowControl() {}

@Override
public boolean isReady() {
return true;
}

@Override
public void setOnReadyHandler(Runnable onReadyHandler) {
/* no-op for test */
}

@Override
public void onNext(String v) {
publisher.onNext(v);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ public Http2OutputMessage newOutputMessage(boolean endStream) {
return new Http2OutputMessageFrame(new ByteArrayOutputStream(256), endStream);
}

@Override
public void consumeBytes(int numBytes) throws Exception {
// No flow control for servlet
}

@Override
public CompletableFuture<Void> writeHeader(HttpMetadata httpMetadata) {
if (writeable.get()) {
Expand Down Expand Up @@ -257,6 +262,11 @@ public SocketAddress localAddress() {
@Override
public void flush() {}

@Override
public boolean isReady() {
return writeable.get();
}

private static CompletableFuture<Void> completed() {
return CompletableFuture.completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public Http2OutputMessage newOutputMessage(boolean endStream) {
new LimitedByteArrayOutputStream(256, tripleConfig.getMaxResponseBodySizeOrDefault()), endStream);
}

@Override
public void consumeBytes(int numBytes) throws Exception {
// do nothing
}

@Override
public CompletableFuture<Void> writeHeader(HttpMetadata httpMetadata) {
Http2Header http2Header = (Http2Header) httpMetadata;
Expand Down Expand Up @@ -124,6 +129,11 @@ public SocketAddress localAddress() {
@Override
public void flush() {}

@Override
public boolean isReady() {
return session.isOpen();
}

private CloseReason encodeCloseReason(Http2Header http2Header) {
HttpHeaders headers = http2Header.headers();
List<String> statusHeaders = headers.remove(HttpHeaderNames.STATUS.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,25 @@ default Http2OutputMessage newOutputMessage() {
}

Http2OutputMessage newOutputMessage(boolean endStream);

/**
* Consume bytes from the local flow controller to trigger WINDOW_UPDATE frames.
* This method should be called when data has been processed and more data can be received.
*
* @param numBytes the number of bytes to consume
* @throws Exception if an error occurs during consumption
*/
void consumeBytes(int numBytes) throws Exception;

/**
* Returns whether the stream is ready for writing. If false, the caller should avoid
* calling {@link #writeMessage(org.apache.dubbo.remoting.http12.HttpOutputMessage)}
* to avoid blocking or excessive buffering.
*
* <p>This method is used for outgoing flow control / backpressure. When the underlying
* transport buffer is full, this returns false.
*
* @return true if the stream is ready for writing
*/
boolean isReady();
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ public Http2OutputMessage newOutputMessage(boolean endStream) {
return h2StreamChannel.newOutputMessage(endStream);
}

@Override
public void consumeBytes(int numBytes) throws Exception {
h2StreamChannel.consumeBytes(numBytes);
}

@Override
public boolean isReady() {
return h2StreamChannel.isReady();
}

@Override
public String toString() {
return "Http2ChannelDelegate{" + "h2StreamChannel=" + h2StreamChannel + '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,38 @@ public class Http2ServerChannelObserver extends AbstractServerHttpChannelObserve

private boolean autoRequestN = true;

private Runnable onReadyHandler;

public Http2ServerChannelObserver(H2StreamChannel h2StreamChannel) {
super(h2StreamChannel);
}

/**
* Returns whether the stream is ready for writing.
* If false, the caller should avoid calling onNext to prevent blocking or excessive buffering.
*/
public boolean isReady() {
return getHttpChannel().isReady();
}

/**
* Sets a callback to be invoked when the stream becomes ready for writing.
*/
public void setOnReadyHandler(Runnable onReadyHandler) {
this.onReadyHandler = onReadyHandler;
}

/**
* Called when the channel writability changes.
* Triggers the onReadyHandler if the channel is now writable.
*/
public void onWritabilityChanged() {
Runnable handler = this.onReadyHandler;
if (handler != null && isReady()) {
handler.run();
}
}

public void setStreamingDecoder(StreamingDecoder streamingDecoder) {
this.streamingDecoder = streamingDecoder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,10 @@
public interface Http2TransportListener extends CancelableTransportListener<Http2Header, Http2InputMessage> {

void close();

/**
* Called when the channel writability changes.
* This is used for backpressure support via isReady/setOnReadyHandler.
*/
default void onWritabilityChanged() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,20 @@ private void skipOffset(InputStream inputStream, int lengthFieldOffset) throws I
}

private void processBody() throws IOException {
byte[] rawMessage = readRawMessage(accumulate, requiredLength);
// Calculate total bytes read: header (offset + length field) + payload
int totalBytesRead = lengthFieldOffset + lengthFieldLength + requiredLength;

byte[] rawMessage;
try {
rawMessage = readRawMessage(accumulate, requiredLength);
} finally {
// Notify listener about bytes read for flow control immediately after reading bytes
// This must be in finally block to ensure flow control works even if reading fails
// Following gRPC's pattern: bytesRead is called as soon as bytes are consumed from input
listener.bytesRead(totalBytesRead);
}

Comment on lines +190 to +199
Copy link

Copilot AI Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bytesRead notification in the finally block will be called even if an exception occurs during readRawMessage. However, if readRawMessage fails partway through reading the buffer, the totalBytesRead value may not accurately reflect the actual bytes consumed. Consider whether this is the intended behavior or if bytesRead should only be called on successful completion.

Suggested change
byte[] rawMessage;
try {
rawMessage = readRawMessage(accumulate, requiredLength);
} finally {
// Notify listener about bytes read for flow control immediately after reading bytes
// This must be in finally block to ensure flow control works even if reading fails
// Following gRPC's pattern: bytesRead is called as soon as bytes are consumed from input
listener.bytesRead(totalBytesRead);
}
byte[] rawMessage = readRawMessage(accumulate, requiredLength);
// Notify listener about bytes read for flow control immediately after successfully reading bytes
// Following gRPC's pattern: bytesRead is called as soon as bytes are consumed from input
listener.bytesRead(totalBytesRead);

Copilot uses AI. Check for mistakes.
// Process the message after notifying about bytes read
InputStream inputStream = new ByteArrayInputStream(rawMessage);
invokeListener(inputStream);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ public interface StreamingDecoder {

interface FragmentListener {

/**
* Called when the given number of bytes has been read from the input source of the deframer.
* This is typically used to indicate to the underlying transport that more data can be
* accepted.
*/
void bytesRead(int numBytes);

/**
* @param rawMessage raw message
*/
Expand All @@ -42,31 +49,15 @@ interface FragmentListener {
default void onClose() {}
}

final class DefaultFragmentListener implements FragmentListener {

private final ListeningDecoder listeningDecoder;

public DefaultFragmentListener(ListeningDecoder listeningDecoder) {
this.listeningDecoder = listeningDecoder;
}

@Override
public void onFragmentMessage(InputStream rawMessage) {
listeningDecoder.decode(rawMessage);
}

@Override
public void onClose() {
listeningDecoder.close();
}
}

final class NoopFragmentListener implements FragmentListener {

static final FragmentListener NOOP = new NoopFragmentListener();

private NoopFragmentListener() {}

@Override
public void bytesRead(int numBytes) {}

@Override
public void onFragmentMessage(InputStream rawMessage) {}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.dubbo.remoting.http12.netty4.h2;

import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.config.nested.TripleConfig;
import org.apache.dubbo.remoting.http12.HttpMetadata;
import org.apache.dubbo.remoting.http12.HttpOutputMessage;
Expand All @@ -31,17 +33,29 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.handler.codec.http2.Http2StreamChannel;

import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_RESPONSE;

public class NettyH2StreamChannel implements H2StreamChannel {

private static final ErrorTypeAwareLogger LOGGER =
LoggerFactory.getErrorTypeAwareLogger(NettyH2StreamChannel.class);

private final Http2StreamChannel http2StreamChannel;

private final TripleConfig tripleConfig;

public NettyH2StreamChannel(Http2StreamChannel http2StreamChannel, TripleConfig tripleConfig) {
private final Http2Connection http2Connection;

public NettyH2StreamChannel(
Http2StreamChannel http2StreamChannel, TripleConfig tripleConfig, Http2Connection http2Connection) {
this.http2StreamChannel = http2StreamChannel;
this.tripleConfig = tripleConfig;
this.http2Connection = http2Connection;
}

@Override
Expand Down Expand Up @@ -89,4 +103,45 @@ public CompletableFuture<Void> writeResetFrame(long errorCode) {
http2StreamChannel.write(resetFrame).addListener(nettyHttpChannelFutureListener);
return nettyHttpChannelFutureListener;
}

@Override
public void consumeBytes(int numBytes) throws Exception {
if (numBytes <= 0) {
return;
}

if (http2Connection == null) {
LOGGER.debug("Http2Connection not available, skip consumeBytes");
return;
}

Http2LocalFlowController localFlowController = http2Connection.local().flowController();

// Get the stream from connection using stream id
int streamId = http2StreamChannel.stream().id();
Comment on lines +120 to +121
Copy link

Copilot AI Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential NullPointerException: If http2StreamChannel.stream() returns null at line 121, the code will throw a NullPointerException. While there's a check for the stream being null after retrieving it from the connection (line 123), the stream().id() call at line 121 could fail first if stream() itself returns null. Consider checking if stream() is null before calling id().

Suggested change
// Get the stream from connection using stream id
int streamId = http2StreamChannel.stream().id();
// Get the stream from Http2StreamChannel and then from connection using stream id
Http2Stream channelStream = http2StreamChannel.stream();
if (channelStream == null) {
LOGGER.debug("Http2StreamChannel.stream() is null, skip consumeBytes");
return;
}
int streamId = channelStream.id();

Copilot uses AI. Check for mistakes.
Http2Stream stream = http2Connection.stream(streamId);
if (stream == null) {
LOGGER.debug("Stream {} not found in connection, skip consumeBytes", streamId);
return;
}

// Consume bytes to trigger WINDOW_UPDATE frame
// This must be executed in the event loop thread
if (http2StreamChannel.eventLoop().inEventLoop()) {
localFlowController.consumeBytes(stream, numBytes);
} else {
http2StreamChannel.eventLoop().execute(() -> {
try {
localFlowController.consumeBytes(stream, numBytes);
} catch (Exception e) {
LOGGER.warn(PROTOCOL_FAILED_RESPONSE, "", "", "Failed to consumeBytes for stream " + streamId, e);
}
});
Comment on lines +133 to +139
Copy link

Copilot AI Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent error handling: When in the event loop (line 131), exceptions from consumeBytes are propagated (declared in the method signature), but when executed asynchronously (line 135), exceptions are caught and logged. This inconsistency could lead to different behavior depending on which thread calls the method. Consider catching the exception in both cases or propagating it in both cases for consistency.

Suggested change
http2StreamChannel.eventLoop().execute(() -> {
try {
localFlowController.consumeBytes(stream, numBytes);
} catch (Exception e) {
LOGGER.warn(PROTOCOL_FAILED_RESPONSE, "", "", "Failed to consumeBytes for stream " + streamId, e);
}
});
http2StreamChannel.eventLoop().execute(() -> localFlowController.consumeBytes(stream, numBytes));

Copilot uses AI. Check for mistakes.
}
}

@Override
public boolean isReady() {
return http2StreamChannel.isWritable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,12 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}
h2StreamChannel.writeResetFrame(statusCode);
}

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
// Notify the transport listener when writability changes
// This enables application-level backpressure via isReady/onReadyHandler
transportListener.onWritabilityChanged();
super.channelWritabilityChanged(ctx);
}
}
Loading
Loading