Description
Hi, I've noticed that Publisher.awaitOne
allows further calls to onComplete
after receiving an onError
call. This can potentially hide the exception that was supplied to onError
and instead throws this exception:
java.lang.IllegalStateException: Already resumed, but proposed with update null
at kotlinx.coroutines.CancellableContinuationImpl.alreadyResumedError(CancellableContinuationImpl.kt:335)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:330)
at kotlinx.coroutines.CancellableContinuationImpl.resumeWith(CancellableContinuationImpl.kt:250)
at kotlinx.coroutines.reactive.AwaitKt$awaitOne$$inlined$suspendCancellableCoroutine$lambda$1.onComplete(Await.kt:138)
at org.jooq.impl.AbstractResultQuery$1.doComplete(AbstractResultQuery.java:415)
at org.jooq.impl.AbstractResultQuery$1.request(AbstractResultQuery.java:409)
at kotlinx.coroutines.reactive.AwaitKt$awaitOne$$inlined$suspendCancellableCoroutine$lambda$1.onSubscribe(Await.kt:105)
at org.jooq.impl.AbstractResultQuery.subscribe(AbstractResultQuery.java:381)
at org.jooq.impl.SelectImpl.subscribe(SelectImpl.java:2716)
at kotlinx.coroutines.reactive.AwaitKt.awaitOne(Await.kt:97)
at kotlinx.coroutines.reactive.AwaitKt.awaitOne$default(Await.kt:95)
at kotlinx.coroutines.reactive.AwaitKt.awaitFirstOrNull(Await.kt:46)
[REDACTED]
at kotlinx.coroutines.stream.StreamFlow.collect(Stream.kt:27)
at kotlinx.coroutines.stream.StreamFlow$collect$1.invokeSuspend(Stream.kt)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:56)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
The problem is that the call to onError
doesn't mark the subscriber as done, so that no further calls to onComplete
are accepted and processed, as is specified here (reactive-streams specification)
kotlinx.coroutines/reactive/kotlinx-coroutines-reactive/src/Await.kt
Lines 131 to 148 in 3bb3e55
This is something that I noticed when using JOOQ and Publisher.awaitFirstOrNull()
to get a result from the database. JOOQ calls onComplete
at all times, even if it called onError
before. I've also mentioned that in an issue for JOOQ: jOOQ/jOOQ#10245
An easy fix for this would be to set seenValue = true
in onError
or add an extra field which is then checked in onComplete
.