Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attempt to stabilize the WindowedSubscriber test that uses VTS time advancing to close window and test comments. #39418

Merged
merged 1 commit into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
Expand All @@ -19,6 +20,7 @@
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand All @@ -33,6 +35,8 @@
@Execution(ExecutionMode.SAME_THREAD)
@Isolated
public final class WindowedSubscriberFluxWindowIsolatedTest {
private final ClientLogger logger = new ClientLogger(WindowedSubscriberFluxWindowIsolatedTest.class);

@Test
@Execution(ExecutionMode.SAME_THREAD)
public void shouldCloseEmptyWindowOnTimeout() {
Expand All @@ -44,13 +48,14 @@ public void shouldCloseEmptyWindowOnTimeout() {
upstream.subscribe(subscriber);

final AtomicReference<EnqueueResult<Integer>> rRef = new AtomicReference<>();
final Supplier<Publisher<Integer>> scenario = () -> {
final EnqueueResult<Integer> r = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
rRef.set(r);
return r.getWindowFlux();
};

try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
final Supplier<Publisher<Integer>> scenario = () -> {
verifier.logIfClosedUnexpectedly(logger);
final EnqueueResult<Integer> r = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
rRef.set(r);
return r.getWindowFlux();
};

verifier.create(scenario)
// Forward time to timeout empty windowTimeout.
.thenAwait(windowTimeout.plusSeconds(10))
Expand All @@ -74,13 +79,14 @@ public void shouldCloseStreamingWindowOnTimeout() {
upstream.subscribe(subscriber);

final AtomicReference<EnqueueResult<Integer>> rRef = new AtomicReference<>();
final Supplier<Publisher<Integer>> scenario = () -> {
final EnqueueResult<Integer> r = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
rRef.set(r);
return r.getWindowFlux();
};

try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
final Supplier<Publisher<Integer>> scenario = () -> {
verifier.logIfClosedUnexpectedly(logger);
final EnqueueResult<Integer> r = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
rRef.set(r);
return r.getWindowFlux();
};

verifier.create(scenario)
.then(() -> upstream.next(1))
.then(() -> upstream.next(2))
Expand Down Expand Up @@ -108,17 +114,18 @@ public void shouldContinueToNextWindowWhenEmptyWindowTimeout() {

final AtomicReference<EnqueueResult<Integer>> r0Ref = new AtomicReference<>();
final AtomicReference<EnqueueResult<Integer>> r1Ref = new AtomicReference<>();
final Supplier<Publisher<Integer>> scenario = () -> {
final EnqueueResult<Integer> r0 = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
final EnqueueResult<Integer> r1 = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
r0Ref.set(r0);
r1Ref.set(r1);
final Flux<Integer> window0Flux = r0.getWindowFlux();
final Flux<Integer> window1Flux = r1.getWindowFlux();
return window0Flux.concatWith(window1Flux);
};

try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
final Supplier<Publisher<Integer>> scenario = () -> {
verifier.logIfClosedUnexpectedly(logger);
final EnqueueResult<Integer> r0 = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
final EnqueueResult<Integer> r1 = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
r0Ref.set(r0);
r1Ref.set(r1);
final Flux<Integer> window0Flux = r0.getWindowFlux();
final Flux<Integer> window1Flux = r1.getWindowFlux();
return window0Flux.concatWith(window1Flux);
};

verifier.create(scenario)
// Forward time to timeout empty window0Flux,
.thenAwait(windowTimeout.plusSeconds(10))
Expand Down Expand Up @@ -154,17 +161,18 @@ public void shouldContinueToNextWindowWhenStreamingWindowTimeout() {

final AtomicReference<EnqueueResult<Integer>> r0Ref = new AtomicReference<>();
final AtomicReference<EnqueueResult<Integer>> r1Ref = new AtomicReference<>();
final Supplier<Publisher<Integer>> scenario = () -> {
final EnqueueResult<Integer> r0 = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
final EnqueueResult<Integer> r1 = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
r0Ref.set(r0);
r1Ref.set(r1);
final Flux<Integer> window0Flux = r0.getWindowFlux();
final Flux<Integer> window1Flux = r1.getWindowFlux();
return window0Flux.concatWith(window1Flux);
};

try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
final Supplier<Publisher<Integer>> scenario = () -> {
verifier.logIfClosedUnexpectedly(logger);
final EnqueueResult<Integer> r0 = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
final EnqueueResult<Integer> r1 = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
r0Ref.set(r0);
r1Ref.set(r1);
final Flux<Integer> window0Flux = r0.getWindowFlux();
final Flux<Integer> window1Flux = r1.getWindowFlux();
return window0Flux.concatWith(window1Flux);
};

verifier.create(scenario)
.then(() -> upstream.next(1))
.then(() -> upstream.next(2))
Expand All @@ -186,9 +194,19 @@ public void shouldContinueToNextWindowWhenStreamingWindowTimeout() {
Assertions.assertFalse(work0.isCanceled());

final WindowWork<Integer> work1 = r1Ref.get().getInnerWork();
Assertions.assertNotEquals(windowSize, work1.getPending());
Assertions.assertTrue(work1.hasTimedOut());
Assertions.assertFalse(work1.isCanceled());
final boolean hasWindow1ReceivedNothing = work1.getPending() == windowSize;
if (hasWindow1ReceivedNothing) {
// The combination of VirtualTimeScheduler and WindowedSubscriber.drain() sometimes delays arrival of timeout
// signaling for window0, resulting window0 to timeout only after the emission of 3 (and 4). This result in
// window0 to receive 1, 2, 3 and 4, and window1 to receive nothing. Here asserting that, when/if this happens
// application still gets emitted events via window0.
//
final boolean hasWindow0ReceivedAll = work0.getPending() == windowSize - 4; // (demanded - received)
Assertions.assertTrue(hasWindow0ReceivedAll,
String.format("window0 pending: %d, window1 pending: %d", work0.getPending(), work1.getPending()));
}
}

@Test
Expand All @@ -204,19 +222,20 @@ public void shouldContinueToNextWindowWhenStreamingWindowCancels() {

final AtomicReference<EnqueueResult<Integer>> r0Ref = new AtomicReference<>();
final AtomicReference<EnqueueResult<Integer>> r1Ref = new AtomicReference<>();
final Supplier<Publisher<Integer>> scenario = () -> {
final EnqueueResult<Integer> r0 = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
final EnqueueResult<Integer> r1 = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
r0Ref.set(r0);
r1Ref.set(r1);
final Flux<Integer> window0Flux = r0.getWindowFlux();
final Flux<Integer> window1Flux = r1.getWindowFlux();
return window0Flux
.take(cancelAfter)
.concatWith(window1Flux);
};

try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
final Supplier<Publisher<Integer>> scenario = () -> {
verifier.logIfClosedUnexpectedly(logger);
final EnqueueResult<Integer> r0 = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
final EnqueueResult<Integer> r1 = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
r0Ref.set(r0);
r1Ref.set(r1);
final Flux<Integer> window0Flux = r0.getWindowFlux();
final Flux<Integer> window1Flux = r1.getWindowFlux();
return window0Flux
.take(cancelAfter)
.concatWith(window1Flux);
};

verifier.create(scenario)
.then(() -> upstream.next(1))
.then(() -> upstream.next(2))
Expand Down Expand Up @@ -248,13 +267,14 @@ public void shouldRequestWindowDemand() {
upstream.subscribe(subscriber);

final AtomicReference<EnqueueResult<Integer>> rRef = new AtomicReference<>();
final Supplier<Publisher<Integer>> scenario = () -> {
final EnqueueResult<Integer> r = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
rRef.set(r);
return r.getWindowFlux();
};

try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
final Supplier<Publisher<Integer>> scenario = () -> {
verifier.logIfClosedUnexpectedly(logger);
final EnqueueResult<Integer> r = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
rRef.set(r);
return r.getWindowFlux();
};

verifier.create(scenario)
.thenAwait(windowTimeout.plusSeconds(10))
.verifyComplete();
Expand All @@ -279,18 +299,18 @@ public void shouldAccountPendingRequestWhenServingNextWindowDemand() {

final AtomicReference<EnqueueResult<Integer>> r0Ref = new AtomicReference<>();
final AtomicReference<EnqueueResult<Integer>> r1Ref = new AtomicReference<>();
final Supplier<Publisher<Integer>> scenario = () -> {
final EnqueueResult<Integer> r0 = subscriber.enqueueRequestImpl(window0Size, windowTimeout);
final EnqueueResult<Integer> r1 = subscriber.enqueueRequestImpl(window1Size, windowTimeout);
r0Ref.set(r0);
r1Ref.set(r1);
final Flux<Integer> window0Flux = r0.getWindowFlux();
final Flux<Integer> window1Flux = r1.getWindowFlux();
return window0Flux.concatWith(window1Flux);
};


try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
final Supplier<Publisher<Integer>> scenario = () -> {
verifier.logIfClosedUnexpectedly(logger);
final EnqueueResult<Integer> r0 = subscriber.enqueueRequestImpl(window0Size, windowTimeout);
final EnqueueResult<Integer> r1 = subscriber.enqueueRequestImpl(window1Size, windowTimeout);
r0Ref.set(r0);
r1Ref.set(r1);
final Flux<Integer> window0Flux = r0.getWindowFlux();
final Flux<Integer> window1Flux = r1.getWindowFlux();
return window0Flux.concatWith(window1Flux);
};

verifier.create(scenario)
// timeout window0Flux without receiving (so pending request become 'windowSize0'), and pick next work.
.thenAwait(windowTimeout.plusSeconds(10))
Expand Down Expand Up @@ -322,17 +342,19 @@ public void shouldPickEnqueuedWindowRequestsOnSubscriptionReady() {

final AtomicReference<EnqueueResult<Integer>> r0Ref = new AtomicReference<>();
final AtomicReference<EnqueueResult<Integer>> r1Ref = new AtomicReference<>();
final Supplier<Publisher<Integer>> scenario = () -> {
final EnqueueResult<Integer> r0 = subscriber.enqueueRequestImpl(window0Size, windowTimeout);
final EnqueueResult<Integer> r1 = subscriber.enqueueRequestImpl(window1Size, windowTimeout);
r0Ref.set(r0);
r1Ref.set(r1);
final Flux<Integer> window0Flux = r0.getWindowFlux();
final Flux<Integer> window1Flux = r1.getWindowFlux();
return window0Flux.concatWith(window1Flux);
};

try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
final Supplier<Publisher<Integer>> scenario = () -> {
verifier.logIfClosedUnexpectedly(logger);
final EnqueueResult<Integer> r0 = subscriber.enqueueRequestImpl(window0Size, windowTimeout);
final EnqueueResult<Integer> r1 = subscriber.enqueueRequestImpl(window1Size, windowTimeout);
r0Ref.set(r0);
r1Ref.set(r1);
final Flux<Integer> window0Flux = r0.getWindowFlux();
final Flux<Integer> window1Flux = r1.getWindowFlux();
return window0Flux.concatWith(window1Flux);
};

verifier.create(scenario)
// subscribe after enqueuing requests in 'scenario' (mimicking late arrival of subscription).
.then(() -> upstream.subscribe(subscriber))
Expand Down Expand Up @@ -366,14 +388,13 @@ public void shouldInvokeReleaserWhenNoWindowToService() {
final WindowedSubscriber<Integer> subscriber = createSubscriber(options.setReleaser(releaser));
upstream.subscribe(subscriber);

final AtomicReference<EnqueueResult<Integer>> rRef = new AtomicReference<>();
final Supplier<Publisher<Integer>> scenario = () -> {
final EnqueueResult<Integer> r = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
rRef.set(r);
return r.getWindowFlux();
};

try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
final Supplier<Publisher<Integer>> scenario = () -> {
verifier.logIfClosedUnexpectedly(logger);
final EnqueueResult<Integer> r = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
return r.getWindowFlux();
};

verifier.create(scenario)
// forward time to timeout windowFlux without receiving.
.thenAwait(windowTimeout.plusSeconds(10))
Expand All @@ -400,14 +421,13 @@ public void shouldStopInvokingReleaserOnUpstreamTermination() {
final WindowedSubscriber<Integer> subscriber = createSubscriber(options.setReleaser(releaser));
upstream.subscribe(subscriber);

final AtomicReference<EnqueueResult<Integer>> rRef = new AtomicReference<>();
final Supplier<Publisher<Integer>> scenario = () -> {
final EnqueueResult<Integer> r = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
rRef.set(r);
return r.getWindowFlux();
};

try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
final Supplier<Publisher<Integer>> scenario = () -> {
verifier.logIfClosedUnexpectedly(logger);
final EnqueueResult<Integer> r = subscriber.enqueueRequestImpl(windowSize, windowTimeout);
return r.getWindowFlux();
};

verifier.create(scenario)
// forward time to timeout windowFlux without receiving.
.thenAwait(windowTimeout.plusSeconds(10))
Expand All @@ -424,10 +444,11 @@ public void shouldStopInvokingReleaserOnUpstreamTermination() {
Assertions.assertEquals(Arrays.asList(1, 2), released);
}

private static final class VirtualTimeStepVerifier implements AutoCloseable {
private static final class VirtualTimeStepVerifier extends AtomicBoolean implements AutoCloseable {
private final VirtualTimeScheduler scheduler;

VirtualTimeStepVerifier() {
super(false);
scheduler = VirtualTimeScheduler.create();
}

Expand All @@ -437,8 +458,21 @@ <T> StepVerifier.Step<T> create(Supplier<Publisher<T>> scenarioSupplier) {

@Override
public void close() {
super.set(true);
scheduler.dispose();
}

void logIfClosedUnexpectedly(ClientLogger logger) {
final boolean wasAutoClosed = get();
final boolean isSchedulerDisposed = scheduler.isDisposed();
if (wasAutoClosed || isSchedulerDisposed) {
if (!wasAutoClosed) {
logger.atError().log("VirtualTimeScheduler unavailable (unexpected close from outside of the test).");
} else {
logger.atError().log("VirtualTimeScheduler unavailable (unexpected close by the test).");
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Liudmila and I’ve been investigating a strange random CI error with the following stack trace –

[ForkJoinPool-1-worker-1] WARN com.azure.messaging.servicebus.WindowedSubscriber$WindowWork - {"az.sdk.message":"Terminating the work. Error while scheduling or waiting for timeout.","exception":"Scheduler unavailable","demand":1,"workId":0,"pending":1}
reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable
	at reactor.core.Exceptions.failWithRejected(Exceptions.java:271)
	at reactor.core.publisher.Operators.onRejectedExecution(Operators.java:1024)
	at reactor.core.publisher.MonoDelay.subscribe(MonoDelay.java:64)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4491)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4606)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4458)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4394)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4366)
	at com.azure.messaging.servicebus.WindowedSubscriber$WindowWork.beginTimeoutTimer(WindowedSubscriber.java:859)
	at com.azure.messaging.servicebus.WindowedSubscriber$WindowWork.init(WindowedSubscriber.java:700)
	at com.azure.messaging.servicebus.WindowedSubscriber$WindowWork.access$1100(WindowedSubscriber.java:576)
	at com.azure.messaging.servicebus.WindowedSubscriber.initWorkOnce(WindowedSubscriber.java:386)
	at com.azure.messaging.servicebus.WindowedSubscriber.drainLoop(WindowedSubscriber.java:264)
	at com.azure.messaging.servicebus.WindowedSubscriber.drain(WindowedSubscriber.java:223)
	at com.azure.messaging.servicebus.WindowedSubscriber.enqueueRequestImpl(WindowedSubscriber.java:144)
	at com.azure.messaging.servicebus.WindowedSubscriberFluxWindowIsolatedTest.lambda$shouldPickEnqueuedWindowRequestsOnSubscriptionReady$19(WindowedSubscriberFluxWindowIsolatedTest.java:326)
	at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.toVerifierAndSubscribe(DefaultStepVerifierBuilder.java:862)
	at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:831)
	at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:823)
	at reactor.test.DefaultStepVerifierBuilder.verifyComplete(DefaultStepVerifierBuilder.java:690)
	at com.azure.messaging.servicebus.WindowedSubscriberFluxWindowIsolatedTest.shouldPickEnqueuedWindowRequestsOnSubscriptionReady(WindowedSubscriberFluxWindowIsolatedTest.java:344)

All the tests in this class are running under VirtualTimeScheduler (VTS) and once in a while the test "shouldPickEnqueuedWindowRequestsOnSubscriptionReady“ fails with "Scheduler unavailable".

From the stack trace this error comes from the scheduler (should be VTS in case of test) when WindowedSubscriber tries to register the timeout as part of starting the first WindowWork.

While in the real-world scenario the scheduler for timeout will be the global ParallelScheduler not VTS, both schedulers can throw this error only when it’s shutdown.

In the test we shut down the local VTS (scoped to the test) only when the test is completed, not in the middle of the test run. It’s known that VTS infra tracks the current VTS instance in a static member, so we had seen similar "scheduler unavailable" when two test using VTS infra runs in parallel and completion of one disposes the static current VTS. But as of today, we run all VTS tests in Isolated mode, so this should not happen.

Given this is so random failure and also observed happening only when CI is loaded, adding this logging function to shed some light into what’s going on here. The tests log using this function in case the VTS gets closed unexpectedly, this helps us,

  1. To see if the problem is local to the test, I.e., test run forward and closes the VTS.
  2. To see if the problem triggered from outside of the test, i.e., the VTS infra itself tear down the static current VTS holder.
  3. To see if there is for some reason VTS is not even gets injected.

}
}

private static class Releaser<T> implements Consumer<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,8 @@ public void shouldContinueToNextWindowOnEmptyWindowTimeout() {
final IterableStream<Integer> window0Iterable = r0.getWindowIterable();
final IterableStream<Integer> window1Iterable = r1.getWindowIterable();


// When time out triggers, it will close the window0Iterable stream, which will end the blocking collect() call
// by returning "empty" list since "no events were received" within the timeout.
final List<Integer> list0 = window0Iterable.stream().collect(Collectors.toList());
Assertions.assertEquals(0, list0.size());

Expand Down Expand Up @@ -694,6 +695,8 @@ public void shouldContinueToNextWindowWhenStreamingWindowTimeout() {
upstream.next(1);
upstream.next(2);

// When time out triggers, it will close the window0Iterable stream, which will end the blocking collect() call
// and return the list, list0, with events received so far (which is less than demanded).
final List<Integer> list0 = window0Iterable.stream().collect(Collectors.toList());
Assertions.assertEquals(Arrays.asList(1, 2), list0);

Expand Down
Loading