Calling awaitOne on nonconforming Publishers gives nondescriptive errors  #2079

Closed
@vemilyus

Hi, I've noticed that Publisher.awaitOne still processes calls to onComplete after it has already received an onError call. This can hide the exception that was supplied to onError; instead, the following exception is thrown:

java.lang.IllegalStateException: Already resumed, but proposed with update null
	at kotlinx.coroutines.CancellableContinuationImpl.alreadyResumedError(CancellableContinuationImpl.kt:335)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:330)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeWith(CancellableContinuationImpl.kt:250)
	at kotlinx.coroutines.reactive.AwaitKt$awaitOne$$inlined$suspendCancellableCoroutine$lambda$1.onComplete(Await.kt:138)
	at org.jooq.impl.AbstractResultQuery$1.doComplete(AbstractResultQuery.java:415)
	at org.jooq.impl.AbstractResultQuery$1.request(AbstractResultQuery.java:409)
	at kotlinx.coroutines.reactive.AwaitKt$awaitOne$$inlined$suspendCancellableCoroutine$lambda$1.onSubscribe(Await.kt:105)
	at org.jooq.impl.AbstractResultQuery.subscribe(AbstractResultQuery.java:381)
	at org.jooq.impl.SelectImpl.subscribe(SelectImpl.java:2716)
	at kotlinx.coroutines.reactive.AwaitKt.awaitOne(Await.kt:97)
	at kotlinx.coroutines.reactive.AwaitKt.awaitOne$default(Await.kt:95)
	at kotlinx.coroutines.reactive.AwaitKt.awaitFirstOrNull(Await.kt:46)
	[REDACTED]
	at kotlinx.coroutines.stream.StreamFlow.collect(Stream.kt:27)
	at kotlinx.coroutines.stream.StreamFlow$collect$1.invokeSuspend(Stream.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:56)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:834)

The problem is that onError doesn't mark the subscriber as done, so a later call to onComplete is still accepted and processed. According to the reactive-streams specification, no further signals should be processed after a terminal signal. The relevant code:

override fun onComplete() {
    if (seenValue) {
        if (cont.isActive) cont.resume(value as T)
        return
    }
    when {
        mode == Mode.FIRST_OR_DEFAULT -> {
            cont.resume(default as T)
        }
        cont.isActive -> {
            cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
        }
    }
}

override fun onError(e: Throwable) {
    cont.resumeWithException(e)
}

I noticed this when using jOOQ with Publisher.awaitFirstOrNull() to fetch a result from the database. jOOQ always calls onComplete, even if it has already called onError. I've also reported this in an issue for jOOQ: jOOQ/jOOQ#10245

An easy fix would be to set seenValue = true in onError, or to add an extra field that is set in onError and then checked in onComplete.
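
To illustrate the second option, here is a minimal sketch rather than the actual kotlinx.coroutines code. It assumes simplified stand-ins: `Subscriber` omits `onSubscribe`, `OneShot` is a hypothetical replacement for the cancellable continuation that fails on a second resume, and `GuardedFirstOrDefault`, `done`, and `signalErrorThenComplete` are names made up for this example.

```kotlin
// Simplified stand-in for org.reactivestreams.Subscriber (assumption: onSubscribe omitted).
interface Subscriber<T> {
    fun onNext(value: T)
    fun onError(e: Throwable)
    fun onComplete()
}

// Hypothetical stand-in for the continuation: a second resume attempt fails.
class OneShot<T> {
    var value: T? = null
        private set
    var error: Throwable? = null
        private set
    var isActive = true
        private set

    fun resume(v: T) {
        check(isActive) { "Already resumed, but proposed with update $v" }
        isActive = false
        value = v
    }

    fun resumeWithException(e: Throwable) {
        check(isActive) { "Already resumed, but proposed with update $e" }
        isActive = false
        error = e
    }
}

// Sketch of a first-or-default subscriber with an extra `done` field.
class GuardedFirstOrDefault<T>(
    private val cont: OneShot<T?>,
    private val default: T?
) : Subscriber<T> {
    private var value: T? = null
    private var seenValue = false
    private var done = false // set by the first terminal signal

    override fun onNext(value: T) {
        if (done || seenValue) return
        seenValue = true
        this.value = value
    }

    override fun onComplete() {
        if (done) return // ignore onComplete after onError (or a repeated onComplete)
        done = true
        cont.resume(if (seenValue) value else default)
    }

    override fun onError(e: Throwable) {
        if (done) return
        done = true
        cont.resumeWithException(e)
    }
}

// Hypothetical driver that behaves like a nonconforming publisher: onError, then onComplete.
fun signalErrorThenComplete(): OneShot<String?> {
    val cont = OneShot<String?>()
    val subscriber = GuardedFirstOrDefault<String>(cont, null)
    subscriber.onError(RuntimeException("query failed"))
    subscriber.onComplete() // the guard drops this signal, so nothing is thrown here
    return cont
}

fun main() {
    val cont = signalErrorThenComplete()
    println(cont.error?.message) // the error passed to onError is preserved
}
```

With the `done` check in place, the late onComplete is dropped and the caller sees the exception that was passed to onError. Without it, the same sequence would reach `resume` a second time and fail with the "Already resumed" error.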
