diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api index f02721af9ec..315aa8734d7 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api @@ -267,3 +267,15 @@ public final class arrow/fx/coroutines/ScopedRaiseAccumulate : arrow/core/raise/ public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext; } +public final class arrow/fx/coroutines/await/AwaitAllScope : kotlinx/coroutines/CoroutineScope { + public fun (Lkotlinx/coroutines/CoroutineScope;)V + public final fun async (Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/Deferred; + public static synthetic fun async$default (Larrow/fx/coroutines/await/AwaitAllScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/Deferred; + public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext; +} + +public final class arrow/fx/coroutines/await/AwaitAllScopeKt { + public static final fun awaitAll (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitAll (Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + diff --git a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.klib.api b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.klib.api index 37ee62ee5ed..d4bebff4364 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.klib.api +++ b/arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.klib.api @@ -18,6 +18,12 @@ final class <#A: kotlin/Any?> arrow.fx.coroutines/ScopedRaiseAccumulate : arrow. final val coroutineContext // arrow.fx.coroutines/ScopedRaiseAccumulate.coroutineContext|{}coroutineContext[0] final fun (): kotlin.coroutines/CoroutineContext // arrow.fx.coroutines/ScopedRaiseAccumulate.coroutineContext.|(){}[0] } +final class arrow.fx.coroutines.await/AwaitAllScope : kotlinx.coroutines/CoroutineScope { // arrow.fx.coroutines.await/AwaitAllScope|null[0] + constructor (kotlinx.coroutines/CoroutineScope) // arrow.fx.coroutines.await/AwaitAllScope.|(kotlinx.coroutines.CoroutineScope){}[0] + final fun <#A1: kotlin/Any?> async(kotlin.coroutines/CoroutineContext = ..., kotlinx.coroutines/CoroutineStart = ..., kotlin.coroutines/SuspendFunction1): kotlinx.coroutines/Deferred<#A1> // arrow.fx.coroutines.await/AwaitAllScope.async|async(kotlin.coroutines.CoroutineContext;kotlinx.coroutines.CoroutineStart;kotlin.coroutines.SuspendFunction1){0§}[0] + final val coroutineContext // arrow.fx.coroutines.await/AwaitAllScope.coroutineContext|{}coroutineContext[0] + final fun (): kotlin.coroutines/CoroutineContext // arrow.fx.coroutines.await/AwaitAllScope.coroutineContext.|(){}[0] +} final class arrow.fx.coroutines/CountDownLatch { // arrow.fx.coroutines/CountDownLatch|null[0] constructor (kotlin/Long) // arrow.fx.coroutines/CountDownLatch.|(kotlin.Long){}[0] final fun count(): kotlin/Long // arrow.fx.coroutines/CountDownLatch.count|count(){}[0] @@ -57,6 +63,8 @@ final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?> (kotlin.collections/Iterabl final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?> (kotlin.collections/Iterable<#A>).arrow.fx.coroutines/parMapNotNull(kotlin.coroutines/CoroutineContext = ..., kotlin/Int, kotlin.coroutines/SuspendFunction2): kotlin.collections/List<#B> // arrow.fx.coroutines/parMapNotNull|parMapNotNull@kotlin.collections.Iterable<0:0>(kotlin.coroutines.CoroutineContext;kotlin.Int;kotlin.coroutines.SuspendFunction2){0§;1§}[0] final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?> (kotlin.coroutines/SuspendFunction1).arrow.fx.coroutines/use(kotlin.coroutines/SuspendFunction1<#A, #B>): #B // arrow.fx.coroutines/use|use@kotlin.coroutines.SuspendFunction1(kotlin.coroutines.SuspendFunction1<0:0,0:1>){0§;1§}[0] final suspend fun <#A: kotlin/Any?> (kotlin.coroutines/SuspendFunction1).arrow.fx.coroutines/allocated(): kotlin/Pair<#A, kotlin.coroutines/SuspendFunction1> // arrow.fx.coroutines/allocated|allocated@kotlin.coroutines.SuspendFunction1(){0§}[0] +final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines/CoroutineScope).arrow.fx.coroutines.await/awaitAll(kotlin.coroutines/SuspendFunction1): #A // arrow.fx.coroutines.await/awaitAll|awaitAll@kotlinx.coroutines.CoroutineScope(kotlin.coroutines.SuspendFunction1){0§}[0] +final suspend fun <#A: kotlin/Any?> arrow.fx.coroutines.await/awaitAll(kotlin.coroutines/SuspendFunction1): #A // arrow.fx.coroutines.await/awaitAll|awaitAll(kotlin.coroutines.SuspendFunction1){0§}[0] final suspend fun <#A: kotlin/Any?> arrow.fx.coroutines/resourceScope(kotlin.coroutines/SuspendFunction1): #A // arrow.fx.coroutines/resourceScope|resourceScope(kotlin.coroutines.SuspendFunction1){0§}[0] final suspend fun arrow.fx.coroutines/cancelAndCompose(kotlinx.coroutines/Deferred<*>, kotlinx.coroutines/Deferred<*>) // arrow.fx.coroutines/cancelAndCompose|cancelAndCompose(kotlinx.coroutines.Deferred<*>;kotlinx.coroutines.Deferred<*>){}[0] final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?, #C: kotlin/Any?, #D: kotlin/Any?, #E: kotlin/Any?, #F: kotlin/Any?, #G: kotlin/Any?, #H: kotlin/Any?, #I: kotlin/Any?, #J: kotlin/Any?, #K: kotlin/Any?> (arrow.core.raise/Raise<#A>).arrow.fx.coroutines/parZipOrAccumulate(crossinline kotlin/Function2<#A, #A, #A>, crossinline kotlin.coroutines/SuspendFunction1, #B>, crossinline kotlin.coroutines/SuspendFunction1, #C>, crossinline kotlin.coroutines/SuspendFunction1, #D>, crossinline kotlin.coroutines/SuspendFunction1, #E>, crossinline kotlin.coroutines/SuspendFunction1, #F>, crossinline kotlin.coroutines/SuspendFunction1, #G>, crossinline kotlin.coroutines/SuspendFunction1, #H>, crossinline kotlin.coroutines/SuspendFunction1, #I>, crossinline kotlin.coroutines/SuspendFunction1, #J>, crossinline kotlin.coroutines/SuspendFunction10): #K // arrow.fx.coroutines/parZipOrAccumulate|parZipOrAccumulate@arrow.core.raise.Raise<0:0>(kotlin.Function2<0:0,0:0,0:0>;kotlin.coroutines.SuspendFunction1,0:1>;kotlin.coroutines.SuspendFunction1,0:2>;kotlin.coroutines.SuspendFunction1,0:3>;kotlin.coroutines.SuspendFunction1,0:4>;kotlin.coroutines.SuspendFunction1,0:5>;kotlin.coroutines.SuspendFunction1,0:6>;kotlin.coroutines.SuspendFunction1,0:7>;kotlin.coroutines.SuspendFunction1,0:8>;kotlin.coroutines.SuspendFunction1,0:9>;kotlin.coroutines.SuspendFunction10){0§;1§;2§;3§;4§;5§;6§;7§;8§;9§;10§}[0] diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/await/AwaitAllScope.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/await/AwaitAllScope.kt new file mode 100644 index 00000000000..3c20c5500af --- /dev/null +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/await/AwaitAllScope.kt @@ -0,0 +1,72 @@ +package arrow.fx.coroutines.await + +import arrow.atomic.Atomic +import arrow.atomic.update +import kotlinx.coroutines.async as coroutinesAsync +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.awaitAll as coroutinesAwaitAll +import kotlinx.coroutines.coroutineScope +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +public suspend fun awaitAll( + block: suspend AwaitAllScope.() -> A +): A = coroutineScope { block(AwaitAllScope(this)) } + +public suspend fun CoroutineScope.awaitAll( + block: suspend AwaitAllScope.() -> A +): A = block(AwaitAllScope(this)) + +/** + * Within an [AwaitAllScope], any call to [kotlinx.coroutines.Deferred.await] + * causes all the other [Deferred] in the same block to be awaited too. + * That way you can get more concurrency without having to sacrifice + * readability. + * + * ```kotlin + * suspend fun loadUserInfo(id: UserId): UserInfo = await { + * val name = async { loadUserFromDb(id) } + * val avatar = async { loadAvatar(id) } + * UserInfo( + * name.await(), // <- at this point every 'async' is 'await'ed + * avatar.await() // <- so when you reach this 'await', the value is already there + * ) + * } + * + * suspend fun loadUserInfoWithoutAwait(id: UserId): UserInfo { + * val name = async { loadUserFromDb(id) } + * val avatar = async { loadAvatar(id) } + * awaitAll(name, avatar) // <- this is required otherwise + * return UserInfo( + * name.await(), + * avatar.await() + * ) + * } + * ``` + */ +public class AwaitAllScope( + private val scope: CoroutineScope +): CoroutineScope by scope { + private val tasks: Atomic>> = Atomic(emptyList()) + + public fun async( + context: CoroutineContext = EmptyCoroutineContext, + start: CoroutineStart = CoroutineStart.DEFAULT, + block: suspend CoroutineScope.() -> T + ): Deferred { + val deferred = coroutinesAsync(context, start, block) + tasks.update { it + deferred } + return Await(deferred) + } + + private inner class Await( + private val deferred: Deferred + ): Deferred by deferred { + override suspend fun await(): T { + tasks.getAndSet(emptyList()).coroutinesAwaitAll() + return deferred.await() + } + } +}