Skip to content

Commit

Permalink
Fix MonoPublishOnTest flakiness (#3898)
Browse files Browse the repository at this point in the history
The implementation relied on a specific sequencing of events, which in
case of slower hardware or random events would occasionally fail the
assertions. This change introduces firm ordering and fixes an incorrect
assertion, which assumed the `Mono` terminates without error, while in
fact the `RejectedExecutionException` is propagated to the
`AssertSubscriber`. In effect, the tests are no longer flaky.
  • Loading branch information
chemicL authored Oct 2, 2024
1 parent bd5c4e5 commit aa26c12
Showing 1 changed file with 32 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2015-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,13 +55,12 @@ public void rejectedExecutionExceptionOnDataSignalExecutor()
final AtomicReference<Object> dataInOnOperatorError = new AtomicReference<>();

try {

CountDownLatch hookLatch = new CountDownLatch(1);
CountDownLatch finallyLatch = new CountDownLatch(1);
CountDownLatch inOnNextLatch = new CountDownLatch(1);

Hooks.onOperatorError((t, d) -> {
throwableInOnOperatorError.set(t);
dataInOnOperatorError.set(d);
hookLatch.countDown();
return t;
});

Expand All @@ -73,22 +72,25 @@ public void rejectedExecutionExceptionOnDataSignalExecutor()
.publishOn(fromExecutorService(executor))
.doOnNext(s -> {
try {
inOnNextLatch.countDown();
latch.await();
}
catch (InterruptedException e) {
}
})
.publishOn(fromExecutor(executor))
.doFinally(s -> finallyLatch.countDown())
.subscribe(assertSubscriber);

inOnNextLatch.await();
executor.shutdownNow();

finallyLatch.await();

assertSubscriber.assertNoValues()
.assertNoError()
.assertError(RejectedExecutionException.class)
.assertNotComplete();

hookLatch.await();

assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class);
assertThat(data).isSameAs(dataInOnOperatorError.get());
}
Expand All @@ -109,13 +111,12 @@ public void rejectedExecutionExceptionOnErrorSignalExecutor()
final AtomicReference<Object> dataInOnOperatorError = new AtomicReference<>();

try {

CountDownLatch hookLatch = new CountDownLatch(2);
CountDownLatch finallyLatch = new CountDownLatch(1);
CountDownLatch inOnNextLatch = new CountDownLatch(1);

Hooks.onOperatorError((t, d) -> {
throwableInOnOperatorError.set(t);
dataInOnOperatorError.set(d);
hookLatch.countDown();
return t;
});

Expand All @@ -127,23 +128,25 @@ public void rejectedExecutionExceptionOnErrorSignalExecutor()
.publishOn(fromExecutorService(executor))
.doOnNext(s -> {
try {
inOnNextLatch.countDown();
latch.await();
}
catch (InterruptedException e) {
throw Exceptions.propagate(exception);
}
})
.publishOn(fromExecutor(executor))
.doFinally(s -> finallyLatch.countDown())
.subscribe(assertSubscriber);

inOnNextLatch.await();
executor.shutdownNow();

finallyLatch.await();
assertSubscriber.assertNoValues()
.assertNoError()
.assertError(RejectedExecutionException.class)
.assertNotComplete();

hookLatch.await();

assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class);
assertThat(exception).isSameAs(throwableInOnOperatorError.get()
.getSuppressed()[0]);
Expand All @@ -164,13 +167,12 @@ public void rejectedExecutionExceptionOnDataSignalExecutorService()
final AtomicReference<Object> dataInOnOperatorError = new AtomicReference<>();

try {

CountDownLatch hookLatch = new CountDownLatch(1);
CountDownLatch finallyLatch = new CountDownLatch(1);
CountDownLatch inOnNextLatch = new CountDownLatch(1);

Hooks.onOperatorError((t, d) -> {
throwableInOnOperatorError.set(t);
dataInOnOperatorError.set(d);
hookLatch.countDown();
return t;
});

Expand All @@ -182,22 +184,25 @@ public void rejectedExecutionExceptionOnDataSignalExecutorService()
.publishOn(fromExecutorService(executor))
.doOnNext(s -> {
try {
inOnNextLatch.countDown();
latch.await();
}
catch (InterruptedException e) {
}
})
.publishOn(fromExecutorService(executor))
.doFinally(s -> finallyLatch.countDown())
.subscribe(assertSubscriber);

inOnNextLatch.await();

executor.shutdownNow();

finallyLatch.await();
assertSubscriber.assertNoValues()
.assertNoError()
.assertError(RejectedExecutionException.class)
.assertNotComplete();

hookLatch.await();

assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class);
assertThat(data).isSameAs(dataInOnOperatorError.get());
}
Expand All @@ -218,13 +223,12 @@ public void rejectedExecutionExceptionOnErrorSignalExecutorService()
final AtomicReference<Object> dataInOnOperatorError = new AtomicReference<>();

try {

CountDownLatch hookLatch = new CountDownLatch(2);
CountDownLatch finallyLatch = new CountDownLatch(1);
CountDownLatch inOnNextLatch = new CountDownLatch(1);

Hooks.onOperatorError((t, d) -> {
throwableInOnOperatorError.set(t);
dataInOnOperatorError.set(d);
hookLatch.countDown();
return t;
});

Expand All @@ -236,23 +240,26 @@ public void rejectedExecutionExceptionOnErrorSignalExecutorService()
.publishOn(fromExecutorService(executor))
.doOnNext(s -> {
try {
inOnNextLatch.countDown();
latch.await();
}
catch (InterruptedException e) {
throw Exceptions.propagate(exception);
}
})
.publishOn(fromExecutorService(executor))
.doFinally(s -> finallyLatch.countDown())
.subscribe(assertSubscriber);

inOnNextLatch.await();

executor.shutdownNow();

finallyLatch.await();
assertSubscriber.assertNoValues()
.assertNoError()
.assertError(RejectedExecutionException.class)
.assertNotComplete();

hookLatch.await();

assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class);
assertThat(exception).isSameAs(throwableInOnOperatorError.get()
.getSuppressed()[0]);
Expand Down

0 comments on commit aa26c12

Please sign in to comment.