diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 67f6d87471..bbf9d76caf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,7 +33,10 @@ jobs: echo -e "\n - \u001b[1mSpotless (license headers)\u001b[0m failures on touched java files \033[38;5;255;48;5;0m\u001b[1mcan be automatically fixed by running\u001b[0m:" echo -e " \033[38;5;0;48;5;255m ./gradlew spotlessApply \033[0m" echo -e "\n - \u001b[1mAPI Compatibility\u001b[0m failures should be considered carefully and \033[38;5;255;48;5;0m\u001b[1mdiscussed with maintainers in the PR\u001b[0m" - echo " If there are failures, the detail should be available in the logs of the api compatibility step above" + echo " If there are failures, the detail should be available in the step's log:" + echo -e " Look for the \033[38;5;0;48;5;255m API compatibility failures \033[0m block(s)." + echo " Alternatively, locally run the following command to get access to the full report:" + echo -e " \033[38;5;0;48;5;255m ./gradlew japicmp \033[0m" echo "" exit -1 core-fast: diff --git a/README.md b/README.md index 42b6503225..a6cadc0e8a 100644 --- a/README.md +++ b/README.md @@ -33,15 +33,15 @@ repositories { } dependencies { - compile "io.projectreactor:reactor-core:3.4.23" - testCompile "io.projectreactor:reactor-test:3.4.23" + compile "io.projectreactor:reactor-core:3.4.24" + testCompile "io.projectreactor:reactor-test:3.4.24" // Alternatively, use the following for latest snapshot artifacts in this line - // compile "io.projectreactor:reactor-core:3.4.24-SNAPSHOT" - // testCompile "io.projectreactor:reactor-test:3.4.24-SNAPSHOT" + // compile "io.projectreactor:reactor-core:3.4.25-SNAPSHOT" + // testCompile "io.projectreactor:reactor-test:3.4.25-SNAPSHOT" // Optionally, use `reactor-tools` to help debugging reactor code - // implementation "io.projectreactor:reactor-tools:3.4.23" + // implementation "io.projectreactor:reactor-tools:3.4.24" } ``` diff --git a/gradle.properties b/gradle.properties index fa05bec2d6..dd4fad2c1f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ -version=3.4.23 -bomVersion=2020.0.23 +version=3.4.24 +bomVersion=2020.0.24 diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 5232d56af5..a41fee2f9b 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,14 +1,14 @@ [versions] # Baselines, should be updated on every release -baseline-core-api = "3.4.22" -baselinePerfCore = "3.4.22" +baseline-core-api = "3.4.23" +baselinePerfCore = "3.4.23" baselinePerfExtra = "3.4.8" # Other shared versions asciidoctor = "3.3.2" -bytebuddy = "1.12.14" +bytebuddy = "1.12.17" jmh = "1.35" -junit = "5.9.0" +junit = "5.9.1" kotlin = "1.5.32" reactiveStreams = "1.0.4" @@ -29,25 +29,25 @@ junit-bom = { module = "org.junit:junit-bom", version.ref = "junit" } kotlin-stdlib = { module = "org.jetbrains.kotlin:kotlin-stdlib", version.ref = "kotlin" } logback = "ch.qos.logback:logback-classic:1.2.11" micrometer = "io.micrometer:micrometer-core:1.3.0" -mockito = "org.mockito:mockito-core:4.7.0" +mockito = "org.mockito:mockito-core:4.8.0" reactiveStreams = { module = "org.reactivestreams:reactive-streams", version.ref = "reactiveStreams" } reactiveStreams-tck = { module = "org.reactivestreams:reactive-streams-tck", version.ref = "reactiveStreams" } reactor-perfBaseline-core = { module = "io.projectreactor:reactor-core", version.ref = "baselinePerfCore" } reactor-perfBaseline-extra = { module = "io.projectreactor.addons:reactor-extra", version.ref = "baselinePerfExtra" } slf4j = "org.slf4j:slf4j-api:1.7.36" -testNg = "org.testng:testng:7.5" +testNg = "org.testng:testng:7.5" # since 7.6 JDK8 is unsupported, don't bump throwingFunction = "com.pivovarit:throwing-function:1.5.1" [plugins] -artifactory = { id = "com.jfrog.artifactory", version = "4.29.0" } +artifactory = { id = "com.jfrog.artifactory", version = "4.29.1" } asciidoctor-convert = { id = "org.asciidoctor.jvm.convert", version.ref = "asciidoctor" } asciidoctor-pdf = { id = "org.asciidoctor.jvm.pdf", version.ref = "asciidoctor" } bnd = { id = "biz.aQute.bnd.builder", version = "6.3.1" } -download = { id = "de.undercouch.download", version = "5.1.2" } -japicmp = { id = "me.champeau.gradle.japicmp", version = "0.4.0" } -jcstress = { id = "io.github.reyerizo.gradle.jcstress", version = "0.8.13" } +download = { id = "de.undercouch.download", version = "5.2.1" } +japicmp = { id = "me.champeau.gradle.japicmp", version = "0.4.1" } +jcstress = { id = "io.github.reyerizo.gradle.jcstress", version = "0.8.14" } kotlin = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" } nohttp = { id = "io.spring.nohttp", version = "0.0.10" } shadow = { id = "com.github.johnrengelman.shadow", version = "7.1.2" } -spotless = { id = "com.diffplug.spotless", version = "6.10.0" } +spotless = { id = "com.diffplug.spotless", version = "6.11.0" } testsets = { id = "org.unbroken-dome.test-sets", version = "4.0.0" } diff --git a/reactor-core/build.gradle b/reactor-core/build.gradle index 36c974451d..44ad59f663 100644 --- a/reactor-core/build.gradle +++ b/reactor-core/build.gradle @@ -135,6 +135,28 @@ task downloadBaseline(type: Download) { dest "${buildDir}/baselineLibs/reactor-core-${libs.versions.baseline.core.api.get()}.jar" } +def japicmpReport = tasks.register('japicmpReport') { + onlyIf { + japicmp.state.failure != null + } + doLast { + def reportFile = file("${project.buildDir}/reports/japi.txt") + if (reportFile.exists()) { + println "\n **********************************" + println " * /!\\ API compatibility failures *" + println " **********************************" + println "Japicmp report was filtered and interpreted to find the following incompatibilities:" + reportFile.eachLine { + if (it.contains("*") && (!it.contains("***") || it.contains("****"))) + println "source incompatible change: $it" + else if (it.contains("!")) + println "binary incompatible change: $it" + } + } + else println "No incompatible change to report" + } +} + task japicmp(type: JapicmpTask) { if (project.gradle.startParameter.isOffline()) { println "Offline: skipping downloading of baseline and JAPICMP" @@ -148,16 +170,21 @@ task japicmp(type: JapicmpTask) { println "Will download and perform baseline comparison with ${libs.versions.baseline.core.api.get()}" dependsOn(downloadBaseline) dependsOn(jar) + finalizedBy(japicmpReport) } oldClasspath.from(files("${buildDir}/baselineLibs/reactor-core-${libs.versions.baseline.core.api.get()}.jar")) newClasspath.from(files(jar.archiveFile)) - onlyBinaryIncompatibleModified = true + // these onlyXxx parameters result in a report that is slightly too noisy, but better than + // onlyBinaryIncompatibleModified = true which masks source-incompatible-only changes + onlyBinaryIncompatibleModified = false + onlyModified = true failOnModification = true failOnSourceIncompatibility = true txtOutputFile = file("${project.buildDir}/reports/japi.txt") ignoreMissingClasses = true includeSynthetic = true + compatibilityChangeExcludes = [ "METHOD_NEW_DEFAULT" ] // TODO after a .0 release, bump the gradle.properties baseline // TODO after a .0 release, remove the reactor-core exclusions below if any diff --git a/reactor-core/src/blockHoundTest/java/reactor/core/scheduler/ReactorBlockHoundIntegrationTest.java b/reactor-core/src/blockHoundTest/java/reactor/core/scheduler/ReactorBlockHoundIntegrationTest.java index ca0b5b1847..93540d4236 100644 --- a/reactor-core/src/blockHoundTest/java/reactor/core/scheduler/ReactorBlockHoundIntegrationTest.java +++ b/reactor-core/src/blockHoundTest/java/reactor/core/scheduler/ReactorBlockHoundIntegrationTest.java @@ -74,19 +74,19 @@ public void shouldDetectBlockingCallsInOperators() { } @Test - public void shouldNotReportScheduledFutureTask() { - for (int i = 0; i < 1_000; i++) { + public void shouldNotReportSchedulerScheduledFutureTask() { + for (int i = 0; i < 10_000; i++) { Scheduler taskScheduler = Schedulers.newSingle("foo"); try { Runnable dummyRunnable = () -> { }; for (int j = 0; j < 257; j++) { - taskScheduler.schedule(dummyRunnable, 200, TimeUnit.MILLISECONDS); + taskScheduler.schedule(dummyRunnable, j + 1, TimeUnit.MILLISECONDS); } Disposable disposable = taskScheduler.schedule(dummyRunnable, 1, TimeUnit.SECONDS); - RaceTestUtils.race(disposable::dispose, disposable::dispose); + RaceTestUtils.race(Schedulers.parallel(), disposable::dispose); } finally { taskScheduler.dispose(); @@ -94,6 +94,30 @@ public void shouldNotReportScheduledFutureTask() { } } + @Test + public void shouldNotReportWorkerScheduledFutureTask() { + for (int i = 0; i < 10_000; i++) { + Scheduler scheduler = Schedulers.newSingle("foo"); + Scheduler.Worker worker = scheduler.createWorker(); + + try { + Runnable dummyRunnable = () -> { + }; + + for (int j = 0; j < 257; j++) { + worker.schedule(dummyRunnable, j + 1, TimeUnit.MILLISECONDS); + } + Disposable disposable = worker.schedule(dummyRunnable, 1, TimeUnit.SECONDS); + + RaceTestUtils.race(Schedulers.parallel(), disposable::dispose); + } + finally { + worker.dispose(); + scheduler.dispose(); + } + } + } + void expectBlockingCall(String desc, Consumer> callable) { Assertions .assertThatThrownBy(() -> { diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index 1f08bed82f..92ee741c04 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -7219,7 +7219,7 @@ public final ConnectableFlux publish() { */ public final ConnectableFlux publish(int prefetch) { return onAssembly(new FluxPublish<>(this, prefetch, Queues - .get(prefetch))); + .get(prefetch), true)); } /** @@ -7565,10 +7565,12 @@ public final ConnectableFlux replay() { * Will retain up to the given history size onNext signals. Completion and Error will also be * replayed. *

- * Note that {@code cache(0)} will only cache the terminal signal without + * Note that {@code replay(0)} will only cache the terminal signal without * expiration. * *

+ * Re-connects are not supported. + *

* * * @param history number of events retained in history excluding complete and @@ -7579,8 +7581,8 @@ public final ConnectableFlux replay() { */ public final ConnectableFlux replay(int history) { if (history == 0) { - //TODO Flux.replay with history == 0 doesn't make much sense. This was replaced by Flux.publish, but such calls will be rejected in a future version - return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, Queues.get(Queues.SMALL_BUFFER_SIZE))); + return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, + Queues.get(Queues.SMALL_BUFFER_SIZE), false)); } return onAssembly(new FluxReplay<>(this, history, 0L, null)); } @@ -7662,8 +7664,8 @@ public final ConnectableFlux replay(Duration ttl, Scheduler timer) { public final ConnectableFlux replay(int history, Duration ttl, Scheduler timer) { Objects.requireNonNull(timer, "timer"); if (history == 0) { - //TODO Flux.replay with history == 0 doesn't make much sense. This was replaced by Flux.publish, but such calls will be rejected in a future version - return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, Queues.get(Queues.SMALL_BUFFER_SIZE))); + return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, + Queues.get(Queues.SMALL_BUFFER_SIZE), true)); } return onAssembly(new FluxReplay<>(this, history, ttl.toNanos(), timer)); } @@ -7986,8 +7988,10 @@ public final Flux scanWith(Supplier initial, BiFunction share() { - return onAssembly(new FluxRefCount<>( - new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, Queues.small()), 1) + return onAssembly( + new FluxRefCount<>(new FluxPublish<>( + this, Queues.SMALL_BUFFER_SIZE, Queues.small(), true + ), 1) ); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java b/reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java index 98a03c7790..3b37838b99 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,6 +57,11 @@ final class FluxPublish extends ConnectableFlux implements Scannable { final Supplier> queueSupplier; + /** + * Whether to prepare for a reconnect after the source terminates. + */ + final boolean resetUponSourceTermination; + volatile PublishSubscriber connection; @SuppressWarnings("rawtypes") static final AtomicReferenceFieldUpdater CONNECTION = @@ -66,13 +71,15 @@ final class FluxPublish extends ConnectableFlux implements Scannable { FluxPublish(Flux source, int prefetch, - Supplier> queueSupplier) { + Supplier> queueSupplier, + boolean resetUponSourceTermination) { if (prefetch <= 0) { throw new IllegalArgumentException("bufferSize > 0 required but it was " + prefetch); } this.source = Objects.requireNonNull(source, "source"); this.prefetch = prefetch; this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier"); + this.resetUponSourceTermination = resetUponSourceTermination; } @Override @@ -111,7 +118,7 @@ public void subscribe(CoreSubscriber actual) { } PublishSubscriber c = connection; - if (c == null || c.isTerminated()) { + if (c == null || (this.resetUponSourceTermination && c.isTerminated())) { PublishSubscriber u = new PublishSubscriber<>(prefetch, this); if (!CONNECTION.compareAndSet(this, c, u)) { continue; @@ -123,12 +130,18 @@ public void subscribe(CoreSubscriber actual) { if (c.add(inner)) { if (inner.isCancelled()) { c.remove(inner); - } - else { + } else { inner.parent = c; } c.drain(); break; + } else if (!this.resetUponSourceTermination) { + if (c.error != null) { + inner.actual.onError(c.error); + } else { + inner.actual.onComplete(); + } + break; } } } @@ -515,8 +528,10 @@ boolean checkTerminated(boolean d, boolean empty) { if (d) { Throwable e = error; if (e != null && e != Exceptions.TERMINATED) { - CONNECTION.compareAndSet(parent, this, null); - e = Exceptions.terminate(ERROR, this); + if (parent.resetUponSourceTermination) { + CONNECTION.compareAndSet(parent, this, null); + e = Exceptions.terminate(ERROR, this); + } queue.clear(); for (PubSubInner inner : terminate()) { inner.actual.onError(e); @@ -524,7 +539,9 @@ boolean checkTerminated(boolean d, boolean empty) { return true; } else if (empty) { - CONNECTION.compareAndSet(parent, this, null); + if (parent.resetUponSourceTermination) { + CONNECTION.compareAndSet(parent, this, null); + } for (PubSubInner inner : terminate()) { inner.actual.onComplete(); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxTakeUntilOther.java b/reactor-core/src/main/java/reactor/core/publisher/FluxTakeUntilOther.java index 03b7b36c2b..9c272857b5 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxTakeUntilOther.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxTakeUntilOther.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -117,6 +117,7 @@ public void onComplete() { return; } once = true; + main.cancelMain(); main.onComplete(); } } @@ -219,14 +220,13 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - if (main == null) { if (MAIN.compareAndSet(this, null, Operators.cancelledSubscription())) { Operators.error(actual, t); return; } } - cancel(); + cancelOther(); actual.onError(t); } @@ -235,12 +235,11 @@ public void onError(Throwable t) { public void onComplete() { if (main == null) { if (MAIN.compareAndSet(this, null, Operators.cancelledSubscription())) { - cancelOther(); Operators.complete(actual); return; } } - cancel(); + cancelOther(); actual.onComplete(); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoCompletionStage.java b/reactor-core/src/main/java/reactor/core/publisher/MonoCompletionStage.java index 4dba71fdd3..b9d3953c9d 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoCompletionStage.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoCompletionStage.java @@ -64,7 +64,7 @@ public void cancel() { return; } - future.whenComplete((v, e) -> { + future.handle((v, e) -> { if (sds.isCancelled()) { //nobody is interested in the Mono anymore, don't risk dropping errors Context ctx = sds.currentContext(); @@ -79,7 +79,7 @@ public void cancel() { Operators.onDiscard(v, ctx); } - return; + return null; } try { if (e instanceof CompletionException) { @@ -99,6 +99,7 @@ else if (v != null) { Operators.onErrorDropped(e1, actual.currentContext()); throw Exceptions.bubble(e1); } + return null; }); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/Sinks.java b/reactor-core/src/main/java/reactor/core/publisher/Sinks.java index 07efd2448b..e9183e4ba6 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Sinks.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Sinks.java @@ -269,7 +269,8 @@ public interface EmitFailureHandler { *

* As a result there will always be some delay between this computation and the actual first * use of the handler (at a minimum, the time it takes for the first sink emission attempt). - * Consider this when choosing the {@link Duration}, and probably prefer something above 100ms. + * Consider this when choosing the {@link Duration}, and probably prefer something above 100ms, + * and don't cache the returning handler for later usage. * * @param duration {@link Duration} for the deadline * @return an optimistic and bounded busy-looping {@link EmitFailureHandler} diff --git a/reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java b/reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java index dfa05b6da2..5b07f8c86b 100644 --- a/reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java +++ b/reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java @@ -250,14 +250,26 @@ public Mono disposeGracefully() { public Disposable schedule(Runnable task) { //tasks running once will call dispose on the BoundedState, decreasing its usage by one BoundedState picked = state.currentResource.pick(); - return Schedulers.directSchedule(picked.executor, task, picked, 0L, TimeUnit.MILLISECONDS); + try { + return Schedulers.directSchedule(picked.executor, task, picked, 0L, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException ex) { + // ensure to free the BoundedState so it can be reused + picked.dispose(); + throw ex; + } } @Override public Disposable schedule(Runnable task, long delay, TimeUnit unit) { //tasks running once will call dispose on the BoundedState, decreasing its usage by one final BoundedState picked = state.currentResource.pick(); - return Schedulers.directSchedule(picked.executor, task, picked, delay, unit); + try { + return Schedulers.directSchedule(picked.executor, task, picked, delay, unit); + } catch (RejectedExecutionException ex) { + // ensure to free the BoundedState so it can be reused + picked.dispose(); + throw ex; + } } @Override @@ -266,14 +278,20 @@ public Disposable schedulePeriodically(Runnable task, long period, TimeUnit unit) { final BoundedState picked = state.currentResource.pick(); - Disposable scheduledTask = Schedulers.directSchedulePeriodically(picked.executor, - task, - initialDelay, - period, - unit); - //a composite with picked ensures the cancellation of the task releases the BoundedState - // (ie decreases its usage by one) - return Disposables.composite(scheduledTask, picked); + try { + Disposable scheduledTask = Schedulers.directSchedulePeriodically(picked.executor, + task, + initialDelay, + period, + unit); + //a composite with picked ensures the cancellation of the task releases the BoundedState + // (ie decreases its usage by one) + return Disposables.composite(scheduledTask, picked); + } catch (RejectedExecutionException ex) { + // ensure to free the BoundedState so it can be reused + picked.dispose(); + throw ex; + } } @Override diff --git a/reactor-core/src/main/java/reactor/core/scheduler/ReactorBlockHoundIntegration.java b/reactor-core/src/main/java/reactor/core/scheduler/ReactorBlockHoundIntegration.java index 2c253e2b2f..7f068445e7 100644 --- a/reactor-core/src/main/java/reactor/core/scheduler/ReactorBlockHoundIntegration.java +++ b/reactor-core/src/main/java/reactor/core/scheduler/ReactorBlockHoundIntegration.java @@ -42,6 +42,7 @@ public void applyTo(BlockHound.Builder builder) { // Calls ScheduledFutureTask#cancel that may short park in DelayedWorkQueue#remove for getting a lock builder.allowBlockingCallsInside(SchedulerTask.class.getName(), "dispose"); + builder.allowBlockingCallsInside(WorkerTask.class.getName(), "dispose"); builder.allowBlockingCallsInside(ThreadPoolExecutor.class.getName(), "processWorkerExit"); } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxCacheTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxCacheTest.java index 2c0d24ad59..aa6d45afc2 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxCacheTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxCacheTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2017-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -103,7 +103,7 @@ public void cacheFluxHistoryTTL() { } @Test - public void cacheFluxTTL2() { + public void cacheFluxTTLReconnectsAfterTTL() { VirtualTimeScheduler vts = VirtualTimeScheduler.create(); AtomicInteger i = new AtomicInteger(0); @@ -125,6 +125,47 @@ public void cacheFluxTTL2() { .verifyComplete(); } + @Test + void cacheZeroFluxCachesCompletion() { + VirtualTimeScheduler vts = VirtualTimeScheduler.create(); + + Flux> source = Flux.just(1, 2, 3) + .delayElements(Duration.ofMillis(1000) + , vts) + .cache(0) + .elapsed(vts); + + StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE) + .thenAwait(Duration.ofSeconds(3)) + .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1) + .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2) + .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3) + .verifyComplete(); + + StepVerifier.create(source).verifyComplete(); + } + + @Test + public void cacheZeroFluxTTLReconnectsAfterSourceCompletion() { + VirtualTimeScheduler vts = VirtualTimeScheduler.create(); + + Flux> source = Flux.just(1, 2, 3) + .delayElements( + Duration.ofMillis(1000), vts + ) + .cache(0, Duration.ofMillis(2000), vts) + .elapsed(vts); + + StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE) + .thenAwait(Duration.ofSeconds(3)) + .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1) + .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2) + .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3) + .verifyComplete(); + + StepVerifier.create(source).expectTimeout(Duration.ofMillis(500)).verify(); + } + @Test public void cacheContextHistory() { AtomicInteger contextFillCount = new AtomicInteger(); @@ -156,6 +197,38 @@ public void cacheContextHistory() { assertThat(contextFillCount).as("cacheHit3").hasValue(4); } + @Test + public void cacheZeroContext() { + AtomicInteger contextFillCount = new AtomicInteger(); + Flux cached = Flux.just(1, 2) + .flatMap(i -> Mono.deferContextual(Mono::just) + .map(ctx -> ctx.getOrDefault("a", "BAD")) + ) + .cache(0) + .contextWrite(ctx -> ctx.put("a", "GOOD" + contextFillCount.incrementAndGet())); + + // at first pass, the Context is propagated to subscriber, but not cached + String cacheMiss = cached.blockLast(); + assertThat(cacheMiss).as("cacheMiss").isEqualTo("GOOD1"); + assertThat(contextFillCount).as("cacheMiss").hasValue(1); + + // at second subscribe, the Context fill attempt is still done, but ultimately + // ignored since source terminated + String zeroCache = cached.blockLast(); + assertThat(zeroCache).as("zeroCache").isNull(); //value from the cache + assertThat(contextFillCount).as("zeroCache").hasValue(2); //function was still invoked + + //at third subscribe, function is called for the 3rd time, but the context is still cached + String zeroCache2 = cached.blockLast(); + assertThat(zeroCache2).as("zeroCache2").isNull(); + assertThat(contextFillCount).as("zeroCache2").hasValue(3); + + //at fourth subscribe, function is called for the 4th time, but the context is still cached + String zeroCache3 = cached.blockLast(); + assertThat(zeroCache3).as("zeroCache3").isNull(); + assertThat(contextFillCount).as("zeroCache3").hasValue(4); + } + @Test public void cacheContextTime() { AtomicInteger contextFillCount = new AtomicInteger(); diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxPublishTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxPublishTest.java index e020930eff..17efb6dfb4 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxPublishTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxPublishTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,10 +26,12 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.provider.ValueSource; import org.reactivestreams.Subscription; import reactor.core.Disposable; import reactor.core.Scannable; import reactor.core.scheduler.Schedulers; +import reactor.test.ParameterizedTestWithName; import reactor.test.StepVerifier; import reactor.test.publisher.FluxOperatorTest; import reactor.test.publisher.TestPublisher; @@ -53,8 +55,9 @@ protected Scenario defaultScenarioOptions(Scenario> scenarios_operatorSuccess() { return Arrays.asList( scenario(f -> f.publish().autoConnect()), - - scenario(f -> f.publish().refCount()) + scenario(f -> f.publish().refCount()), + scenario(f -> f.replay(0).autoConnect()), + scenario(f -> f.replay(0).refCount()) ); } @@ -118,12 +121,15 @@ public void constructors() { ctb.test(); }*/ - @Test - public void normal() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + void normal(boolean replayTerminalSignal) { AssertSubscriber ts1 = AssertSubscriber.create(); AssertSubscriber ts2 = AssertSubscriber.create(); - ConnectableFlux p = Flux.range(1, 5).hide().publish(); + Flux source = Flux.range(1, 5).hide(); + ConnectableFlux p = replayTerminalSignal ? + source.replay(0) : source.publish(); p.subscribe(ts1); p.subscribe(ts2); @@ -147,14 +153,34 @@ public void normal() { ts2.assertValues(1, 2, 3, 4, 5) .assertNoError() .assertComplete(); + + // late subscriber + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); + if (replayTerminalSignal) { + ts3.assertNoValues() + .assertNoError() + .assertComplete(); + } else { + ts3.assertNoEvents(); + + p.connect(); + + ts3.assertValues(1, 2, 3, 4, 5) + .assertNoError() + .assertComplete(); + } } - @Test - public void normalBackpressured() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + void normalBackpressured(boolean replayTerminalSignal) { AssertSubscriber ts1 = AssertSubscriber.create(0); AssertSubscriber ts2 = AssertSubscriber.create(0); - ConnectableFlux p = Flux.range(1, 5).hide().publish(); + Flux source = Flux.range(1, 5).hide(); + ConnectableFlux p = replayTerminalSignal ? + source.replay(0) : source.publish(); p.subscribe(ts1); p.subscribe(ts2); @@ -202,10 +228,28 @@ public void normalBackpressured() { ts2.assertValues(1, 2, 3, 4, 5) .assertNoError() .assertComplete(); + + // late subscriber + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); + if (replayTerminalSignal) { + ts3.assertNoValues() + .assertNoError() + .assertComplete(); + } else { + ts3.assertNoEvents(); + + p.connect(); + + ts3.assertValues(1, 2, 3, 4, 5) + .assertNoError() + .assertComplete(); + } } - @Test - public void normalAsyncFused() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void normalAsyncFused(boolean replayTerminalSignal) { AssertSubscriber ts1 = AssertSubscriber.create(); AssertSubscriber ts2 = AssertSubscriber.create(); @@ -217,7 +261,8 @@ public void normalAsyncFused() { up.emitNext(5, FAIL_FAST); up.emitComplete(FAIL_FAST); - ConnectableFlux p = up.asFlux().publish(); + ConnectableFlux p = replayTerminalSignal ? + up.asFlux().replay(0) : up.asFlux().publish(); p.subscribe(ts1); p.subscribe(ts2); @@ -241,10 +286,20 @@ public void normalAsyncFused() { ts2.assertValues(1, 2, 3, 4, 5) .assertNoError() .assertComplete(); + + // late subscriber + if (replayTerminalSignal) { + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); + ts3.assertNoValues() + .assertNoError() + .assertComplete(); + } // no else - unicast disallows second connect } - @Test - public void normalBackpressuredAsyncFused() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void normalBackpressuredAsyncFused(boolean replayTerminalSignal) { AssertSubscriber ts1 = AssertSubscriber.create(0); AssertSubscriber ts2 = AssertSubscriber.create(0); @@ -256,7 +311,8 @@ public void normalBackpressuredAsyncFused() { up.emitNext(5, FAIL_FAST); up.emitComplete(FAIL_FAST); - ConnectableFlux p = up.asFlux().publish(); + ConnectableFlux p = replayTerminalSignal ? + up.asFlux().replay(0) : up.asFlux().publish(); p.subscribe(ts1); p.subscribe(ts2); @@ -304,14 +360,26 @@ public void normalBackpressuredAsyncFused() { ts2.assertValues(1, 2, 3, 4, 5) .assertNoError() .assertComplete(); + + // late subscriber + if (replayTerminalSignal) { + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); + ts3.assertNoValues() + .assertNoError() + .assertComplete(); + } // no else - unicast disallows second connect } - @Test - public void normalSyncFused() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void normalSyncFused(boolean replayTerminalSignal) { AssertSubscriber ts1 = AssertSubscriber.create(); AssertSubscriber ts2 = AssertSubscriber.create(); - ConnectableFlux p = Flux.range(1, 5).publish(5); + Flux source = Flux.range(1, 5); + ConnectableFlux p = replayTerminalSignal ? + source.replay(0) : source.publish(5); p.subscribe(ts1); p.subscribe(ts2); @@ -337,12 +405,15 @@ public void normalSyncFused() { .assertComplete(); } - @Test - public void normalBackpressuredSyncFused() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void normalBackpressuredSyncFused(boolean replayTerminalSignal) { AssertSubscriber ts1 = AssertSubscriber.create(0); AssertSubscriber ts2 = AssertSubscriber.create(0); - ConnectableFlux p = Flux.range(1, 5).publish(5); + Flux source = Flux.range(1, 5); + ConnectableFlux p = replayTerminalSignal ? + source.replay(0) : source.publish(5); p.subscribe(ts1); p.subscribe(ts2); @@ -390,6 +461,23 @@ public void normalBackpressuredSyncFused() { ts2.assertValues(1, 2, 3, 4, 5) .assertNoError() .assertComplete(); + + // late subscriber + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); + if (replayTerminalSignal) { + ts3.assertNoValues() + .assertNoError() + .assertComplete(); + } else { + ts3.assertNoEvents(); + + p.connect(); + + ts3.assertValues(1, 2, 3, 4, 5) + .assertNoError() + .assertComplete(); + } } //see https://github.com/reactor/reactor-core/issues/1302 @@ -408,13 +496,15 @@ public void boundaryFused() { .verify(Duration.ofSeconds(5)); } - @Test - public void disconnect() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void disconnect(boolean replayTerminalSignal) { AssertSubscriber ts = AssertSubscriber.create(); Sinks.Many e = Sinks.many().multicast().onBackpressureBuffer(); - ConnectableFlux p = e.asFlux().publish(); + ConnectableFlux p = replayTerminalSignal ? + e.asFlux().replay(0) : e.asFlux().publish(); p.subscribe(ts); @@ -430,15 +520,22 @@ public void disconnect() { .assertNotComplete(); assertThat(e.currentSubscriberCount()).as("still connected").isZero(); + + // late subscriber + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); + ts3.assertNoEvents(); } - @Test - public void disconnectBackpressured() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void disconnectBackpressured(boolean replayTerminalSignal) { AssertSubscriber ts = AssertSubscriber.create(0); Sinks.Many e = Sinks.many().multicast().onBackpressureBuffer(); - ConnectableFlux p = e.asFlux().publish(); + ConnectableFlux p = replayTerminalSignal ? + e.asFlux().replay(0) : e.asFlux().publish(); p.subscribe(ts); @@ -451,15 +548,22 @@ public void disconnectBackpressured() { .assertNotComplete(); assertThat(e.currentSubscriberCount()).as("still connected").isZero(); + + // late subscriber + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); + ts3.assertNoEvents(); } - @Test - public void error() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void error(boolean replayTerminalSignal) { AssertSubscriber ts = AssertSubscriber.create(); Sinks.Many e = Sinks.many().multicast().onBackpressureBuffer(); - ConnectableFlux p = e.asFlux().publish(); + ConnectableFlux p = replayTerminalSignal ? + e.asFlux().replay(0) : e.asFlux().publish(); p.subscribe(ts); @@ -473,13 +577,28 @@ public void error() { .assertError(RuntimeException.class) .assertErrorWith(x -> assertThat(x).hasMessageContaining("forced failure")) .assertNotComplete(); + + // late subscriber + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); + + if (replayTerminalSignal) { + ts3.assertError(RuntimeException.class) + .assertErrorWith(x -> assertThat(x).hasMessageContaining("forced failure")) + .assertNotComplete(); + } else { + ts3.assertNoEvents(); + } } - @Test - public void fusedMapInvalid() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void fusedMapInvalid(boolean replayTerminalSignal) { AssertSubscriber ts = AssertSubscriber.create(); - ConnectableFlux p = Flux.range(1, 5).map(v -> (Integer)null).publish(); + Flux source = Flux.range(1, 5).map(v -> (Integer) null); + ConnectableFlux p = replayTerminalSignal ? + source.replay(0) : source.publish(); p.subscribe(ts); @@ -488,15 +607,27 @@ public void fusedMapInvalid() { ts.assertNoValues() .assertError(NullPointerException.class) .assertNotComplete(); - } + // late subscriber + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); - @Test - public void retry() { + if (replayTerminalSignal) { + ts3.assertError(NullPointerException.class) + .assertNotComplete(); + } else { + ts3.assertNoEvents(); + } + } + + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void retry(boolean replayTerminalSignal) { Sinks.Many dp = Sinks.unsafe().many().multicast().directBestEffort(); + ConnectableFlux flux = replayTerminalSignal ? + dp.asFlux().replay(0) : dp.asFlux().publish(); StepVerifier.create( - dp.asFlux() - .publish() + flux .autoConnect().handle((s1, sink) -> { if (s1 == 1) { sink.error(new RuntimeException()); @@ -518,12 +649,16 @@ public void retry() { dp.emitComplete(FAIL_FAST); } - @Test - public void retryWithPublishOn() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void retryWithPublishOn(boolean replayTerminalSignal) { Sinks.Many dp = Sinks.unsafe().many().multicast().directBestEffort(); + Flux source = dp.asFlux().publishOn(Schedulers.parallel()); + + ConnectableFlux flux = replayTerminalSignal ? + source.replay(0) : source.publish(); StepVerifier.create( - dp.asFlux() - .publishOn(Schedulers.parallel()).publish() + flux .autoConnect().handle((s1, sink) -> { if (s1 == 1) { sink.error(new RuntimeException()); @@ -548,7 +683,8 @@ public void retryWithPublishOn() { @Test public void scanMain() { Flux parent = Flux.just(1).map(i -> i); - FluxPublish test = new FluxPublish<>(parent, 123, Queues.unbounded()); + FluxPublish test = + new FluxPublish<>(parent, 123, Queues.unbounded(), true); assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent); assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(123); @@ -557,7 +693,8 @@ public void scanMain() { @Test public void scanSubscriber() { - FluxPublish main = new FluxPublish<>(Flux.just(1), 123, Queues.unbounded()); + FluxPublish main = + new FluxPublish<>(Flux.just(1), 123, Queues.unbounded(), true); FluxPublish.PublishSubscriber test = new FluxPublish.PublishSubscriber<>(789, main); Subscription parent = Operators.emptySubscription(); test.onSubscribe(parent); @@ -583,7 +720,8 @@ public void scanSubscriber() { @Test public void scanInner() { - FluxPublish main = new FluxPublish<>(Flux.just(1), 123, Queues.unbounded()); + FluxPublish main = + new FluxPublish<>(Flux.just(1), 123, Queues.unbounded(), true); FluxPublish.PublishSubscriber parent = new FluxPublish.PublishSubscriber<>(789, main); Subscription sub = Operators.emptySubscription(); parent.onSubscribe(sub); @@ -607,7 +745,8 @@ public void scanInner() { @Test public void scanPubSubInner() { - FluxPublish main = new FluxPublish<>(Flux.just(1), 123, Queues.unbounded()); + FluxPublish main = + new FluxPublish<>(Flux.just(1), 123, Queues.unbounded(), true); FluxPublish.PublishSubscriber parent = new FluxPublish.PublishSubscriber<>(789, main); Subscription sub = Operators.emptySubscription(); parent.onSubscribe(sub); @@ -624,10 +763,11 @@ public void scanPubSubInner() { } //see https://github.com/reactor/reactor-core/issues/1290 - @Test - public void syncFusionSingle() { //single value in the SYNC fusion - final ConnectableFlux publish = Flux.just("foo") - .publish(); + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void syncFusionSingle(boolean replayTerminalSignal) { //single value in the SYNC fusion + final ConnectableFlux publish = replayTerminalSignal ? + Flux.just("foo").replay(0) : Flux.just("foo").publish(); StepVerifier.create(publish) .then(publish::connect) @@ -637,10 +777,12 @@ public void syncFusionSingle() { //single value in the SYNC fusion } //see https://github.com/reactor/reactor-core/issues/1290 - @Test - public void syncFusionMultiple() { //multiple values in the SYNC fusion - final ConnectableFlux publish = Flux.range(1, 5) - .publish(); + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void syncFusionMultiple(boolean replayTerminalSignal) { //multiple values in the SYNC fusion + final ConnectableFlux publish = replayTerminalSignal ? + Flux.range(1, 5).replay(0) : + Flux.range(1, 5).publish(); StepVerifier.create(publish) .then(publish::connect) @@ -650,12 +792,13 @@ public void syncFusionMultiple() { //multiple values in the SYNC fusion } //see https://github.com/reactor/reactor-core/issues/1528 - @Test + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) @Timeout(4) - public void syncFusionFromInfiniteStream() { - final ConnectableFlux publish = - Flux.fromStream(Stream.iterate(0, i -> i + 1)) - .publish(); + public void syncFusionFromInfiniteStream(boolean replayTerminalSignal) { + Flux source = Flux.fromStream(Stream.iterate(0, i -> i + 1)); + final ConnectableFlux publish = replayTerminalSignal ? + source.replay(0) : source.publish(); StepVerifier.create(publish) .then(publish::connect) @@ -666,12 +809,15 @@ public void syncFusionFromInfiniteStream() { } //see https://github.com/reactor/reactor-core/issues/1528 - @Test + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) @Timeout(4) - public void syncFusionFromInfiniteStreamAndTake() { + public void syncFusionFromInfiniteStreamAndTake(boolean replayTerminalSignal) { + Flux source = Flux.fromStream(Stream.iterate(0, i -> i + 1)); + ConnectableFlux flux = replayTerminalSignal ? + source.replay(0) : source.publish(); final Flux publish = - Flux.fromStream(Stream.iterate(0, i -> i + 1)) - .publish() + flux .autoConnect() .take(10); @@ -681,10 +827,12 @@ public void syncFusionFromInfiniteStreamAndTake() { .verify(Duration.ofSeconds(4)); } - @Test - public void dataDroppedIfConnectImmediately() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void dataDroppedIfConnectImmediately(boolean replayTerminalSignal) { TestPublisher publisher = TestPublisher.create(); - ConnectableFlux connectableFlux = publisher.flux().publish(); + ConnectableFlux connectableFlux = replayTerminalSignal ? + publisher.flux().replay(0) : publisher.flux().publish(); connectableFlux.connect(); @@ -700,10 +848,13 @@ public void dataDroppedIfConnectImmediately() { .verifyComplete(); } - @Test - public void dataDroppedIfAutoconnectZero() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void dataDroppedIfAutoconnectZero(boolean replayTerminalSignal) { TestPublisher publisher = TestPublisher.create(); - Flux flux = publisher.flux().publish().autoConnect(0); + ConnectableFlux publish = replayTerminalSignal ? + publisher.flux().replay(0) : publisher.flux().publish(); + Flux flux = publish.autoConnect(0); publisher.next(1); publisher.next(2); @@ -733,21 +884,22 @@ public void removeUnknownInnerIgnored() { assertThat(subscriber.subscribers).as("post remove inner").isEmpty(); } - @Test - public void subscriberContextPropagation() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void subscriberContextPropagation(boolean replayTerminalSignal) { String key = "key"; int expectedValue = 1; AtomicReference reference = new AtomicReference<>(); + Flux source = Flux.just(1, 2, 3) + .flatMap(value -> Mono.deferContextual(Mono::just) + .doOnNext(reference::set) + .thenReturn(value)); + ConnectableFlux publish = replayTerminalSignal ? + source.replay(0) : source.publish(); Flux integerFlux = - Flux.just(1, 2, 3) - .flatMap(value -> - Mono.deferContextual(Mono::just) - .doOnNext(reference::set) - .thenReturn(value) - ) - .publish() + publish .autoConnect(2); integerFlux.contextWrite(Context.of(key, expectedValue)) diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxTakeUntilOtherTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxTakeUntilOtherTest.java index 21eea3c17e..1c331b7580 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxTakeUntilOtherTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxTakeUntilOtherTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscription; @@ -27,7 +28,6 @@ import reactor.test.subscriber.AssertSubscriber; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.junit.jupiter.api.Assertions.assertThrows; public class FluxTakeUntilOtherTest { @@ -49,14 +49,19 @@ public void nullOther() { @Test public void takeAll() { AssertSubscriber ts = AssertSubscriber.create(); + AtomicBoolean mainCancelled = new AtomicBoolean(false); + AtomicBoolean otherCancelled = new AtomicBoolean(false); Flux.range(1, 10) - .takeUntilOther(Flux.never()) + .doOnCancel(() -> mainCancelled.set(true)) + .takeUntilOther(Flux.never().doOnCancel(() -> otherCancelled.set(true))) .subscribe(ts); ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) .assertComplete() .assertNoError(); + Assertions.assertThat(mainCancelled).isFalse(); + Assertions.assertThat(otherCancelled).isTrue(); } @Test @@ -87,14 +92,22 @@ public void takeAllBackpressured() { @Test public void takeNone() { AssertSubscriber ts = AssertSubscriber.create(); + AtomicBoolean mainCancelled = new AtomicBoolean(false); + AtomicBoolean otherCancelled = new AtomicBoolean(false); + Flux other = + Flux.empty() + .doOnCancel(() -> otherCancelled.set(true)); Flux.range(1, 10) - .takeUntilOther(Flux.empty()) + .doOnCancel(() -> mainCancelled.set(true)) + .takeUntilOther(other) .subscribe(ts); ts.assertNoValues() .assertComplete() .assertNoError(); + Assertions.assertThat(mainCancelled).isTrue(); + Assertions.assertThat(otherCancelled).isTrue(); } @Test @@ -113,14 +126,23 @@ public void takeNoneBackpressured() { @Test public void takeNoneOtherMany() { AssertSubscriber ts = AssertSubscriber.create(); + AtomicBoolean mainCancelled = new AtomicBoolean(false); + AtomicBoolean otherCancelled = new AtomicBoolean(false); + Flux other = + Flux.range(1, 10) + .doOnCancel(() -> otherCancelled.set(true)); Flux.range(1, 10) - .takeUntilOther(Flux.range(1, 10)) + .doOnCancel(() -> mainCancelled.set(true)) + .takeUntilOther(other) .subscribe(ts); ts.assertNoValues() .assertComplete() .assertNoError(); + + Assertions.assertThat(mainCancelled).isTrue(); + Assertions.assertThat(otherCancelled).isTrue(); } @Test @@ -139,15 +161,24 @@ public void takeNoneBackpressuredOtherMany() { @Test public void otherSignalsError() { AssertSubscriber ts = AssertSubscriber.create(); + AtomicBoolean mainCancelled = new AtomicBoolean(false); + AtomicBoolean otherCancelled = new AtomicBoolean(false); + Flux other = + Flux.error(new RuntimeException("forced " + "failure")) + .doOnCancel(() -> otherCancelled.set(true)); Flux.range(1, 10) - .takeUntilOther(Flux.error(new RuntimeException("forced " + "failure"))) + .doOnCancel(() -> mainCancelled.set(true)) + .takeUntilOther(other) .subscribe(ts); ts.assertNoValues() .assertNotComplete() .assertError(RuntimeException.class) .assertErrorMessage("forced failure"); + + Assertions.assertThat(mainCancelled).isTrue(); + Assertions.assertThat(otherCancelled).isFalse(); } @Test @@ -164,6 +195,29 @@ public void otherSignalsErrorBackpressured() { .assertNotComplete(); } + @Test + public void mainSignalsError() { + AssertSubscriber ts = AssertSubscriber.create(); + AtomicBoolean mainCancelled = new AtomicBoolean(false); + AtomicBoolean otherCancelled = new AtomicBoolean(false); + + Flux other = + Flux.never() + .doOnCancel(() -> otherCancelled.set(true)); + Flux.error(new RuntimeException("forced " + "failure")) + .doOnCancel(() -> mainCancelled.set(true)) + .takeUntilOther(other) + .subscribe(ts); + + ts.assertNoValues() + .assertNotComplete() + .assertError(RuntimeException.class) + .assertErrorMessage("forced failure"); + + Assertions.assertThat(mainCancelled).isFalse(); + Assertions.assertThat(otherCancelled).isTrue(); + } + Flux scenario_aFluxCanBeLimitedByTime(){ return Flux.range(0, 1000) .take(Duration.ofSeconds(2)); @@ -240,7 +294,9 @@ public void scanOtherSubscriber() { Assertions.assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); Assertions.assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse(); + Assertions.assertThat(main.scan(Scannable.Attr.CANCELLED)).isFalse(); main.cancel(); Assertions.assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue(); + Assertions.assertThat(main.scan(Scannable.Attr.CANCELLED)).isTrue(); } } diff --git a/reactor-core/src/test/java/reactor/core/scheduler/BoundedElasticSchedulerTest.java b/reactor-core/src/test/java/reactor/core/scheduler/BoundedElasticSchedulerTest.java index debe29b14e..984adac6db 100644 --- a/reactor-core/src/test/java/reactor/core/scheduler/BoundedElasticSchedulerTest.java +++ b/reactor-core/src/test/java/reactor/core/scheduler/BoundedElasticSchedulerTest.java @@ -1551,6 +1551,89 @@ void toStringOfExecutorReflectsIdleVsActive() throws InterruptedException { } } + @Test + public void transitionsToIdleAfterRejectionAndFollowingCompletion() throws InterruptedException { + int taskRejected = 0; + + int maxThreads = 2; + int maxTaskQueuedPerThread = 10; + + BoundedElasticScheduler scheduler = afterTest.autoDispose( + new BoundedElasticScheduler( + maxThreads, + maxTaskQueuedPerThread, + new ReactorThreadFactory( + "moveToIdleAfterHaveRejectTasksAndCompleteAllTasks", + new AtomicLong(), + false, + false, + null), + 60)); + scheduler.start(); + + CountDownLatch releaseAllTasksLatch = new CountDownLatch(1); + CountDownLatch tasksStartedLatch = new CountDownLatch(maxThreads); + + Runnable startedAndWaitLatchRunnable = () -> { + try { + tasksStartedLatch.countDown(); + releaseAllTasksLatch.await(30, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + }; + + Runnable waitLatchRunnable = () -> { + try { + releaseAllTasksLatch.await(30, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + }; + + // initial tasks that block the threads, causing other tasks to enter the pending queue + for (int i = 0; i < maxThreads; i++) { + scheduler.schedule(startedAndWaitLatchRunnable); + } + + // small window to start the first task + assertThat(tasksStartedLatch.await(1, TimeUnit.SECONDS)).as("task picked").isTrue(); + + // fill up pending queue + for (int i = 0; i < maxThreads * maxTaskQueuedPerThread; i++) { + scheduler.schedule(waitLatchRunnable); + } + + // check new tasks are rejected + assertThatExceptionOfType(RejectedExecutionException.class) + .as("must reject for method schedule without delay") + .isThrownBy(() -> scheduler.schedule(waitLatchRunnable)); + taskRejected++; + + assertThatExceptionOfType(RejectedExecutionException.class) + .as("must reject for method schedule with delay") + .isThrownBy(() -> scheduler.schedule(waitLatchRunnable, 100, TimeUnit.MILLISECONDS)); + taskRejected++; + + assertThatExceptionOfType(RejectedExecutionException.class) + .as("must reject for method schedulePeriodically") + .isThrownBy(() -> scheduler.schedulePeriodically(waitLatchRunnable, 100, 100, TimeUnit.MILLISECONDS)); + taskRejected++; + + assertThat(taskRejected).as("task rejected").isEqualTo(3); + + releaseAllTasksLatch.countDown(); + + Awaitility.with().pollInterval(50, TimeUnit.MILLISECONDS).pollDelay(50, TimeUnit.MILLISECONDS) + .await().atMost(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> assertThat(scheduler.estimateIdle()) + .as("all BoundedStates are idle after all pending tasks finish") + .isEqualTo(maxThreads) + ); + } + private static boolean canSubmitTask(Scheduler scheduler) { CountDownLatch latch = new CountDownLatch(1); scheduler.schedule(latch::countDown); diff --git a/reactor-test/build.gradle b/reactor-test/build.gradle index 0ee9ec6bbf..a84f624ee4 100644 --- a/reactor-test/build.gradle +++ b/reactor-test/build.gradle @@ -64,28 +64,55 @@ task downloadBaseline(type: Download) { finalizedBy { japicmp } } +def japicmpReport = tasks.register('japicmpReport') { + onlyIf { + japicmp.state.failure != null + } + doLast { + def reportFile = file("${project.buildDir}/reports/japi.txt") + if (reportFile.exists()) { + println "\n *********************************" + println " * /!\\ API compatibility failures *" + println " **********************************" + println "Japicmp report was filtered and interpreted to find the following incompatibilities:" + reportFile.eachLine { + if (it.contains("*") && (!it.contains("***") || it.contains("****"))) + println "source incompatible change: $it" + else if (it.contains("!")) + println "binary incompatible change: $it" + } + } + else println "No incompatible change to report" + } +} + task japicmp(type: JapicmpTask) { if (project.gradle.startParameter.isOffline()) { println "Offline: skipping downloading of baseline and JAPICMP" - enabled = false + enabled = false } else if ("${libs.versions.baseline.core.api.get()}" == "SKIP") { println "SKIP: Instructed to skip the baseline comparison" - enabled = false + enabled = false } else { println "Will download and perform baseline comparison with ${libs.versions.baseline.core.api.get()}" - dependsOn(downloadBaseline) + dependsOn(downloadBaseline) + finalizedBy(japicmpReport) } oldClasspath.from(files("${buildDir}/baselineLibs/reactor-test-${libs.versions.baseline.core.api.get()}.jar")) newClasspath.from(files(jar.archiveFile)) - onlyBinaryIncompatibleModified = true + // these onlyXxx parameters result in a report that is slightly too noisy, but better than + // onlyBinaryIncompatibleModified = true which masks source-incompatible-only changes + onlyBinaryIncompatibleModified = false + onlyModified = true failOnModification = true failOnSourceIncompatibility = true txtOutputFile = file("${project.buildDir}/reports/japi.txt") ignoreMissingClasses = true includeSynthetic = true + compatibilityChangeExcludes = [ "METHOD_NEW_DEFAULT" ] // TODO after a .0 release, remove the reactor-test exclusions below if any methodExcludes = [ ] diff --git a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java index 59defd30bf..3c846b7480 100644 --- a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java +++ b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java @@ -1449,7 +1449,8 @@ final void onExpectation(Signal actualSignal) { return; } //possibly re-evaluate the current onNext - event = this.script.peek(); + onExpectation(actualSignal); + return; } if (event instanceof CollectEvent) { diff --git a/reactor-test/src/test/java/reactor/test/DefaultStepVerifierBuilderTests.java b/reactor-test/src/test/java/reactor/test/DefaultStepVerifierBuilderTests.java index 3fe6238a39..2ed2aadd00 100644 --- a/reactor-test/src/test/java/reactor/test/DefaultStepVerifierBuilderTests.java +++ b/reactor-test/src/test/java/reactor/test/DefaultStepVerifierBuilderTests.java @@ -17,11 +17,13 @@ package reactor.test; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.Queue; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -46,6 +48,91 @@ */ public class DefaultStepVerifierBuilderTests { + @Test + void consumeWhileFollowedByTaskEvent() { + List consumed = new ArrayList<>(); + + Flux.range(1, 20) + .as(StepVerifier::create) + .thenConsumeWhile(i -> i < 18, consumed::add) + .then(() -> consumed.add(100)) + .expectNext(18, 19, 20) + .verifyComplete(); + + assertThat(consumed) + .containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + 11, 12, 13, 14, 15, 16, 17, 100); + } + + @Test + void thenConsumeWhile_NoElementsShouldBeConsumed_oneElement() { + Flux given = Flux.just("a"); + + StepVerifier.create(given) + .thenConsumeWhile(s -> s.equals("42")) + .then(() -> {}) // do nothing + .consumeNextWith(s -> Assertions.assertEquals("a", s)) + .verifyComplete(); + + // expectation "consumeNextWith" failed (expected: onNext(); actual: onComplete()) + } + + @Test + void thenConsumeWhile_NoElementsShouldBeConsumed_consumeWhileTwice() { + Flux given = Flux.just("a", "a"); + + StepVerifier.create(given) + .thenConsumeWhile(s -> s.equals("42")) + .then(() -> {}) + .thenConsumeWhile(s -> s.equals("42")) + .then(() -> {}) + .consumeNextWith(s -> Assertions.assertEquals("a", s)) + .consumeNextWith(s -> Assertions.assertEquals("a", s)) + .verifyComplete(); + + // expectation "consumeNextWith" failed (expected: onNext(); actual: onComplete()) + } + + @Test + void thenConsumeWhile_NoElementsShouldBeConsumed_moreThanOneElement() { + Flux given = Flux.just("a", "b"); + + StepVerifier.create(given) + .thenConsumeWhile(s -> s.equals("42")) + .then(() -> {}) // do nothing + .consumeNextWith(s -> Assertions.assertEquals("a", s)) + .consumeNextWith(s -> Assertions.assertEquals("b", s)) + .verifyComplete(); + + // org.opentest4j.AssertionFailedError: expected: but was: + } + + @Test + void thenConsumeWhile_OneElementShouldBeConsumed() { + Flux given = Flux.just("a", "b"); + + StepVerifier.create(given) + .thenConsumeWhile(s -> s.equals("a")) + .then(() -> {}) // do nothing + .consumeNextWith(s -> Assertions.assertEquals("b", s)) + .verifyComplete(); + + // expectation "consumeNextWith" failed (expected: onNext(); actual: onComplete()) + } + + @Test + void thenConsumeWhile_onlyTwoElementsShouldBeConsumed() { + Flux given = Flux.just("a", "a", "b"); + + StepVerifier.create(given) + .thenConsumeWhile(s -> s.equals("a")) + .then(() -> {}) // do nothing + .consumeNextWith(s -> Assertions.assertEquals("b", s)) + .verifyComplete(); + + // expectation "consumeNextWith" failed (expected: onNext(); actual: onComplete()) + } + @Test public void subscribedTwice() { Flux flux = Flux.just("foo", "bar"); diff --git a/reactor-tools/src/buildPluginTest/resources/mock-gradle/build.gradle b/reactor-tools/src/buildPluginTest/resources/mock-gradle/build.gradle index 34db6f00c1..9d71829b9f 100644 --- a/reactor-tools/src/buildPluginTest/resources/mock-gradle/build.gradle +++ b/reactor-tools/src/buildPluginTest/resources/mock-gradle/build.gradle @@ -22,7 +22,7 @@ buildscript { //the plugin feature only works with the -original jar !! //otherwise implemented Plugin interface is the shaded one classpath files("@AGENT@") - //version 1.12.13 cannot be found on Gradle Plugin Repository, this syntax allows looking it up on MavenCentral + //version 1.12.17 cannot be found on Gradle Plugin Repository, this syntax allows looking it up on MavenCentral classpath 'net.bytebuddy:byte-buddy-gradle-plugin:@BYTE_BUDDY_VERSION@' } }