diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoPublishOnTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoPublishOnTest.java index abd2567b04..18732e0f24 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoPublishOnTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoPublishOnTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2015-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,13 +55,12 @@ public void rejectedExecutionExceptionOnDataSignalExecutor() final AtomicReference dataInOnOperatorError = new AtomicReference<>(); try { - - CountDownLatch hookLatch = new CountDownLatch(1); + CountDownLatch finallyLatch = new CountDownLatch(1); + CountDownLatch inOnNextLatch = new CountDownLatch(1); Hooks.onOperatorError((t, d) -> { throwableInOnOperatorError.set(t); dataInOnOperatorError.set(d); - hookLatch.countDown(); return t; }); @@ -73,22 +72,25 @@ public void rejectedExecutionExceptionOnDataSignalExecutor() .publishOn(fromExecutorService(executor)) .doOnNext(s -> { try { + inOnNextLatch.countDown(); latch.await(); } catch (InterruptedException e) { } }) .publishOn(fromExecutor(executor)) + .doFinally(s -> finallyLatch.countDown()) .subscribe(assertSubscriber); + inOnNextLatch.await(); executor.shutdownNow(); + finallyLatch.await(); + assertSubscriber.assertNoValues() - .assertNoError() + .assertError(RejectedExecutionException.class) .assertNotComplete(); - hookLatch.await(); - assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class); assertThat(data).isSameAs(dataInOnOperatorError.get()); } @@ -109,13 +111,12 @@ public void rejectedExecutionExceptionOnErrorSignalExecutor() final AtomicReference dataInOnOperatorError = new AtomicReference<>(); try { - - CountDownLatch hookLatch = new CountDownLatch(2); + CountDownLatch finallyLatch = new CountDownLatch(1); + CountDownLatch inOnNextLatch = new CountDownLatch(1); Hooks.onOperatorError((t, d) -> { throwableInOnOperatorError.set(t); dataInOnOperatorError.set(d); - hookLatch.countDown(); return t; }); @@ -127,6 +128,7 @@ public void rejectedExecutionExceptionOnErrorSignalExecutor() .publishOn(fromExecutorService(executor)) .doOnNext(s -> { try { + inOnNextLatch.countDown(); latch.await(); } catch (InterruptedException e) { @@ -134,16 +136,17 @@ public void rejectedExecutionExceptionOnErrorSignalExecutor() } }) .publishOn(fromExecutor(executor)) + .doFinally(s -> finallyLatch.countDown()) .subscribe(assertSubscriber); + inOnNextLatch.await(); executor.shutdownNow(); + finallyLatch.await(); assertSubscriber.assertNoValues() - .assertNoError() + .assertError(RejectedExecutionException.class) .assertNotComplete(); - hookLatch.await(); - assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class); assertThat(exception).isSameAs(throwableInOnOperatorError.get() .getSuppressed()[0]); @@ -164,13 +167,12 @@ public void rejectedExecutionExceptionOnDataSignalExecutorService() final AtomicReference dataInOnOperatorError = new AtomicReference<>(); try { - - CountDownLatch hookLatch = new CountDownLatch(1); + CountDownLatch finallyLatch = new CountDownLatch(1); + CountDownLatch inOnNextLatch = new CountDownLatch(1); Hooks.onOperatorError((t, d) -> { throwableInOnOperatorError.set(t); dataInOnOperatorError.set(d); - hookLatch.countDown(); return t; }); @@ -182,22 +184,25 @@ public void rejectedExecutionExceptionOnDataSignalExecutorService() .publishOn(fromExecutorService(executor)) .doOnNext(s -> { try { + inOnNextLatch.countDown(); latch.await(); } catch (InterruptedException e) { } }) .publishOn(fromExecutorService(executor)) + .doFinally(s -> finallyLatch.countDown()) .subscribe(assertSubscriber); + inOnNextLatch.await(); + executor.shutdownNow(); + finallyLatch.await(); assertSubscriber.assertNoValues() - .assertNoError() + .assertError(RejectedExecutionException.class) .assertNotComplete(); - hookLatch.await(); - assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class); assertThat(data).isSameAs(dataInOnOperatorError.get()); } @@ -218,13 +223,12 @@ public void rejectedExecutionExceptionOnErrorSignalExecutorService() final AtomicReference dataInOnOperatorError = new AtomicReference<>(); try { - - CountDownLatch hookLatch = new CountDownLatch(2); + CountDownLatch finallyLatch = new CountDownLatch(1); + CountDownLatch inOnNextLatch = new CountDownLatch(1); Hooks.onOperatorError((t, d) -> { throwableInOnOperatorError.set(t); dataInOnOperatorError.set(d); - hookLatch.countDown(); return t; }); @@ -236,6 +240,7 @@ public void rejectedExecutionExceptionOnErrorSignalExecutorService() .publishOn(fromExecutorService(executor)) .doOnNext(s -> { try { + inOnNextLatch.countDown(); latch.await(); } catch (InterruptedException e) { @@ -243,16 +248,18 @@ public void rejectedExecutionExceptionOnErrorSignalExecutorService() } }) .publishOn(fromExecutorService(executor)) + .doFinally(s -> finallyLatch.countDown()) .subscribe(assertSubscriber); + inOnNextLatch.await(); + executor.shutdownNow(); + finallyLatch.await(); assertSubscriber.assertNoValues() - .assertNoError() + .assertError(RejectedExecutionException.class) .assertNotComplete(); - hookLatch.await(); - assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class); assertThat(exception).isSameAs(throwableInOnOperatorError.get() .getSuppressed()[0]);