From c272cb688d46e4a965c3fe4e8c25564b29b534be Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Tue, 16 Jun 2020 21:22:10 +0300 Subject: [PATCH] Fix native-mt awaitAll Fixes #2025 --- kotlinx-coroutines-core/common/src/Await.kt | 27 ++++++++++++------- .../native/test/WorkerDispatcherTest.kt | 13 +++++++++ 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/Await.kt b/kotlinx-coroutines-core/common/src/Await.kt index dd1e1771f2..bd46e03d3c 100644 --- a/kotlinx-coroutines-core/common/src/Await.kt +++ b/kotlinx-coroutines-core/common/src/Await.kt @@ -65,12 +65,12 @@ private class AwaitAll(private val deferreds: Array>) { val deferred = deferreds[i] deferred.start() // To properly await lazily started deferreds AwaitAllNode(cont, deferred).apply { - handle = deferred.invokeOnCompletion(asHandler) + setHandle(deferred.invokeOnCompletion(asHandler)) } } val disposer = DisposeHandlersOnCancel(nodes) // Step 2: Set disposer to each node - nodes.forEach { it.disposer = disposer } + nodes.forEach { it.setDisposer(disposer) } // Here we know that if any code the nodes complete, it will dispose the rest // Step 3: Now we can check if continuation is complete if (cont.isCompleted) { @@ -83,7 +83,7 @@ private class AwaitAll(private val deferreds: Array>) { private inner class DisposeHandlersOnCancel(private val nodes: Array) : CancelHandler() { fun disposeAll() { - nodes.forEach { it.handle.dispose() } + nodes.forEach { it.disposeHandle() } } override fun invoke(cause: Throwable?) { disposeAll() } @@ -91,13 +91,17 @@ private class AwaitAll(private val deferreds: Array>) { } private inner class AwaitAllNode(private val continuation: CancellableContinuation>, job: Job) : JobNode(job) { - lateinit var handle: DisposableHandle - + private val _handle = atomic(null) private val _disposer = atomic(null) - var disposer: DisposeHandlersOnCancel? - get() = _disposer.value - set(value) { _disposer.value = value } - + + fun setHandle(handle: DisposableHandle) { _handle.value = handle } + fun setDisposer(disposer: DisposeHandlersOnCancel) { _disposer.value = disposer } + + fun disposeHandle() { + _handle.value?.dispose() + _handle.value = null + } + override fun invoke(cause: Throwable?) { if (cause != null) { val token = continuation.tryResumeWithException(cause) @@ -105,12 +109,15 @@ private class AwaitAll(private val deferreds: Array>) { continuation.completeResume(token) // volatile read of disposer AFTER continuation is complete // and if disposer was already set (all handlers where already installed, then dispose them all) - disposer?.disposeAll() + _disposer.value?.disposeAll() } } else if (notCompletedCount.decrementAndGet() == 0) { continuation.resume(deferreds.map { it.getCompleted() }) // Note that all deferreds are complete here, so we don't need to dispose their nodes } + // Release all the refs for Kotlin/Native + _handle.value = null + _disposer.value = null } } } diff --git a/kotlinx-coroutines-core/native/test/WorkerDispatcherTest.kt b/kotlinx-coroutines-core/native/test/WorkerDispatcherTest.kt index f6add43adc..41bd4a3ec2 100644 --- a/kotlinx-coroutines-core/native/test/WorkerDispatcherTest.kt +++ b/kotlinx-coroutines-core/native/test/WorkerDispatcherTest.kt @@ -313,5 +313,18 @@ class WorkerDispatcherTest : TestBase() { finish(6) } + @Test + fun testAwaitAll() = runTest { + expect(1) + val d1 = async(dispatcher) { + "A" + } + val d2 = async(dispatcher) { + "B" + } + assertEquals("AB", awaitAll(d1, d2).joinToString("")) + finish(2) + } + private data class Data(val s: String) } \ No newline at end of file