Skip to content

Commit

Permalink
Implement yield for unconfined dispatchers, documentation improvements
Browse files Browse the repository at this point in the history
Fixes #737
  • Loading branch information
robertgol authored and qwwdfsad committed Oct 24, 2018
1 parent 6f01c93 commit c33ef61
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 8 deletions.
21 changes: 17 additions & 4 deletions common/kotlinx-coroutines-core-common/src/Dispatched.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,33 @@ internal object UndispatchedEventLoop {
@JvmField
internal val threadLocalEventLoop = CommonThreadLocal { EventLoop() }

inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int, block: () -> Unit) {
inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int, doYield: Boolean = false, block: () -> Unit) : Boolean {
val eventLoop = threadLocalEventLoop.get()
if (eventLoop.isActive) {
// If we are yielding and queue is empty, yield should be a no-op
if (doYield && eventLoop.queue.isEmpty) {
return false
}

continuation._state = contState
continuation.resumeMode = mode
eventLoop.queue.addLast(continuation)
return
return true
}

runEventLoop(eventLoop, block)
return false
}

fun resumeUndispatched(task: DispatchedTask<*>) {
fun resumeUndispatched(task: DispatchedTask<*>): Boolean {
val eventLoop = threadLocalEventLoop.get()
if (eventLoop.isActive) {
eventLoop.queue.addLast(task)
return
return true
}

runEventLoop(eventLoop, { task.resume(task.delegate, MODE_UNDISPATCHED) })
return false
}

inline fun runEventLoop(eventLoop: EventLoop, block: () -> Unit) {
Expand Down Expand Up @@ -227,6 +234,12 @@ internal interface DispatchedTask<in T> : Runnable {
}
}

internal fun DispatchedContinuation<Unit>.yield(): Boolean {
return UndispatchedEventLoop.execute(this, Unit, MODE_CANCELLABLE, true) {
run()
}
}

internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
val delegate = this.delegate
if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
Expand Down
2 changes: 1 addition & 1 deletion common/kotlinx-coroutines-core-common/src/Supervisor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import kotlin.jvm.*
*
* If [parent] job is specified, then this supervisor job becomes a child job of its parent and is cancelled when its
* parent fails or is cancelled. All this supervisor's children are cancelled in this case, too. The invocation of
* of [cancel][Job.cancel] with exception (other than [CancellationException]) on this supervisor job also cancels parent.
* [cancel][Job.cancel] with exception (other than [CancellationException]) on this supervisor job also cancels parent.
*
* @param parent an optional parent job.
*/
Expand Down
4 changes: 3 additions & 1 deletion common/kotlinx-coroutines-core-common/src/Yield.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { u
val context = uCont.context
context.checkCompletion()
val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
if (!cont.dispatcher.isDispatchNeeded(context)) {
return@sc if (cont.yield()) COROUTINE_SUSPENDED else Unit
}
cont.dispatchYield(Unit)
COROUTINE_SUSPENDED
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ internal class ArrayQueue<T : Any> {
private var elements = arrayOfNulls<Any>(16)
private var head = 0
private var tail = 0
val isEmpty: Boolean get() = head == tail

public fun addLast(element: T) {
elements[tail] = element
Expand Down
43 changes: 42 additions & 1 deletion common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class UnconfinedTest : TestBase() {
}

@Test
fun enterMultipleTimes() = runTest {
fun testEnterMultipleTimes() = runTest {
launch(Unconfined) {
expect(1)
}
Expand All @@ -70,5 +70,46 @@ class UnconfinedTest : TestBase() {
finish(4)
}

@Test
fun testYield() = runTest {
expect(1)
launch(Dispatchers.Unconfined) {
expect(2)
yield()
launch {
expect(4)
}
expect(3)
yield()
expect(5)
}.join()

finish(6)
}

@Test
fun testCancellationWihYields() = runTest {
expect(1)
GlobalScope.launch(Dispatchers.Unconfined) {
val job = coroutineContext[Job]!!
expect(2)
yield()
GlobalScope.launch(Dispatchers.Unconfined) {
expect(4)
job.cancel()
expect(5)
}
expect(3)

try {
yield()
} finally {
expect(6)
}
}

finish(7)
}

class TestException : Throwable()
}
2 changes: 1 addition & 1 deletion docs/shared-mutable-state-and-concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ but others are unique.
Let us launch a hundred coroutines all doing the same action thousand times.
We'll also measure their completion time for further comparisons:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
Expand Down

0 comments on commit c33ef61

Please sign in to comment.