File tree Expand file tree Collapse file tree 1 file changed +14
-1
lines changed
reactive/kotlinx-coroutines-reactive/src Expand file tree Collapse file tree 1 file changed +14
-1
lines changed Original file line number Diff line number Diff line change @@ -6,6 +6,7 @@ package kotlinx.coroutines.reactive
66import kotlinx.atomicfu.*
77import kotlinx.coroutines.*
88import kotlinx.coroutines.channels.*
9+ import kotlinx.coroutines.intrinsics.*
910import kotlinx.coroutines.selects.*
1011import kotlinx.coroutines.sync.*
1112import org.reactivestreams.*
@@ -104,10 +105,22 @@ public class PublisherCoroutine<in T>(
104105 // registerSelectSend
105106 @Suppress(" PARAMETER_NAME_CHANGED_ON_OVERRIDE" )
106107 override fun <R > registerSelectClause2 (select : SelectInstance <R >, element : T , block : suspend (SendChannel <T >) -> R ) {
107- mutex.onLock.registerSelectClause2(select, null ) {
108+ val clause = suspend {
108109 doLockedNext(element)?.let { throw it }
109110 block(this )
110111 }
112+
113+ // TODO discuss it
114+ launch(start = CoroutineStart .UNDISPATCHED ) {
115+ mutex.lock()
116+ // Already selected -- bail out
117+ if (! select.trySelect()) {
118+ mutex.unlock()
119+ return @launch
120+ }
121+
122+ clause.startCoroutineCancellable(select.completion)
123+ }
111124 }
112125
113126 /*
You can’t perform that action at this time.
0 commit comments