Skip to content

Commit

Permalink
Do not try to start sharing strategy in an undispatched manner in nat…
Browse files Browse the repository at this point in the history
…ive-mt mode as it is not supported

Fixes #3136
  • Loading branch information
qwwdfsad committed Apr 5, 2022
1 parent 8dfe2e3 commit 73e843b
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 1 deletion.
4 changes: 3 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Share.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*

Expand Down Expand Up @@ -204,8 +205,9 @@ private fun <T> CoroutineScope.launchSharing(
* * Delayed sharing strategies have a chance to immediately observe consecutive subscriptions.
* E.g. in the cases like `flow.shareIn(...); flow.take(1)` we want sharing strategy to see the initial subscription
* * Eager sharing does not start immediately, so the subscribers have actual chance to subscribe _prior_ to sharing.
* // TODO: kludge: native-mt doesn't support undispatched
*/
val start = if (started == SharingStarted.Eagerly) CoroutineStart.DEFAULT else CoroutineStart.UNDISPATCHED
val start = if (started == SharingStarted.Eagerly || isNativeMt) CoroutineStart.DEFAULT else CoroutineStart.UNDISPATCHED
return launch(context, start = start) { // the single coroutine to rule the sharing
// Optimize common built-in started strategies
when {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ public expect open class SynchronizedObject() // marker abstract class
*/
@InternalCoroutinesApi
public expect inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T

internal expect val isNativeMt: Boolean
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
import kotlin.test.*

class ShareInTest : TestBase() {
Expand Down Expand Up @@ -213,6 +214,7 @@ class ShareInTest : TestBase() {

@Test
fun testShouldStart() = runTest {
if (isNativeMt) return@runTest
val flow = flow {
expect(2)
emit(1)
Expand All @@ -229,6 +231,7 @@ class ShareInTest : TestBase() {

@Test
fun testShouldStartScalar() = runTest {
if (isNativeMt) return@runTest
val j = Job()
val shared = flowOf(239).stateIn(this + j, SharingStarted.Lazily, 42)
assertEquals(42, shared.first())
Expand Down
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/js/src/internal/Synchronized.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ public actual typealias SynchronizedObject = Any
@InternalCoroutinesApi
public actual inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T =
block()

internal actual val isNativeMt: Boolean = false
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/jvm/src/internal/Synchronized.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ public actual typealias SynchronizedObject = Any
@InternalCoroutinesApi
public actual inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T =
kotlin.synchronized(lock, block)

internal actual val isNativeMt: Boolean = false
3 changes: 3 additions & 0 deletions kotlinx-coroutines-core/native/src/internal/Synchronized.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ public actual typealias SynchronizedObject = kotlinx.atomicfu.locks.Synchronized
*/
@InternalCoroutinesApi
public actual inline fun <T> synchronized(lock: SynchronizedObject, block: () -> T): T = lock.withLock2(block)

@OptIn(ExperimentalStdlibApi::class)
internal actual val isNativeMt: Boolean = !kotlin.native.isExperimentalMM()

0 comments on commit 73e843b

Please sign in to comment.