Skip to content

Commit

Permalink
Fix native-mt awaitAll
Browse files Browse the repository at this point in the history
Fixes #2025
  • Loading branch information
elizarov committed Jun 16, 2020
1 parent f404dce commit c272cb6
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 10 deletions.
27 changes: 17 additions & 10 deletions kotlinx-coroutines-core/common/src/Await.kt
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
val deferred = deferreds[i]
deferred.start() // To properly await lazily started deferreds
AwaitAllNode(cont, deferred).apply {
handle = deferred.invokeOnCompletion(asHandler)
setHandle(deferred.invokeOnCompletion(asHandler))
}
}
val disposer = DisposeHandlersOnCancel(nodes)
// Step 2: Set disposer to each node
nodes.forEach { it.disposer = disposer }
nodes.forEach { it.setDisposer(disposer) }
// Here we know that if any code the nodes complete, it will dispose the rest
// Step 3: Now we can check if continuation is complete
if (cont.isCompleted) {
Expand All @@ -83,34 +83,41 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {

private inner class DisposeHandlersOnCancel(private val nodes: Array<AwaitAllNode>) : CancelHandler() {
fun disposeAll() {
nodes.forEach { it.handle.dispose() }
nodes.forEach { it.disposeHandle() }
}

override fun invoke(cause: Throwable?) { disposeAll() }
override fun toString(): String = "DisposeHandlersOnCancel[$nodes]"
}

private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
lateinit var handle: DisposableHandle

private val _handle = atomic<DisposableHandle?>(null)
private val _disposer = atomic<DisposeHandlersOnCancel?>(null)
var disposer: DisposeHandlersOnCancel?
get() = _disposer.value
set(value) { _disposer.value = value }


fun setHandle(handle: DisposableHandle) { _handle.value = handle }
fun setDisposer(disposer: DisposeHandlersOnCancel) { _disposer.value = disposer }

fun disposeHandle() {
_handle.value?.dispose()
_handle.value = null
}

override fun invoke(cause: Throwable?) {
if (cause != null) {
val token = continuation.tryResumeWithException(cause)
if (token != null) {
continuation.completeResume(token)
// volatile read of disposer AFTER continuation is complete
// and if disposer was already set (all handlers where already installed, then dispose them all)
disposer?.disposeAll()
_disposer.value?.disposeAll()
}
} else if (notCompletedCount.decrementAndGet() == 0) {
continuation.resume(deferreds.map { it.getCompleted() })
// Note that all deferreds are complete here, so we don't need to dispose their nodes
}
// Release all the refs for Kotlin/Native
_handle.value = null
_disposer.value = null
}
}
}
13 changes: 13 additions & 0 deletions kotlinx-coroutines-core/native/test/WorkerDispatcherTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -313,5 +313,18 @@ class WorkerDispatcherTest : TestBase() {
finish(6)
}

@Test
fun testAwaitAll() = runTest {
expect(1)
val d1 = async(dispatcher) {
"A"
}
val d2 = async(dispatcher) {
"B"
}
assertEquals("AB", awaitAll(d1, d2).joinToString(""))
finish(2)
}

private data class Data(val s: String)
}

0 comments on commit c272cb6

Please sign in to comment.