Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
a9d5295
support disableAutoInboundFlowControl api
EarthChen Dec 29, 2025
37d6693
support back press
EarthChen Dec 30, 2025
f2a427d
fix
EarthChen Dec 30, 2025
75f85f8
fix
EarthChen Dec 30, 2025
72a6bed
Update dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/…
EarthChen Dec 30, 2025
1edffe2
Update dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/…
EarthChen Dec 30, 2025
00be4d8
Update dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/…
EarthChen Dec 30, 2025
b4f943f
Update dubbo-common/src/main/java/org/apache/dubbo/config/nested/Trip…
EarthChen Dec 30, 2025
5e2e7c9
Update dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/…
EarthChen Dec 30, 2025
72462bb
fix
EarthChen Dec 30, 2025
3c8a958
Merge branch 'tri-backpressure' of github.com:EarthChen/dubbo into tr…
EarthChen Dec 30, 2025
0006218
fix
EarthChen Dec 30, 2025
b74c57a
fix test
EarthChen Dec 31, 2025
16893d1
Merge branch '3.3' into tri-backpressure
RainYuY Dec 31, 2025
576beef
Merge branch '3.3' into tri-backpressure
wangchengming666 Jan 1, 2026
2796b5e
add isReady and setOnReadyHandler api
EarthChen Jan 4, 2026
f959c8a
fix
EarthChen Jan 4, 2026
38dc682
fix
EarthChen Jan 4, 2026
35334a9
Merge branch '3.3' into tri-backpressure
EarthChen Jan 5, 2026
f0a5c87
fix Backpressure it
EarthChen Jan 5, 2026
bfa04b2
fix
EarthChen Jan 5, 2026
e6683c8
fix
EarthChen Jan 5, 2026
9b50e05
ClientCallStreamObserver & ServerCallStreamObserver
EarthChen Jan 5, 2026
edf2132
add license
EarthChen Jan 5, 2026
b213009
mvn spotless:apply
EarthChen Jan 5, 2026
52f8316
fix dubbo plugin
EarthChen Jan 5, 2026
686e5be
fix dubbo plugin spotless
EarthChen Jan 5, 2026
b4bd5d6
fix
EarthChen Jan 5, 2026
d2ae97d
fix ut
EarthChen Jan 6, 2026
fe38316
Add gRPC-compatible APIs and fix the integration tests
EarthChen Jan 6, 2026
348858e
fix
EarthChen Jan 6, 2026
0ef8631
fix
EarthChen Jan 6, 2026
3c83ef4
Merge branch '3.3' into tri-backpressure
wangchengming666 Jan 6, 2026
2331ffc
remove default
EarthChen Jan 8, 2026
c7e27b8
remove default
EarthChen Jan 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,57 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.protocol.tri.observer;

import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
package org.apache.dubbo.common.stream;

/**
* An extension of {@link StreamObserver} that provides additional functionality for flow control
* and backpressure. This interface mirrors gRPC's {@code CallStreamObserver} for compatibility.
* and backpressure. This interface mirrors gRPC's {@code io.grpc.stub.CallStreamObserver} for compatibility.
*
* <p>This is the base interface for both client-side ({@link ClientCallStreamObserver}) and
* server-side ({@link ServerCallStreamObserver}) flow control observers. It provides two types
* of flow control:
*
* <h3>Send-Side Backpressure (Outbound Flow Control)</h3>
* <p>Controls the rate at which data is sent to avoid overwhelming the receiver:
* <ul>
* <li>{@link #isReady()} - Check if the stream can accept more data without blocking</li>
* <li>{@link #setOnReadyHandler(Runnable)} - Register a callback for when the stream becomes writable</li>
* </ul>
*
* <p>Key features:
* <h3>Receive-Side Backpressure (Inbound Flow Control)</h3>
* <p>Controls the rate at which data is received to avoid being overwhelmed:
* <ul>
* <li>{@link #isReady()} - Check if the stream is ready to accept more messages</li>
* <li>{@link #setOnReadyHandler(Runnable)} - Set a callback for when the stream becomes ready</li>
* <li>{@link #request(int)} - Request more messages from the peer (for inbound flow control)</li>
* <li>{@link #disableAutoFlowControl()} - Switch to manual flow control mode</li>
* <li>{@link #disableAutoFlowControl()} - Switch from automatic to manual message requesting</li>
* <li>{@link #request(int)} - Explicitly request a specific number of messages from the sender</li>
* </ul>
*
* <h3>Typical Usage Pattern</h3>
* <pre>{@code
* // Send-side backpressure example
* callStreamObserver.setOnReadyHandler(() -> {
* while (callStreamObserver.isReady() && hasMoreData()) {
* callStreamObserver.onNext(getNextData());
* }
* if (!hasMoreData()) {
* callStreamObserver.onCompleted();
* }
* });
*
* // Receive-side backpressure example (in beforeStart or similar)
* callStreamObserver.disableAutoFlowControl();
* callStreamObserver.request(10); // Request initial batch
*
* // Then in onNext()
* public void onNext(T value) {
* process(value);
* callStreamObserver.request(1); // Request next message after processing
* }
* }</pre>
*
* @param <T> the type of value passed to the stream
* @see ClientCallStreamObserver
* @see ServerCallStreamObserver
* @see ClientResponseObserver
*/
public interface CallStreamObserver<T> extends StreamObserver<T> {

Expand Down Expand Up @@ -84,8 +117,6 @@ public interface CallStreamObserver<T> extends StreamObserver<T> {
* <p>
* For stream set compression needs to determine whether the metadata has been sent, and carry
* on corresponding processing
*
* @param compression {@link Compressor}
*/
void setCompression(String compression);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.stream;

/**
* A client-side extension of {@link CallStreamObserver} that provides additional functionality
* for controlling the outbound request stream. This interface mirrors gRPC's
* {@code io.grpc.stub.ClientCallStreamObserver} for compatibility.
*
* <p>This interface is typically obtained through {@link ClientResponseObserver#beforeStart(ClientCallStreamObserver)}
* and provides methods for:
* <ul>
* <li>Controlling send-side backpressure via {@link #isReady()} and {@link #setOnReadyHandler(Runnable)}</li>
* <li>Controlling receive-side backpressure via {@link #disableAutoRequestWithInitial(int)} and {@link #request(int)}</li>
* </ul>
*
* <h3>Send-Side Backpressure (Controlling Outgoing Data Rate)</h3>
* <pre>{@code
* @Override
* public void beforeStart(ClientCallStreamObserver<Request> requestStream) {
* requestStream.disableAutoFlowControl();
* requestStream.setOnReadyHandler(() -> {
* while (requestStream.isReady() && hasMoreData()) {
* requestStream.onNext(getNextRequest());
* }
* });
* }
* }</pre>
*
* <h3>Receive-Side Backpressure (Controlling Incoming Data Rate)</h3>
* <pre>{@code
* @Override
* public void beforeStart(ClientCallStreamObserver<Request> requestStream) {
* // Request only 10 messages initially
* requestStream.disableAutoRequestWithInitial(10);
* }
*
* @Override
* public void onNext(Response response) {
* process(response);
* // Request more after processing
* requestStream.request(1);
* }
* }</pre>
*
* @param <ReqT> the type of messages sent to the server (request type)
* @see CallStreamObserver
* @see ClientResponseObserver
*/
public interface ClientCallStreamObserver<ReqT> extends CallStreamObserver<ReqT> {

/**
* Disables automatic inbound flow control and sets the initial number of messages
* to request from the server.
*
* <p>By default, the runtime automatically requests messages from the server as they
* are consumed. Calling this method switches to manual flow control mode, where the
* client must explicitly call {@link #request(int)} to receive more messages.
*
* <p>This method <strong>must</strong> be called within
* {@link ClientResponseObserver#beforeStart(ClientCallStreamObserver)} before the
* stream starts, otherwise it has no effect.
*
* <p><strong>Usage:</strong>
* <pre>{@code
* @Override
* public void beforeStart(ClientCallStreamObserver<Request> requestStream) {
* // Start with 5 messages, then request more in onNext()
* requestStream.disableAutoRequestWithInitial(5);
* }
*
* @Override
* public void onNext(Response response) {
* process(response);
* requestStream.request(1); // Request one more message
* }
* }</pre>
*
* @param request the initial number of messages to request from the server.
* A value of 0 means no messages will be delivered until {@link #request(int)} is called.
* @see #request(int)
* @see #disableAutoFlowControl()
*/
void disableAutoRequestWithInitial(int request);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.stream;

/**
* A client-side {@link StreamObserver} that provides a callback to receive a reference to the
* outbound request stream observer before the call starts. This interface mirrors gRPC's
* {@code io.grpc.stub.ClientResponseObserver} for compatibility.
*
* <p>This interface is used for advanced flow control scenarios where the client needs to:
* <ul>
* <li>Configure flow control settings before the stream starts</li>
* <li>Set up an {@link CallStreamObserver#setOnReadyHandler(Runnable) onReadyHandler} for send-side backpressure</li>
* <li>Control the rate of receiving messages using {@link ClientCallStreamObserver#disableAutoRequestWithInitial(int)}</li>
* </ul>
*
* <h3>Example Usage</h3>
* <pre>{@code
* // Client streaming with backpressure
* ClientResponseObserver<DataChunk, Response> responseObserver =
* new ClientResponseObserver<DataChunk, Response>() {
* @Override
* public void beforeStart(ClientCallStreamObserver<DataChunk> requestStream) {
* // Disable auto flow control for manual send control
* requestStream.disableAutoFlowControl();
*
* // Set up onReadyHandler for send-side backpressure
* requestStream.setOnReadyHandler(() -> {
* while (requestStream.isReady() && hasMoreData()) {
* requestStream.onNext(getNextChunk());
* }
* });
* }
*
* @Override
* public void onNext(Response response) { ... }
*
* @Override
* public void onError(Throwable t) { ... }
*
* @Override
* public void onCompleted() { ... }
* };
*
* service.clientStream(responseObserver);
* }</pre>
*
* @param <ReqT> the type of messages sent to the server (request type)
* @param <RespT> the type of messages received from the server (response type)
* @see ClientCallStreamObserver
* @see CallStreamObserver
*/
public interface ClientResponseObserver<ReqT, RespT> extends StreamObserver<RespT> {

/**
* Called by the runtime prior to the start of a call to provide a reference to the
* {@link ClientCallStreamObserver} for the outbound request stream.
*
* <p>This callback is invoked <strong>before</strong> the underlying stream is created,
* allowing the client to configure flow control settings that take effect from the
* beginning of the call.
*
* <p><strong>Allowed operations in this callback:</strong>
* <ul>
* <li>{@link ClientCallStreamObserver#setOnReadyHandler(Runnable)} - Set handler for send-side backpressure</li>
* <li>{@link ClientCallStreamObserver#disableAutoRequestWithInitial(int)} - Configure receive-side backpressure</li>
* <li>{@link CallStreamObserver#disableAutoFlowControl()} - Disable automatic flow control</li>
* </ul>
*
* <p><strong>Note:</strong> Do not call {@link StreamObserver#onNext(Object)} or
* {@link StreamObserver#onCompleted()} within this callback. Data should only be sent
* after the stream is ready (via the {@code onReadyHandler}).
*
* @param requestStream the {@link ClientCallStreamObserver} for sending requests to the server
*/
void beforeStart(final ClientCallStreamObserver<ReqT> requestStream);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.stream;

/**
* A server-side extension of {@link CallStreamObserver} that provides flow control capabilities
* for outbound response streams. This interface mirrors gRPC's
* {@code io.grpc.stub.ServerCallStreamObserver} for compatibility.
*
* <p>On the server side, this interface is obtained by casting the {@link StreamObserver}
* parameter passed to streaming RPC methods. It allows the server to:
* <ul>
* <li>Check if the response stream is ready using {@link #isReady()}</li>
* <li>Set a callback for when the stream becomes ready using {@link #setOnReadyHandler(Runnable)}</li>
* <li>Control inbound flow from the client using {@link #request(int)} and {@link #disableAutoFlowControl()}</li>
* </ul>
*
* <h3>Server-Side Send Backpressure Example</h3>
* <pre>{@code
* @Override
* public void serverStream(Request request, StreamObserver<Response> responseObserver) {
* ServerCallStreamObserver<Response> serverObserver =
* (ServerCallStreamObserver<Response>) responseObserver;
*
* AtomicInteger sent = new AtomicInteger(0);
* int totalCount = 100;
*
* serverObserver.setOnReadyHandler(() -> {
* while (serverObserver.isReady() && sent.get() < totalCount) {
* int seq = sent.getAndIncrement();
* serverObserver.onNext(createResponse(seq));
* }
* if (sent.get() >= totalCount) {
* serverObserver.onCompleted();
* }
* });
* }
* }</pre>
*
* <h3>Server-Side Receive Backpressure Example (for Client/Bidi Streaming)</h3>
* <pre>{@code
* @Override
* public StreamObserver<Request> clientStream(StreamObserver<Response> responseObserver) {
* ServerCallStreamObserver<Response> serverObserver =
* (ServerCallStreamObserver<Response>) responseObserver;
*
* // Control how many messages we receive from the client
* serverObserver.disableAutoFlowControl();
* serverObserver.request(5); // Start with 5 messages
*
* return new StreamObserver<Request>() {
* @Override
* public void onNext(Request request) {
* process(request);
* serverObserver.request(1); // Request one more
* }
* // ... onError, onCompleted
* };
* }
* }</pre>
*
* @param <RespT> the type of messages sent to the client (response type)
* @see CallStreamObserver
* @see StreamObserver
*/
public interface ServerCallStreamObserver<RespT> extends CallStreamObserver<RespT> {

default void disableAutoRequest() {
disableAutoFlowControl();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
*/
package org.apache.dubbo.mutiny;

import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.CancelableStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;

import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
* The middle layer between {@link org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver} and Mutiny API. <p>
* The middle layer between {@link org.apache.dubbo.common.stream.CallStreamObserver} and Mutiny API. <p>
* 1. passing the data received by CallStreamObserver to Mutiny consumer <br>
* 2. passing the request of Mutiny API to CallStreamObserver
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.dubbo.mutiny;

import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.common.stream.CallStreamObserver;

import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.dubbo.mutiny;

import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.common.stream.CallStreamObserver;
import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;

import java.util.function.Consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.dubbo.mutiny;

import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.common.stream.CallStreamObserver;

/**
* Used in ManyToOne and ManyToMany in server. <br>
Expand Down
Loading
Loading