From 261a3487e54e74c781c55c63c34faa9e6e0dbc92 Mon Sep 17 00:00:00 2001 From: Youssef Shoaib Date: Sat, 16 Nov 2024 00:23:45 +0000 Subject: [PATCH] Add STM contracts --- .../src/commonMain/kotlin/arrow/fx/stm/STM.kt | 12 +++++++++++- .../src/commonMain/kotlin/arrow/fx/stm/TVar.kt | 6 +++--- .../commonMain/kotlin/arrow/fx/stm/internal/Impl.kt | 11 +++++++++-- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/arrow-libs/fx/arrow-fx-stm/src/commonMain/kotlin/arrow/fx/stm/STM.kt b/arrow-libs/fx/arrow-fx-stm/src/commonMain/kotlin/arrow/fx/stm/STM.kt index 9f80b08a98e..838b5e08496 100644 --- a/arrow-libs/fx/arrow-fx-stm/src/commonMain/kotlin/arrow/fx/stm/STM.kt +++ b/arrow-libs/fx/arrow-fx-stm/src/commonMain/kotlin/arrow/fx/stm/STM.kt @@ -1,8 +1,13 @@ +@file:OptIn(ExperimentalContracts::class) + package arrow.fx.stm import arrow.fx.stm.internal.STMTransaction import arrow.fx.stm.internal.alterHamtWithHash import arrow.fx.stm.internal.lookupHamtWithHash +import kotlin.contracts.ExperimentalContracts +import kotlin.contracts.InvocationKind +import kotlin.contracts.contract import kotlin.reflect.KProperty /** @@ -1692,4 +1697,9 @@ public fun STM.check(b: Boolean): Unit = if (b.not()) retry() else Unit * Rethrows all exceptions not caught by inside [f]. Remember to use [STM.catch] to handle exceptions as `try {} catch` will not handle transaction * state properly! */ -public suspend fun atomically(f: STM.() -> A): A = STMTransaction(f).commit() +public suspend fun atomically(f: STM.() -> A): A { + contract { + callsInPlace(f, InvocationKind.AT_LEAST_ONCE) + } + return STMTransaction().commit(f) +} diff --git a/arrow-libs/fx/arrow-fx-stm/src/commonMain/kotlin/arrow/fx/stm/TVar.kt b/arrow-libs/fx/arrow-fx-stm/src/commonMain/kotlin/arrow/fx/stm/TVar.kt index 195c6b9f99e..9b47ea2ac16 100644 --- a/arrow-libs/fx/arrow-fx-stm/src/commonMain/kotlin/arrow/fx/stm/TVar.kt +++ b/arrow-libs/fx/arrow-fx-stm/src/commonMain/kotlin/arrow/fx/stm/TVar.kt @@ -148,7 +148,7 @@ public class TVar internal constructor(a: A) { * Changes are pushed to waiting transactions via [notify] */ // TODO Use a set here, and preferably something that uses sharing to avoid gc pressure from copying... - private val waiting = Atomic>>(emptyList()) + private val waiting = Atomic>(emptyList()) override fun hashCode(): Int = id.hashCode() @@ -196,7 +196,7 @@ public class TVar internal constructor(a: A) { * This does not happen implicitly on [release] because release may also write the same value back on * normal lock release. */ - internal fun registerWaiting(trans: STMTransaction<*>, expected: A): Boolean { + internal fun registerWaiting(trans: STMTransaction, expected: A): Boolean { if (value !== expected) { trans.getCont()?.resume(Unit) return false @@ -212,7 +212,7 @@ public class TVar internal constructor(a: A) { /** * A transaction resumed so remove it from the [TVar] */ - internal fun removeWaiting(trans: STMTransaction<*>): Unit { + internal fun removeWaiting(trans: STMTransaction): Unit { waiting.update { it.filter { it !== trans } } } diff --git a/arrow-libs/fx/arrow-fx-stm/src/commonMain/kotlin/arrow/fx/stm/internal/Impl.kt b/arrow-libs/fx/arrow-fx-stm/src/commonMain/kotlin/arrow/fx/stm/internal/Impl.kt index c2d624ac7a3..fe6a2ed81ad 100644 --- a/arrow-libs/fx/arrow-fx-stm/src/commonMain/kotlin/arrow/fx/stm/internal/Impl.kt +++ b/arrow-libs/fx/arrow-fx-stm/src/commonMain/kotlin/arrow/fx/stm/internal/Impl.kt @@ -1,4 +1,5 @@ @file:Suppress("UNCHECKED_CAST") +@file:OptIn(ExperimentalContracts::class) package arrow.fx.stm.internal @@ -7,6 +8,9 @@ import arrow.atomic.value import arrow.fx.stm.STM import arrow.fx.stm.TVar import kotlinx.coroutines.suspendCancellableCoroutine +import kotlin.contracts.ExperimentalContracts +import kotlin.contracts.InvocationKind +import kotlin.contracts.contract import kotlin.coroutines.Continuation /** @@ -168,7 +172,7 @@ public expect object RetryException : Throwable * * Keeps the continuation that [TVar]'s use to resume this transaction. */ -internal class STMTransaction(val f: STM.() -> A) { +internal class STMTransaction { private val cont = Atomic?>(null) /** @@ -183,7 +187,10 @@ internal class STMTransaction(val f: STM.() -> A) { // If they both pass a threshold we should probably kill the transaction and throw // "live-locked" transactions are those that are continuously retry due to accessing variables with high contention and // taking longer than the transactions updating those variables. - suspend fun commit(): A { + suspend fun commit(f: STM.() -> A): A { + contract { + callsInPlace(f, InvocationKind.AT_LEAST_ONCE) + } loop@ while (true) { val frame = STMFrame() try {