Skip to content

Commit

Permalink
Flow.transformWhile operator
Browse files Browse the repository at this point in the history
Also, all flow-truncating operators are refactored via a common internal collectWhile operator that properly uses AbortFlowException and checks for its ownership, so that we don't have to look for bugs in interactions between all those operators (and zip, too, which is also flow-truncating).

Fixes #2065
  • Loading branch information
elizarov committed May 27, 2020
1 parent adbbbaa commit f03e153
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 49 deletions.
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static synthetic fun toSet$default (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun transformLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun transformWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun unsafeTransform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun withIndex (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
Expand Down
10 changes: 6 additions & 4 deletions kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.flow as safeFlow

// ------------------ WARNING ------------------
// These emitting operators must use safe flow builder, because they allow
Expand All @@ -19,10 +20,11 @@ import kotlin.jvm.*
/**
* Applies [transform] function to each value of the given flow.
*
* The receiver of the [transform] is [FlowCollector] and thus `transform` is a
* generic function that may transform emitted element, skip it or emit it multiple times.
* The receiver of the `transform` is [FlowCollector] and thus `transform` is a
* flexible function that may transform emitted element, skip it or emit it multiple times.
*
* This operator can be used as a building block for other operators, for example:
* This operator generalizes [filter] and [map] operators and
* can be used as a building block for other operators, for example:
*
* ```
* fun Flow<Int>.skipOddAndDuplicateEven(): Flow<Int> = transform { value ->
Expand All @@ -35,7 +37,7 @@ import kotlin.jvm.*
*/
public inline fun <T, R> Flow<T>.transform(
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = flow { // Note: safe flow is used here, because collector is exposed to transform on each operation
): Flow<R> = safeFlow { // Note: safe flow is used here, because collector is exposed to transform on each operation
collect { value ->
// kludge, without it Unit will be returned and TCE won't kick in, KT-28938
return@collect transform(value)
Expand Down
77 changes: 57 additions & 20 deletions kotlinx-coroutines-core/common/src/flow/operators/Limit.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.flow as safeFlow
import kotlinx.coroutines.flow.internal.unsafeFlow as flow

/**
Expand Down Expand Up @@ -49,35 +51,70 @@ public fun <T> Flow<T>.take(count: Int): Flow<T> {
require(count > 0) { "Requested element count $count should be positive" }
return flow {
var consumed = 0
try {
collect { value ->
if (++consumed < count) {
return@collect emit(value)
} else {
return@collect emitAbort(value)
}
}
} catch (e: AbortFlowException) {
e.checkOwnership(owner = this)
collectWhile { value ->
emit(value)
++consumed < count
}
}
}

private suspend fun <T> FlowCollector<T>.emitAbort(value: T) {
emit(value)
throw AbortFlowException(this)
}

/**
* Returns a flow that contains first elements satisfying the given [predicate].
*
* Note, that the resulting flow does not contain the element on which the [predicate] returned `true`.
* See [transformWhile] for a more flexible operator.
*/
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow {
try {
collect { value ->
if (predicate(value)) emit(value)
else throw AbortFlowException(this)
collectWhile { value ->
if (predicate(value)) {
emit(value)
true
} else {
false
}
}
}

/**
* Applies [transform] function to each value of the given flow while this
* function returns `true`.
*
* The receiver of the `transformWhile` is [FlowCollector] and thus `transformWhile` is a
* flexible function that may transform emitted element, skip it or emit it multiple times.
*
* This operator generalizes [takeWhile] and can be used as a building block for other operators.
* For example, a flow of download progress messages can be completed when the
* download is done but emit this last message (unlike `takeWhile`):
*
* ```
* fun Flow<DownloadProgress>.completeWhenDone(): Flow<DownloadProgress> =
* transformWhile { progress ->
* emit(progress) // always emit progress
* !progress.isDone() // continue while download is not done
* }
* }
* ```
*/
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.transformWhile(
@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Boolean
): Flow<R> =
safeFlow { // Note: safe flow is used here, because collector is exposed to transform on each operation
collectWhile { value ->
transform(value)
}
}

// Internal building block for all flow-truncating operators
internal suspend inline fun <T> Flow<T>.collectWhile(crossinline predicate: suspend (value: T) -> Boolean) {
val collector = object : FlowCollector<T> {
override suspend fun emit(value: T) {
if (!predicate(value)) throw AbortFlowException(this)
}
}
try {
collect(collector)
} catch (e: AbortFlowException) {
e.checkOwnership(owner = this)
e.checkOwnership(collector)
}
}
35 changes: 10 additions & 25 deletions kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ public suspend fun <T: Any> Flow<T>.singleOrNull(): T? {
*/
public suspend fun <T> Flow<T>.first(): T {
var result: Any? = NULL
collectUntil {
collectWhile {
result = it
true
false
}
if (result === NULL) throw NoSuchElementException("Expected at least one element")
return result as T
Expand All @@ -96,12 +96,12 @@ public suspend fun <T> Flow<T>.first(): T {
*/
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
var result: Any? = NULL
collectUntil {
collectWhile {
if (predicate(it)) {
result = it
true
} else {
false
} else {
true
}
}
if (result === NULL) throw NoSuchElementException("Expected at least one element matching the predicate $predicate")
Expand All @@ -114,9 +114,9 @@ public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
*/
public suspend fun <T : Any> Flow<T>.firstOrNull(): T? {
var result: T? = null
collectUntil {
collectWhile {
result = it
true
false
}
return result
}
Expand All @@ -127,28 +127,13 @@ public suspend fun <T : Any> Flow<T>.firstOrNull(): T? {
*/
public suspend fun <T : Any> Flow<T>.firstOrNull(predicate: suspend (T) -> Boolean): T? {
var result: T? = null
collectUntil {
collectWhile {
if (predicate(it)) {
result = it
true
} else {
false
} else {
true
}
}
return result
}

internal suspend inline fun <T> Flow<T>.collectUntil(crossinline block: suspend (value: T) -> Boolean) {
val collector = object : FlowCollector<T> {
override suspend fun emit(value: T) {
if (block(value)) {
throw AbortFlowException(this)
}
}
}
try {
collect(collector)
} catch (e: AbortFlowException) {
e.checkOwnership(collector)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlin.test.*

class TransformWhileTest : TestBase() {
@Test
fun testSimple() = runTest {
val flow = (0..10).asFlow()
val expected = listOf("A", "B", "C", "D")
val actual = flow.transformWhile { value ->
when(value) {
0 -> { emit("A"); true }
1 -> true
2 -> { emit("B"); emit("C"); true }
3 -> { emit("D"); false }
else -> { expectUnreached(); false }
}
}.toList()
assertEquals(expected, actual)
}

@Test
fun testExample() = runTest {
val source = listOf(
DownloadProgress(0),
DownloadProgress(50),
DownloadProgress(100),
DownloadProgress(147)
)
val expected = source.subList(0, 3)
val actual = source.asFlow().completeWhenDone().toList()
assertEquals(expected, actual)
}

private fun Flow<DownloadProgress>.completeWhenDone(): Flow<DownloadProgress> =
transformWhile { progress ->
emit(progress) // always emit progress
!progress.isDone() // continue while download is not done
}

private data class DownloadProgress(val percent: Int) {
fun isDone() = percent >= 100
}
}

0 comments on commit f03e153

Please sign in to comment.