Skip to content

Commit

Permalink
Add STM contracts
Browse files Browse the repository at this point in the history
  • Loading branch information
kyay10 committed Nov 16, 2024
1 parent 4227b37 commit 261a348
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
@file:OptIn(ExperimentalContracts::class)

package arrow.fx.stm

import arrow.fx.stm.internal.STMTransaction
import arrow.fx.stm.internal.alterHamtWithHash
import arrow.fx.stm.internal.lookupHamtWithHash
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.reflect.KProperty

/**
Expand Down Expand Up @@ -1692,4 +1697,9 @@ public fun STM.check(b: Boolean): Unit = if (b.not()) retry() else Unit
* Rethrows all exceptions not caught by inside [f]. Remember to use [STM.catch] to handle exceptions as `try {} catch` will not handle transaction
* state properly!
*/
public suspend fun <A> atomically(f: STM.() -> A): A = STMTransaction(f).commit()
public suspend fun <A> atomically(f: STM.() -> A): A {
contract {
callsInPlace(f, InvocationKind.AT_LEAST_ONCE)
}
return STMTransaction().commit(f)
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public class TVar<A> internal constructor(a: A) {
* Changes are pushed to waiting transactions via [notify]
*/
// TODO Use a set here, and preferably something that uses sharing to avoid gc pressure from copying...
private val waiting = Atomic<List<STMTransaction<*>>>(emptyList())
private val waiting = Atomic<List<STMTransaction>>(emptyList())

override fun hashCode(): Int = id.hashCode()

Expand Down Expand Up @@ -196,7 +196,7 @@ public class TVar<A> internal constructor(a: A) {
* This does not happen implicitly on [release] because release may also write the same value back on
* normal lock release.
*/
internal fun registerWaiting(trans: STMTransaction<*>, expected: A): Boolean {
internal fun registerWaiting(trans: STMTransaction, expected: A): Boolean {
if (value !== expected) {
trans.getCont()?.resume(Unit)
return false
Expand All @@ -212,7 +212,7 @@ public class TVar<A> internal constructor(a: A) {
/**
* A transaction resumed so remove it from the [TVar]
*/
internal fun removeWaiting(trans: STMTransaction<*>): Unit {
internal fun removeWaiting(trans: STMTransaction): Unit {
waiting.update { it.filter { it !== trans } }
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
@file:Suppress("UNCHECKED_CAST")
@file:OptIn(ExperimentalContracts::class)

package arrow.fx.stm.internal

Expand All @@ -7,6 +8,9 @@ import arrow.atomic.value
import arrow.fx.stm.STM
import arrow.fx.stm.TVar
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.coroutines.Continuation

/**
Expand Down Expand Up @@ -168,7 +172,7 @@ public expect object RetryException : Throwable
*
* Keeps the continuation that [TVar]'s use to resume this transaction.
*/
internal class STMTransaction<A>(val f: STM.() -> A) {
internal class STMTransaction {
private val cont = Atomic<Continuation<Unit>?>(null)

/**
Expand All @@ -183,7 +187,10 @@ internal class STMTransaction<A>(val f: STM.() -> A) {
// If they both pass a threshold we should probably kill the transaction and throw
// "live-locked" transactions are those that are continuously retry due to accessing variables with high contention and
// taking longer than the transactions updating those variables.
suspend fun commit(): A {
suspend fun <A> commit(f: STM.() -> A): A {
contract {
callsInPlace(f, InvocationKind.AT_LEAST_ONCE)
}
loop@ while (true) {
val frame = STMFrame()
try {
Expand Down

0 comments on commit 261a348

Please sign in to comment.