Skip to content

FlowableOnBackpressureLatest can signal backpressure issue with concurrent Subscription.request and Subscriber.onNext #7864

Open
@wburns

Description

@wburns

I have been having very rare issues where an entry is signaled as overflowing back pressure, but when I checked all of my downstream Publishers they all have proper back pressure requests.

It seems that if you have a Subscription.request call inside BackpressureLatestSubscriber it will "obtain" the drain loop exclusive AtomicInteger. Then if a concurrent Subscriber.onNext call is done in BackpressureLatestSubscriberit will set the current AtomicReference and then try to enter the drain loop but immediately exits and then when it comes a second time it will see current variable is set and say it is a back pressure issue. The BackpressureLatestSubscriber should verify the requested count to determine if there is a back pressure issue instead of just the single current variable being set or not.

Note this requires a downstream processor to complete "async" so that the subscribe is called on the thread that completed the publisher while another thread is adding onNext call.

processorDisposer = requestProcessor.onBackpressureLatest(supplier -> {
                // Sometimes this is invoked despite rebatchRequests having requested more values
            })
            .rebatchRequests(indexerConfig.rebatchRequestsSize())
            // This is completed asynchronously from the invoking thread
            .flatMap(Supplier::get, indexerConfig.maxConcurrency())
            .doFinally(submittedTasks::clear)
            .subscribe(Functions.emptyConsumer(),
                  t -> log.fatal("Indexing encountered a non recoverable error", t));

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions