Skip to content

Commit

Permalink
Merge branch 'main' into k2-compilation-take2
Browse files Browse the repository at this point in the history
  • Loading branch information
serras authored Aug 23, 2023
2 parents ef16dc9 + 9b41cba commit a3a0c14
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 0 deletions.
6 changes: 6 additions & 0 deletions arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ public final class arrow/fx/coroutines/FlowExtensions {
public static final fun metered-HG0u8IE (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun parMap (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun parMap$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun parMapNotNullUnordered (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun parMapNotNullUnordered$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun parMapUnordered (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun parMapUnordered$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun repeat (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
Expand All @@ -172,6 +174,10 @@ public final class arrow/fx/coroutines/ParMapKt {
public static final fun parMap (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun parMap$default (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun parMap$default (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun parMapNotNull (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun parMapNotNull (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun parMapNotNull$default (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun parMapNotNull$default (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun parMapOrAccumulate (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun parMapOrAccumulate (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun parMapOrAccumulate (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ public suspend fun <A, B> Iterable<A>.parMap(
map { async(context) { transform.invoke(this, it) } }.awaitAll()
}

public suspend fun <A, B> Iterable<A>.parMapNotNull(
context: CoroutineContext = EmptyCoroutineContext,
concurrency: Int,
transform: suspend CoroutineScope.(A) -> B?
): List<B> =
parMap(context, concurrency, transform).filterNotNull()

public suspend fun <A, B> Iterable<A>.parMapNotNull(
context: CoroutineContext = EmptyCoroutineContext,
transform: suspend CoroutineScope.(A) -> B?
): List<B> =
parMap(context, transform).filterNotNull()

/** Temporary intersection type, until we have context receivers */
public class ScopedRaiseAccumulate<Error>(
raise: Raise<NonEmptyList<Error>>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public inline fun <A, B> Flow<A>.parMap(
* <!--- KNIT example-flow-04.kt -->
*/
@FlowPreview
@ExperimentalCoroutinesApi
public inline fun <A, B> Flow<A>.parMapUnordered(
concurrency: Int = DEFAULT_CONCURRENCY,
crossinline transform: suspend (a: A) -> B
Expand All @@ -193,6 +194,43 @@ public inline fun <A, B> Flow<A>.parMapUnordered(
}
}.flattenMerge(concurrency)

/**
* Like [mapNotNull], but will evaluate effects in parallel, emitting the results downstream.
* The number of concurrent effects is limited by [concurrency].
*
* See [parMap] if retaining the original order of the stream is required.
*
* ```kotlin
* import kotlinx.coroutines.delay
* import kotlinx.coroutines.flow.flowOf
* import kotlinx.coroutines.flow.toList
* import kotlinx.coroutines.flow.collect
* import arrow.fx.coroutines.parMapNotNullUnordered
*
* //sampleStart
* suspend fun main(): Unit {
* flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
* .parMapNotNullUnordered { a ->
* delay(100)
* a.takeIf { a % 2 == 0 }
* }.toList() // [4, 6, 2, 8, 10]
* }
* //sampleEnd
* ```
* <!--- KNIT example-flow-05.kt -->
*/
@FlowPreview
@ExperimentalCoroutinesApi
public inline fun <A, B> Flow<A>.parMapNotNullUnordered(
concurrency: Int = DEFAULT_CONCURRENCY,
crossinline transform: suspend (a: A) -> B?
): Flow<B> =
map { o ->
flow {
transform(o)?.let { emit(it) }
}
}.flattenMerge(concurrency)

/** Repeats the Flow forever */
public fun <A> Flow<A>.repeat(): Flow<A> =
flow {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,75 @@ class ParMapTest : StringSpec({
} shouldBe NonEmptyList(e, (1 until 100).map { e }).left()
}
}

"parMapNotNull is stack-safe" {
val count = 20_000
val ref = Atomic(0)
(0 until count).parMapNotNull { _: Int ->
ref.update { it + 1 }
}
ref.get() shouldBe count
}

"parMapNotNull runs in parallel" {
val promiseA = CompletableDeferred<Unit>()
val promiseB = CompletableDeferred<Unit>()
val promiseC = CompletableDeferred<Unit>()

listOf(
suspend {
promiseA.await()
promiseC.complete(Unit)
},
suspend {
promiseB.await()
promiseA.complete(Unit)
},
suspend {
promiseB.complete(Unit)
promiseC.await()
}
).parMapNotNull { it.invoke() }
}

"parMapNotNull results in the correct error" {
checkAll(
Arb.int(min = 10, max = 20),
Arb.int(min = 1, max = 9),
Arb.throwable()
) { n, killOn, e ->
Either.catch {
(0 until n).parMapNotNull { i ->
if (i == killOn) throw e else Unit
}
} should leftException(e)
}
}

"parMapNotNull(concurrency = 1) only runs one task at a time" {
val promiseA = CompletableDeferred<Unit>()

withTimeoutOrNull(100.milliseconds) {
listOf(
suspend { promiseA.await() },
suspend { promiseA.complete(Unit) }
).parMapNotNull(concurrency = 1) { it.invoke() }
} shouldBe null
}

"parMapNotNull discards nulls" {
(0 until 100).parMapNotNull { _ ->
null
} shouldBe emptyList()
}

"parMapNotNull retains non-nulls" {
checkAll(Arb.int()) { i ->
(0 until 100).parMapNotNull { _ ->
i
} shouldBe List(100) { i }
}
}
})

private val emptyError: (Nothing, Nothing) -> Nothing =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// This file was automatically generated from flow.kt by Knit tool. Do not edit.
package arrow.fx.coroutines.examples.exampleFlow05

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.collect
import arrow.fx.coroutines.parMapNotNullUnordered

suspend fun main(): Unit {
flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.parMapNotNullUnordered { a ->
delay(100)
a.takeIf { a % 2 == 0 }
}.toList() // [4, 6, 2, 8, 10]
}

0 comments on commit a3a0c14

Please sign in to comment.