File tree Expand file tree Collapse file tree 2 files changed +18
-2
lines changed
reactive/kotlinx-coroutines-reactive Expand file tree Collapse file tree 2 files changed +18
-2
lines changed Original file line number Diff line number Diff line change @@ -6,6 +6,7 @@ package kotlinx.coroutines.reactive
66import kotlinx.atomicfu.*
77import kotlinx.coroutines.*
88import kotlinx.coroutines.channels.*
9+ import kotlinx.coroutines.intrinsics.*
910import kotlinx.coroutines.selects.*
1011import kotlinx.coroutines.sync.*
1112import org.reactivestreams.*
@@ -104,10 +105,22 @@ public class PublisherCoroutine<in T>(
104105 // registerSelectSend
105106 @Suppress(" PARAMETER_NAME_CHANGED_ON_OVERRIDE" )
106107 override fun <R > registerSelectClause2 (select : SelectInstance <R >, element : T , block : suspend (SendChannel <T >) -> R ) {
107- mutex.onLock.registerSelectClause2(select, null ) {
108+ val clause = suspend {
108109 doLockedNext(element)?.let { throw it }
109110 block(this )
110111 }
112+
113+ // TODO discuss it
114+ launch(start = CoroutineStart .UNDISPATCHED ) {
115+ mutex.lock()
116+ // Already selected -- bail out
117+ if (! select.trySelect()) {
118+ mutex.unlock()
119+ return @launch
120+ }
121+
122+ clause.startCoroutineCancellable(select.completion)
123+ }
111124 }
112125
113126 /*
Original file line number Diff line number Diff line change @@ -8,6 +8,7 @@ import kotlinx.coroutines.*
88import kotlinx.coroutines.CancellationException
99import kotlinx.coroutines.channels.*
1010import kotlinx.coroutines.flow.*
11+ import kotlinx.coroutines.sync.*
1112import org.junit.Test
1213import org.reactivestreams.*
1314import java.util.concurrent.*
@@ -308,13 +309,15 @@ class PublishTest : TestBase() {
308309 }
309310
310311 expect(1 )
312+ val collectorLatch = Mutex (true )
311313 val job = launch {
312314 published.asFlow().buffer(0 ).collect {
313315 expect(4 )
316+ collectorLatch.unlock()
314317 hang { expect(6 ) }
315318 }
316319 }
317- yield ()
320+ collectorLatch.lock ()
318321 expect(5 )
319322 job.cancelAndJoin()
320323 latch.await()
You can’t perform that action at this time.
0 commit comments