diff --git a/common/kotlinx-coroutines-core-common/src/Dispatched.kt b/common/kotlinx-coroutines-core-common/src/Dispatched.kt index 32ee51f0a2..e8bd6581d9 100644 --- a/common/kotlinx-coroutines-core-common/src/Dispatched.kt +++ b/common/kotlinx-coroutines-core-common/src/Dispatched.kt @@ -21,26 +21,33 @@ internal object UndispatchedEventLoop { @JvmField internal val threadLocalEventLoop = CommonThreadLocal { EventLoop() } - inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int, block: () -> Unit) { + inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int, doYield: Boolean = false, block: () -> Unit) : Boolean { val eventLoop = threadLocalEventLoop.get() if (eventLoop.isActive) { + // If we are yielding and queue is empty, yield should be a no-op + if (doYield && eventLoop.queue.isEmpty) { + return false + } + continuation._state = contState continuation.resumeMode = mode eventLoop.queue.addLast(continuation) - return + return true } runEventLoop(eventLoop, block) + return false } - fun resumeUndispatched(task: DispatchedTask<*>) { + fun resumeUndispatched(task: DispatchedTask<*>): Boolean { val eventLoop = threadLocalEventLoop.get() if (eventLoop.isActive) { eventLoop.queue.addLast(task) - return + return true } runEventLoop(eventLoop, { task.resume(task.delegate, MODE_UNDISPATCHED) }) + return false } inline fun runEventLoop(eventLoop: EventLoop, block: () -> Unit) { @@ -227,6 +234,12 @@ internal interface DispatchedTask : Runnable { } } +internal fun DispatchedContinuation.yield(): Boolean { + return UndispatchedEventLoop.execute(this, Unit, MODE_CANCELLABLE, true) { + run() + } +} + internal fun DispatchedTask.dispatch(mode: Int = MODE_CANCELLABLE) { val delegate = this.delegate if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) { diff --git a/common/kotlinx-coroutines-core-common/src/Supervisor.kt b/common/kotlinx-coroutines-core-common/src/Supervisor.kt index c222f667ac..f150737952 100644 --- a/common/kotlinx-coroutines-core-common/src/Supervisor.kt +++ b/common/kotlinx-coroutines-core-common/src/Supervisor.kt @@ -23,7 +23,7 @@ import kotlin.jvm.* * * If [parent] job is specified, then this supervisor job becomes a child job of its parent and is cancelled when its * parent fails or is cancelled. All this supervisor's children are cancelled in this case, too. The invocation of - * of [cancel][Job.cancel] with exception (other than [CancellationException]) on this supervisor job also cancels parent. + * [cancel][Job.cancel] with exception (other than [CancellationException]) on this supervisor job also cancels parent. * * @param parent an optional parent job. */ diff --git a/common/kotlinx-coroutines-core-common/src/Yield.kt b/common/kotlinx-coroutines-core-common/src/Yield.kt index 632dcba0b0..fde3391919 100644 --- a/common/kotlinx-coroutines-core-common/src/Yield.kt +++ b/common/kotlinx-coroutines-core-common/src/Yield.kt @@ -19,7 +19,9 @@ public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { u val context = uCont.context context.checkCompletion() val cont = uCont.intercepted() as? DispatchedContinuation ?: return@sc Unit - if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit + if (!cont.dispatcher.isDispatchNeeded(context)) { + return@sc if (cont.yield()) COROUTINE_SUSPENDED else Unit + } cont.dispatchYield(Unit) COROUTINE_SUSPENDED } diff --git a/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt b/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt index a6bf8f6180..5cfb8e8df1 100644 --- a/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt +++ b/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt @@ -8,6 +8,7 @@ internal class ArrayQueue { private var elements = arrayOfNulls(16) private var head = 0 private var tail = 0 + val isEmpty: Boolean get() = head == tail public fun addLast(element: T) { elements[tail] = element diff --git a/common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt b/common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt index 8866057a09..f37c35657c 100644 --- a/common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt +++ b/common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt @@ -54,7 +54,7 @@ class UnconfinedTest : TestBase() { } @Test - fun enterMultipleTimes() = runTest { + fun testEnterMultipleTimes() = runTest { launch(Unconfined) { expect(1) } @@ -70,5 +70,46 @@ class UnconfinedTest : TestBase() { finish(4) } + @Test + fun testYield() = runTest { + expect(1) + launch(Dispatchers.Unconfined) { + expect(2) + yield() + launch { + expect(4) + } + expect(3) + yield() + expect(5) + }.join() + + finish(6) + } + + @Test + fun testCancellationWihYields() = runTest { + expect(1) + GlobalScope.launch(Dispatchers.Unconfined) { + val job = coroutineContext[Job]!! + expect(2) + yield() + GlobalScope.launch(Dispatchers.Unconfined) { + expect(4) + job.cancel() + expect(5) + } + expect(3) + + try { + yield() + } finally { + expect(6) + } + } + + finish(7) + } + class TestException : Throwable() } diff --git a/docs/shared-mutable-state-and-concurrency.md b/docs/shared-mutable-state-and-concurrency.md index 4a44d3c666..7530861603 100644 --- a/docs/shared-mutable-state-and-concurrency.md +++ b/docs/shared-mutable-state-and-concurrency.md @@ -42,7 +42,7 @@ but others are unique. Let us launch a hundred coroutines all doing the same action thousand times. We'll also measure their completion time for further comparisons: -
+
```kotlin suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {