Description
I have been seeing a very rare issue where an item is signaled as a backpressure overflow, even though all of my downstream Publishers issue proper backpressure requests.
It seems that a Subscription.request call inside BackpressureLatestSubscriber "obtains" the drain loop's exclusive AtomicInteger. If a concurrent Subscriber.onNext call then arrives in BackpressureLatestSubscriber, it sets the current AtomicReference and tries to enter the drain loop, but exits immediately because the loop is already owned. When onNext is called a second time, it sees that current is still set and reports a backpressure issue. BackpressureLatestSubscriber should check the requested count to determine whether there is a backpressure issue, rather than only checking whether the single current variable is set.
Note that this requires a downstream processor to complete "async", so that the subscribe is called on the thread that completed the publisher while another thread is calling onNext.
    processorDisposer = requestProcessor.onBackpressureLatest(supplier -> {
                // Sometimes this is invoked despite rebatchRequests having requested more values
            })
            .rebatchRequests(indexerConfig.rebatchRequestsSize())
            // This is completed asynchronously from the invoking thread
            .flatMap(Supplier::get, indexerConfig.maxConcurrency())
            .doFinally(submittedTasks::clear)
            .subscribe(Functions.emptyConsumer(),
                    t -> log.fatal("Indexing encountered a non recoverable error", t));
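
To make the suspected interleaving easier to follow, here is a minimal single-threaded sketch of it. This is a simplified model written for this report, not the RxJava source: the class `LatestRaceSketch` and the methods `simulate` and `runDrainLoop` are hypothetical names, and the two "threads" are replayed by hand in a fixed order so the result is deterministic. It assumes the operator reports a drop whenever `current` is already set, without consulting the requested count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of a "latest" backpressure subscriber.
class LatestRaceSketch {
    final AtomicInteger wip = new AtomicInteger();   // drain-loop ownership counter
    final AtomicLong requested = new AtomicLong();   // outstanding downstream demand
    final AtomicReference<Integer> current = new AtomicReference<>(); // latest undelivered item
    final List<Integer> delivered = new ArrayList<>();
    final List<Integer> dropped = new ArrayList<>();

    void onNext(Integer item) {
        Integer old = current.getAndSet(item);
        if (old != null) {
            // Assumed behavior: a drop is reported purely because `current` was set.
            dropped.add(old);
        }
        drain();
    }

    void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // another caller owns the drain loop, so exit immediately
        }
        runDrainLoop(1);
    }

    void runDrainLoop(int missed) {
        for (;;) {
            // Deliver while there is demand and an item is available.
            while (requested.get() > 0) {
                Integer v = current.getAndSet(null);
                if (v == null) {
                    break;
                }
                delivered.add(v);
                requested.decrementAndGet();
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    static LatestRaceSketch simulate() {
        LatestRaceSketch s = new LatestRaceSketch();
        // "Thread A": request(2) has added demand and won the drain loop,
        // but is paused before it reads `current`.
        s.requested.addAndGet(2);
        s.wip.getAndIncrement();
        // "Thread B": two onNext calls arrive while A is paused.
        s.onNext(1); // sets current; drain() returns because wip != 0
        s.onNext(2); // sees current set and reports 1 as dropped, despite demand of 2
        // "Thread A" resumes and runs the loop it owns.
        s.runDrainLoop(1);
        return s;
    }

    public static void main(String[] args) {
        LatestRaceSketch s = simulate();
        System.out.println("dropped=" + s.dropped
                + " delivered=" + s.delivered
                + " requested=" + s.requested.get());
        // prints: dropped=[1] delivered=[2] requested=1
    }
}
```

In this model, item 1 is reported as dropped even though one unit of demand is still outstanding at the end, which matches what I am observing. A check against `requested` before reporting the drop would distinguish this case from a real overflow.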