Skip to content

Commit

Permalink
Introduce Flow.any, Flow.all, Flow.none
Browse files Browse the repository at this point in the history
Fixes #4212
  • Loading branch information
CLOVIS-AI authored and dkhalanskyjb committed Dec 19, 2024
1 parent ec83195 commit bed3d29
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 0 deletions.
3 changes: 3 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,8 @@ public abstract interface class kotlinx/coroutines/flow/FlowCollector {

public final class kotlinx/coroutines/flow/FlowKt {
public static final field DEFAULT_CONCURRENCY_PROPERTY_NAME Ljava/lang/String;
public static final fun all (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun any (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun asFlow (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Ljava/util/Iterator;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Lkotlin/jvm/functions/Function0;)Lkotlinx/coroutines/flow/Flow;
Expand Down Expand Up @@ -1075,6 +1077,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun merge (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow;
public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun merge ([Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun none (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
Expand Down
3 changes: 3 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,8 @@ final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<
final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toMutableList(): kotlin.collections/MutableList<#A> // kotlinx.coroutines.channels/toMutableList|toMutableList@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toMutableSet(): kotlin.collections/MutableSet<#A> // kotlinx.coroutines.channels/toMutableSet|toMutableSet@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toSet(): kotlin.collections/Set<#A> // kotlinx.coroutines.channels/toSet|toSet@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/all(kotlin.coroutines/SuspendFunction1<#A, kotlin/Boolean>): kotlin/Boolean // kotlinx.coroutines.flow/all|all@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Boolean>){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/any(kotlin.coroutines/SuspendFunction1<#A, kotlin/Boolean>): kotlin/Boolean // kotlinx.coroutines.flow/any|any@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Boolean>){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/collectLatest(kotlin.coroutines/SuspendFunction1<#A, kotlin/Unit>) // kotlinx.coroutines.flow/collectLatest|collectLatest@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Unit>){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/count(): kotlin/Int // kotlinx.coroutines.flow/count|count@kotlinx.coroutines.flow.Flow<0:0>(){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/count(kotlin.coroutines/SuspendFunction1<#A, kotlin/Boolean>): kotlin/Int // kotlinx.coroutines.flow/count|count@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Boolean>){0§<kotlin.Any?>}[0]
Expand All @@ -1006,6 +1008,7 @@ final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.c
final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/firstOrNull(kotlin.coroutines/SuspendFunction1<#A, kotlin/Boolean>): #A? // kotlinx.coroutines.flow/firstOrNull|firstOrNull@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Boolean>){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/last(): #A // kotlinx.coroutines.flow/last|last@kotlinx.coroutines.flow.Flow<0:0>(){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/lastOrNull(): #A? // kotlinx.coroutines.flow/lastOrNull|lastOrNull@kotlinx.coroutines.flow.Flow<0:0>(){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/none(kotlin.coroutines/SuspendFunction1<#A, kotlin/Boolean>): kotlin/Boolean // kotlinx.coroutines.flow/none|none@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction1<0:0,kotlin.Boolean>){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/single(): #A // kotlinx.coroutines.flow/single|single@kotlinx.coroutines.flow.Flow<0:0>(){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/singleOrNull(): #A? // kotlinx.coroutines.flow/singleOrNull|singleOrNull@kotlinx.coroutines.flow.Flow<0:0>(){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/stateIn(kotlinx.coroutines/CoroutineScope): kotlinx.coroutines.flow/StateFlow<#A> // kotlinx.coroutines.flow/stateIn|stateIn@kotlinx.coroutines.flow.Flow<0:0>(kotlinx.coroutines.CoroutineScope){0§<kotlin.Any?>}[0]
Expand Down
107 changes: 107 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/terminal/Logic.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
@file:JvmMultifileClass
@file:JvmName("FlowKt")

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlin.jvm.*


/**
* A terminal operator that returns `true` and immediately cancels the flow
* if at least one element matches the given [predicate].
*
* If the flow does not emit any elements or no element matches the predicate, the function returns `false`.
*
* Equivalent to `!all { !predicate(it) }` (see [Flow.all]) and `!none { predicate(it) }` (see [Flow.none]).
*
* Example:
*
* ```
* val myFlow = flow {
* repeat(10) {
* emit(it)
* }
* throw RuntimeException("You still didn't find the required number? I gave you ten!")
* }
* println(myFlow.any { it > 5 }) // true
* println(flowOf(1, 2, 3).any { it > 5 }) // false
* ```
*
* @see Iterable.any
* @see Sequence.any
*/
public suspend fun <T> Flow<T>.any(predicate: suspend (T) -> Boolean): Boolean {
var found = false
collectWhile {
val satisfies = predicate(it)
if (satisfies) found = true
!satisfies
}
return found
}

/**
* A terminal operator that returns `true` if all elements match the given [predicate],
* or returns `false` and cancels the flow as soon as the first element not matching the predicate is encountered.
*
* If the flow terminates without emitting any elements, the function returns `true` because there
* are no elements in it that *do not* match the predicate.
* See a more detailed explanation of this logic concept in the
* ["Vacuous truth"](https://en.wikipedia.org/wiki/Vacuous_truth) article.
*
* Equivalent to `!any { !predicate(it) }` (see [Flow.any]) and `none { !predicate(it) }` (see [Flow.none]).
*
* Example:
*
* ```
* val myFlow = flow {
* repeat(10) {
* emit(it)
* }
* throw RuntimeException("You still didn't find the required number? I gave you ten!")
* }
* println(myFlow.all { it <= 5 }) // false
* println(flowOf(1, 2, 3).all { it <= 5 }) // true
* ```
*
* @see Iterable.all
* @see Sequence.all
*/
public suspend fun <T> Flow<T>.all(predicate: suspend (T) -> Boolean): Boolean {
var foundCounterExample = false
collectWhile {
val satisfies = predicate(it)
if (!satisfies) foundCounterExample = true
satisfies
}
return !foundCounterExample
}

/**
* A terminal operator that returns `true` if no elements match the given [predicate],
* or returns `false` and cancels the flow as soon as the first element matching the predicate is encountered.
*
* If the flow terminates without emitting any elements, the function returns `true` because there
* are no elements in it that match the predicate.
* See a more detailed explanation of this logic concept in the
* ["Vacuous truth"](https://en.wikipedia.org/wiki/Vacuous_truth) article.
*
* Equivalent to `!any(predicate)` (see [Flow.any]) and `all { !predicate(it) }` (see [Flow.all]).
*
* Example:
* ```
* val myFlow = flow {
* repeat(10) {
* emit(it)
* }
* throw RuntimeException("You still didn't find the required number? I gave you ten!")
* }
* println(myFlow.none { it > 5 }) // false
* println(flowOf(1, 2, 3).none { it > 5 }) // true
* ```
*
* @see Iterable.none
* @see Sequence.none
*/
public suspend fun <T> Flow<T>.none(predicate: suspend (T) -> Boolean): Boolean = !any(predicate)
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package kotlinx.coroutines.flow

import kotlinx.coroutines.testing.*
import kotlin.test.*

class BooleanTerminationTest : TestBase() {
@Test
fun testAnyNominal() = runTest {
val flow = flow {
emit(1)
emit(2)
}

assertTrue(flow.any { it > 0 })
assertTrue(flow.any { it % 2 == 0 })
assertFalse(flow.any { it > 5 })
}

@Test
fun testAnyEmpty() = runTest {
assertFalse(emptyFlow<Int>().any { it > 0 })
}

@Test
fun testAnyInfinite() = runTest {
assertTrue(flow { while (true) { emit(5) } }.any { it == 5 })
}

@Test
fun testAnyShortCircuit() = runTest {
assertTrue(flow {
emit(1)
emit(2)
expectUnreached()
}.any {
it == 2
})
}

@Test
fun testAllNominal() = runTest {
val flow = flow {
emit(1)
emit(2)
}

assertTrue(flow.all { it > 0 })
assertFalse(flow.all { it % 2 == 0 })
assertFalse(flow.all { it > 5 })
}

@Test
fun testAllEmpty() = runTest {
assertTrue(emptyFlow<Int>().all { it > 0 })
}

@Test
fun testAllInfinite() = runTest {
assertFalse(flow { while (true) { emit(5) } }.all { it == 0 })
}

@Test
fun testAllShortCircuit() = runTest {
assertFalse(flow {
emit(1)
emit(2)
expectUnreached()
}.all {
it <= 1
})
}

@Test
fun testNoneNominal() = runTest {
val flow = flow {
emit(1)
emit(2)
}

assertFalse(flow.none { it > 0 })
assertFalse(flow.none { it % 2 == 0 })
assertTrue(flow.none { it > 5 })
}

@Test
fun testNoneEmpty() = runTest {
assertTrue(emptyFlow<Int>().none { it > 0 })
}

@Test
fun testNoneInfinite() = runTest {
assertFalse(flow { while (true) { emit(5) } }.none { it == 5 })
}

@Test
fun testNoneShortCircuit() = runTest {
assertFalse(flow {
emit(1)
emit(2)
expectUnreached()
}.none {
it == 2
})
}

}

0 comments on commit bed3d29

Please sign in to comment.