Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.rpc.stub.StubInvocationUtil;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -31,6 +32,7 @@

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
Expand Down Expand Up @@ -91,6 +93,7 @@ void testOneToManyReturnsMultiAndEmitsItems() {

try (MockedStatic<StubInvocationUtil> mocked = Mockito.mockStatic(StubInvocationUtil.class)) {
AtomicBoolean stubCalled = new AtomicBoolean(false);
CountDownLatch subscribed = new CountDownLatch(1);

mocked.when(() -> StubInvocationUtil.serverStreamCall(
Mockito.eq(invoker), Mockito.eq(method), Mockito.eq("testRequest"), Mockito.any()))
Expand All @@ -100,7 +103,9 @@ void testOneToManyReturnsMultiAndEmitsItems() {

CallStreamObserver<String> fakeSubscription = new CallStreamObserver<>() {
@Override
public void request(int n) {}
public void request(int n) {
/* no-op */
Comment on lines +106 to +107
Copy link

Copilot AI Sep 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The comment '/* no-op */' should be more descriptive to explain why this method is intentionally empty in the test context.

Suggested change
public void request(int n) {
/* no-op */
// Intentionally left empty: flow control is not required for this test's mock observer.

Copilot uses AI. Check for mistakes.
}

@Override
public void setCompression(String compression) {}
Expand All @@ -109,8 +114,8 @@ public void setCompression(String compression) {}
public void disableAutoFlowControl() {}

@Override
public void onNext(String value) {
publisher.onNext(value);
public void onNext(String v) {
publisher.onNext(v);
}

@Override
Expand All @@ -123,29 +128,47 @@ public void onCompleted() {
publisher.onCompleted();
}
};

publisher.onSubscribe(fakeSubscription);

// Wait for downstream subscription to complete before emitting data
new Thread(() -> {
publisher.onNext("item1");
publisher.onNext("item2");
publisher.onCompleted();
try {
if (subscribed.await(5, TimeUnit.SECONDS)) {
publisher.onNext("item1");
publisher.onNext("item2");
publisher.onCompleted();
} else {
publisher.onError(
new IllegalStateException("Downstream subscription timeout"));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
publisher.onError(e);
}
})
.start();

return null;
});

Uni<String> uniRequest = Uni.createFrom().item("testRequest");

Multi<String> multiResponse = MutinyClientCalls.oneToMany(invoker, uniRequest, method);

List<String> collectedItems =
multiResponse.collect().asList().await().indefinitely();
// Use AssertSubscriber to ensure proper subscription timing
AssertSubscriber<String> subscriber = AssertSubscriber.create(Long.MAX_VALUE);
multiResponse.subscribe().withSubscriber(subscriber);

// Wait for subscription to be established
subscriber.awaitSubscription();
subscribed.countDown(); // Signal that data emission can begin

// Wait for completion
subscriber.awaitCompletion(Duration.ofSeconds(5));

// Verify results
Assertions.assertTrue(stubCalled.get(), "StubInvocationUtil.serverStreamCall should be called");
Assertions.assertEquals(2, collectedItems.size());
Assertions.assertEquals(List.of("item1", "item2"), collectedItems);
Assertions.assertEquals(List.of("item1", "item2"), subscriber.getItems());
subscriber.assertCompleted();
}
}

Expand Down
Loading