Skip to content

Improve handling of DispatchException in an undispatched case #4272

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 37 additions & 25 deletions kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,55 +38,67 @@ internal fun <R, T> (suspend (R) -> T).startCoroutineUndispatched(receiver: R, c
*
* It starts the coroutine using [startCoroutineUninterceptedOrReturn].
*/
internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturn(receiver: R, block: suspend R.() -> T): Any? {
return undispatchedResult({ true }) {
block.startCoroutineUninterceptedOrReturn(receiver, this)
}
}
internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturn(
receiver: R, block: suspend R.() -> T
): Any? = startUndspatched(alwaysRethrow = true, receiver, block)

/**
* Same as [startUndispatchedOrReturn], but ignores [TimeoutCancellationException] on fast-path.
*/
internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturnIgnoreTimeout(
receiver: R, block: suspend R.() -> T
): Any? {
return undispatchedResult({ e -> !(e is TimeoutCancellationException && e.coroutine === this) }) {
block.startCoroutineUninterceptedOrReturn(receiver, this)
}
}
): Any? = startUndspatched(alwaysRethrow = false, receiver, block)

private inline fun <T> ScopeCoroutine<T>.undispatchedResult(
shouldThrow: (Throwable) -> Boolean,
startBlock: () -> Any?
/**
* Starts and handles the result of an undispatched coroutine, potentially with children.
* For example, it handles `coroutineScope { ...suspend of throw, maybe start children... }`
* and `launch(start = UNDISPATCHED) { ... }`
*
* @param alwaysRethrow specifies whether an exception should be unconditioanlly rethrown.
* It is a tweak for 'withTimeout' in order to successfully return values when the block was cancelled:
* i.e. `withTimeout(1ms) { Thread.sleep(1000); 42 }` should not fail.
*/
private fun <T, R> ScopeCoroutine<T>.startUndspatched(
alwaysRethrow: Boolean,
receiver: R, block: suspend R.() -> T
): Any? {
val result = try {
startBlock()
block.startCoroutineUninterceptedOrReturn(receiver, this)
} catch (e: DispatchException) {
// Special codepath for failing CoroutineDispatcher: rethrow an exception
// immediately without waiting for children to indicate something is wrong
dispatchExceptionAndMakeCompleting(e)
} catch (e: Throwable) {
CompletedExceptionally(e)
}

/*
* We're trying to complete our undispatched block here and have three code-paths:
* (1) Coroutine is suspended.
* Otherwise, coroutine had returned result, so we are completing our block (and its job).
* (2) If we can't complete it or started waiting for children, we suspend.
* (3) If we have successfully completed the coroutine state machine here,
* then we take the actual final state of the coroutine from makeCompletingOnce and return it.
*
* shouldThrow parameter is a special code path for timeout coroutine:
* If timeout is exceeded, but withTimeout() block was not suspended, we would like to return block value,
* not a timeout exception.
* We are trying to complete our undispatched block with the following possible codepaths:
* 1) The coroutine just suspended. I.e. `coroutineScope { .. suspend here }`.
* Then just suspend
* 2) The coroutine completed with something, but has active children. Wait for them, also suspend
* 3) The coroutine succesfully completed. Return or rethrow its result.
*/
if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED // (1)
val state = makeCompletingOnce(result)
if (state === COMPLETING_WAITING_CHILDREN) return COROUTINE_SUSPENDED // (2)
afterCompletionUndispatched()
return if (state is CompletedExceptionally) { // (3)
when {
shouldThrow(state.cause) -> throw recoverStackTrace(state.cause, uCont)
alwaysRethrow || notOwnTimeout(state.cause) -> throw recoverStackTrace(state.cause, uCont)
result is CompletedExceptionally -> throw recoverStackTrace(result.cause, uCont)
else -> result
}
} else {
state.unboxState()
}
}

private fun ScopeCoroutine<*>.notOwnTimeout(cause: Throwable): Boolean {
return cause !is TimeoutCancellationException || cause.coroutine !== this
}

private fun ScopeCoroutine<*>.dispatchExceptionAndMakeCompleting(e: DispatchException): Nothing {
makeCompleting(CompletedExceptionally(e.cause))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not makeCompletingOnce, to highlight the aspects of this codepath that are different from the happy path?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like this path to be as robust as possible. There might be obscure reasons for DispatchException, for example a dispatcher sent coroutine to execute and then failed in the post-send code-path.
makeCompletingOnce can throw, and I would like to avoid extra try-catches here to preserve the contextual information (i.e. the suppressed dispatch exception)

throw recoverStackTrace(e.cause, uCont)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package kotlinx.coroutines
import kotlinx.coroutines.testing.*
import kotlin.test.*

class ExperimentalDispatchModeTest : TestBase() {
class UnconfinedCancellationTest : TestBase() {
@Test
fun testUnconfinedCancellation() = runTest {
val parent = Job()
Expand Down
26 changes: 26 additions & 0 deletions kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@ package kotlinx.coroutines

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import org.junit.*
import org.junit.Test
import org.junit.rules.*
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine
import kotlin.test.*

class FailFastOnStartTest : TestBase() {
Expand Down Expand Up @@ -81,4 +87,24 @@ class FailFastOnStartTest : TestBase() {
fun testAsyncNonChild() = runTest(expected = ::mainException) {
async<Int>(Job() + Dispatchers.Main) { fail() }
}

@Test
fun testFlowOn() {
// See #4142, this test ensures that `coroutineScope { produce(failingDispatcher, ATOMIC) }`
// rethrows an exception. It does not help with the completion of such a coroutine though.
// `suspend {}` + start coroutine with custom `completion` to avoid waiting for test completion
expect(1)
val caller = suspend {
try {
emptyFlow<Int>().flowOn(Dispatchers.Main).collect { fail() }
} catch (e: Throwable) {
assertTrue(mainException(e))
expect(2)
}
}

caller.startCoroutine(Continuation(EmptyCoroutineContext) {
finish(3)
})
}
}