WindowTimeout with small timeout can race and reuse same window #912
Closed
Description
Reactor Core version 3.1.0.RELEASE
This results in a UnicastProcessor allows only a single Subscriber
message.
java -version 1.8
public static void main(String[] args) {
Flux.concat(
Flux.just("#").delayElements(Duration.ofMillis(20)),
Flux.range(1, 10),
Flux.range(11, 5).delayElements(Duration.ofMillis(15))
)
.windowTimeout(10, Duration.ofMillis(1))
.subscribe(flx -> flx.subscribe(System.out::println));
}
print result
[ERROR] (parallel-1) Scheduler worker in group main failed with an uncaught exception - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber
Caused by: java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber
at reactor.core.publisher.UnicastProcessor.subscribe(UnicastProcessor.java:380)
at reactor.core.publisher.FluxWindowTimeOrSize$Window.subscribe(FluxWindowTimeOrSize.java:98)
at reactor.core.publisher.Flux.subscribe(Flux.java:6447)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:6614)
at reactor.core.publisher.Flux.subscribe(Flux.java:6440)
at reactor.core.publisher.Flux.subscribe(Flux.java:6404)
at reactor.core.publisher.Flux.subscribe(Flux.java:6347)
at com.artshell.reactor.operators.FluxWindowTimeout.lambda$main$0(FluxWindowTimeout.java:17)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:89)
at reactor.core.publisher.FluxWindowTimeOrSize$WindowTimeoutSubscriber.windowCloseByTimeout(FluxWindowTimeOrSize.java:215)
at reactor.core.publisher.FluxWindowTimeOrSize$WindowTimeoutSubscriber.lambda$new$0(FluxWindowTimeOrSize.java:178)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:71)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:39)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)