Skip to content

Commit b43b7bb

Browse files
committed
Introduce new select implementation, but leave TODO-s for channels -- they will be replaced with new ones.
1 parent 9658d3f commit b43b7bb

File tree

24 files changed

+602
-1215
lines changed

24 files changed

+602
-1215
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ junit5_version=5.7.0
1313
atomicfu_version=0.16.3
1414
knit_version=0.3.0
1515
html_version=0.7.2
16-
lincheck_version=2.14
16+
lincheck_version=2.14.1
1717
dokka_version=1.5.0
1818
byte_buddy_version=1.10.9
1919
reactor_version=3.4.1

kotlinx-coroutines-core/common/src/Builders.common.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,10 @@ public fun <T> CoroutineScope.async(
9696
private open class DeferredCoroutine<T>(
9797
parentContext: CoroutineContext,
9898
active: Boolean
99-
) : AbstractCoroutine<T>(parentContext, true, active = active), Deferred<T>, SelectClause1<T> {
99+
) : AbstractCoroutine<T>(parentContext, true, active = active), Deferred<T> {
100100
override fun getCompleted(): T = getCompletedInternal() as T
101101
override suspend fun await(): T = awaitInternal() as T
102-
override val onAwait: SelectClause1<T> get() = this
103-
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (T) -> R) =
104-
registerSelectClause1Internal(select, block)
102+
override val onAwait: SelectClause1<T> get() = onAwaitInternal as SelectClause1<T>
105103
}
106104

107105
private class LazyDeferredCoroutine<T>(

kotlinx-coroutines-core/common/src/CompletableDeferred.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,12 @@ public fun <T> CompletableDeferred(value: T): CompletableDeferred<T> = Completab
7979
@Suppress("UNCHECKED_CAST")
8080
private class CompletableDeferredImpl<T>(
8181
parent: Job?
82-
) : JobSupport(true), CompletableDeferred<T>, SelectClause1<T> {
82+
) : JobSupport(true), CompletableDeferred<T> {
8383
init { initParentJob(parent) }
8484
override val onCancelComplete get() = true
8585
override fun getCompleted(): T = getCompletedInternal() as T
8686
override suspend fun await(): T = awaitInternal() as T
87-
override val onAwait: SelectClause1<T> get() = this
88-
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (T) -> R) =
89-
registerSelectClause1Internal(select, block)
87+
override val onAwait: SelectClause1<T> get() = onAwaitInternal as SelectClause1<T>
9088

9189
override fun complete(value: T): Boolean =
9290
makeCompleting(value)

kotlinx-coroutines-core/common/src/JobSupport.kt

Lines changed: 34 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package kotlinx.coroutines
77

88
import kotlinx.atomicfu.*
99
import kotlinx.coroutines.internal.*
10-
import kotlinx.coroutines.intrinsics.*
1110
import kotlinx.coroutines.selects.*
1211
import kotlin.coroutines.*
1312
import kotlin.coroutines.intrinsics.*
@@ -25,7 +24,7 @@ import kotlin.native.concurrent.*
2524
* @suppress **This is unstable API and it is subject to change.**
2625
*/
2726
@Deprecated(level = DeprecationLevel.ERROR, message = "This is internal API and may be removed in the future releases")
28-
public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
27+
public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob {
2928
final override val key: CoroutineContext.Key<*> get() = Job
3029

3130
/*
@@ -560,26 +559,18 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
560559
}
561560

562561
public final override val onJoin: SelectClause0
563-
get() = this
562+
get() = SelectClause0Impl(
563+
objForSelect = this@JobSupport,
564+
regFunc = JobSupport::registerSelectForOnJoin as RegistrationFunction
565+
)
564566

565-
// registerSelectJoin
566-
public final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) {
567-
// fast-path -- check state and select/return if needed
568-
loopOnState { state ->
569-
if (select.isSelected) return
570-
if (state !is Incomplete) {
571-
// already complete -- select result
572-
if (select.trySelect()) {
573-
block.startCoroutineUnintercepted(select.completion)
574-
}
575-
return
576-
}
577-
if (startInternal(state) == 0) {
578-
// slow-path -- register waiter for completion
579-
select.disposeOnSelect(invokeOnCompletion(handler = SelectJoinOnCompletion(select, block).asHandler))
580-
return
581-
}
567+
private fun registerSelectForOnJoin(select: SelectInstance<*>, ignoredParam: Any?) {
568+
if (!joinInternal()) {
569+
select.selectInRegPhase(Unit)
570+
return
582571
}
572+
val disposableHandle = invokeOnCompletion { select.trySelect(this@JobSupport, Unit) }
573+
select.invokeOnCompletion { disposableHandle.dispose() }
583574
}
584575

585576
/**
@@ -1234,46 +1225,36 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
12341225
cont.getResult()
12351226
}
12361227

1237-
/**
1238-
* @suppress **This is unstable API and it is subject to change.**
1239-
*/
1240-
// registerSelectAwaitInternal
1241-
@Suppress("UNCHECKED_CAST")
1242-
internal fun <T, R> registerSelectClause1Internal(select: SelectInstance<R>, block: suspend (T) -> R) {
1243-
// fast-path -- check state and select/return if needed
1244-
loopOnState { state ->
1245-
if (select.isSelected) return
1228+
1229+
internal val onAwaitInternal: SelectClause1<*> get() = SelectClause1Impl<Any?>(
1230+
objForSelect = this,
1231+
regFunc = JobSupport::onAwaitInternalRegFunc as RegistrationFunction,
1232+
processResFunc = JobSupport::onAwaitInternalProcessResFunc as ProcessResultFunction
1233+
)
1234+
1235+
private fun onAwaitInternalRegFunc(select: SelectInstance<*>, ignoredParam: Any?) {
1236+
while (true) {
1237+
val state = this.state
12461238
if (state !is Incomplete) {
1247-
// already complete -- select result
1248-
if (select.trySelect()) {
1249-
if (state is CompletedExceptionally) {
1250-
select.resumeSelectWithException(state.cause)
1251-
}
1252-
else {
1253-
block.startCoroutineUnintercepted(state.unboxState() as T, select.completion)
1254-
}
1255-
}
1239+
val result = if (state is CompletedExceptionally) state else state.unboxState()
1240+
select.selectInRegPhase(result)
12561241
return
12571242
}
1258-
if (startInternal(state) == 0) {
1259-
// slow-path -- register waiter for completion
1260-
select.disposeOnSelect(invokeOnCompletion(handler = SelectAwaitOnCompletion(select, block).asHandler))
1261-
return
1243+
if (startInternal(state) >= 0) break // break unless needs to retry
1244+
}
1245+
val disposableHandle = invokeOnCompletion {
1246+
val state = this.state
1247+
if (state !is Incomplete) {
1248+
val result = if (state is CompletedExceptionally) state else state.unboxState()
1249+
select.trySelect(this, result)
12621250
}
12631251
}
1252+
select.invokeOnCompletion { disposableHandle.dispose() }
12641253
}
12651254

1266-
/**
1267-
* @suppress **This is unstable API and it is subject to change.**
1268-
*/
1269-
@Suppress("UNCHECKED_CAST")
1270-
internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) {
1271-
val state = this.state
1272-
// Note: await is non-atomic (can be cancelled while dispatched)
1273-
if (state is CompletedExceptionally)
1274-
select.resumeSelectWithException(state.cause)
1275-
else
1276-
block.startCoroutineCancellable(state.unboxState() as T, select.completion)
1255+
private fun onAwaitInternalProcessResFunc(ignoredParam: Any?, result: Any?): Any? {
1256+
if (result is CompletedExceptionally) throw result.cause
1257+
return result
12771258
}
12781259
}
12791260

@@ -1420,26 +1401,6 @@ internal class DisposeOnCompletion(
14201401
override fun invoke(cause: Throwable?) = handle.dispose()
14211402
}
14221403

1423-
private class SelectJoinOnCompletion<R>(
1424-
private val select: SelectInstance<R>,
1425-
private val block: suspend () -> R
1426-
) : JobNode() {
1427-
override fun invoke(cause: Throwable?) {
1428-
if (select.trySelect())
1429-
block.startCoroutineCancellable(select.completion)
1430-
}
1431-
}
1432-
1433-
private class SelectAwaitOnCompletion<T, R>(
1434-
private val select: SelectInstance<R>,
1435-
private val block: suspend (T) -> R
1436-
) : JobNode() {
1437-
override fun invoke(cause: Throwable?) {
1438-
if (select.trySelect())
1439-
job.selectAwaitCompletion(select, block)
1440-
}
1441-
}
1442-
14431404
// -------- invokeOnCancellation nodes
14441405

14451406
/**

0 commit comments

Comments
 (0)