Skip to content

WindowTimeout with small timeout can race and reuse same window #912

Closed
@artshell

Description

Reactor Core version 3.1.0.RELEASE

This results in a UnicastProcessor allows only a single Subscriber message.

java -version 1.8

    public static void main(String[] args) {
        Flux.concat(
                Flux.just("#").delayElements(Duration.ofMillis(20)),
                Flux.range(1, 10),
                Flux.range(11, 5).delayElements(Duration.ofMillis(15))
        )
                .windowTimeout(10, Duration.ofMillis(1))
                .subscribe(flx -> flx.subscribe(System.out::println));
    }

print result

[ERROR] (parallel-1) Scheduler worker in group main failed with an uncaught exception - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber
Caused by: java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber
	at reactor.core.publisher.UnicastProcessor.subscribe(UnicastProcessor.java:380)
	at reactor.core.publisher.FluxWindowTimeOrSize$Window.subscribe(FluxWindowTimeOrSize.java:98)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6447)
	at reactor.core.publisher.Flux.subscribeWith(Flux.java:6614)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6440)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6404)
	at reactor.core.publisher.Flux.subscribe(Flux.java:6347)
	at com.artshell.reactor.operators.FluxWindowTimeout.lambda$main$0(FluxWindowTimeout.java:17)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
	at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:89)
	at reactor.core.publisher.FluxWindowTimeOrSize$WindowTimeoutSubscriber.windowCloseByTimeout(FluxWindowTimeOrSize.java:215)
	at reactor.core.publisher.FluxWindowTimeOrSize$WindowTimeoutSubscriber.lambda$new$0(FluxWindowTimeOrSize.java:178)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:71)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:39)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions