Skip to content

Commit d0dabb9

Browse files
authored
Ensure that flow operators propagate the cancellation exceptions (#4038)
Before this change, it could happen that some size-limiting operators upstream swallowed the requests to limit the flow size emitted by the operators downstream. This could cause `onCompletion` calls between these operators to incorrectly report that the flow was not in fact limited by the downstream operators. Additionally, in the presence of additional size-limiting operators in the chain, `first` and `single` and their variants could exhibit incorrect behavior where emitting a value from `onCompletion` would overwrite their output. Fixes #4035
1 parent 761bdeb commit d0dabb9

File tree

7 files changed

+103
-15
lines changed

7 files changed

+103
-15
lines changed

kotlinx-coroutines-core/common/src/flow/internal/Combine.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
102102
val collectJob = Job()
103103
(second as SendChannel<*>).invokeOnClose {
104104
// Optimization to avoid AFE allocation when the other flow is done
105-
if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow))
105+
if (collectJob.isActive) collectJob.cancel(AbortFlowException(collectJob))
106106
}
107107

108108
try {
@@ -124,14 +124,14 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus
124124
flow.collect { value ->
125125
withContextUndispatched(scopeContext, Unit, cnt) {
126126
val otherValue = second.receiveCatching().getOrElse {
127-
throw it ?:AbortFlowException(this@unsafeFlow)
127+
throw it ?: AbortFlowException(collectJob)
128128
}
129129
emit(transform(value, NULL.unbox(otherValue)))
130130
}
131131
}
132132
}
133133
} catch (e: AbortFlowException) {
134-
e.checkOwnership(owner = this@unsafeFlow)
134+
e.checkOwnership(owner = collectJob)
135135
} finally {
136136
second.cancel()
137137
}

kotlinx-coroutines-core/common/src/flow/internal/FlowExceptions.common.kt

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,17 @@ import kotlinx.coroutines.*
44
import kotlinx.coroutines.flow.*
55

66
/**
7-
* This exception is thrown when operator need no more elements from the flow.
8-
* This exception should never escape outside of operator's implementation.
7+
* This exception is thrown when an operator needs no more elements from the flow.
8+
* The operator should never allow this exception to be thrown past its own boundary.
99
* This exception can be safely ignored by non-terminal flow operator if and only if it was caught by its owner
1010
* (see usages of [checkOwnership]).
11+
* Therefore, the [owner] parameter must be unique for every invocation of every operator.
1112
*/
12-
internal expect class AbortFlowException(owner: FlowCollector<*>) : CancellationException {
13-
public val owner: FlowCollector<*>
13+
internal expect class AbortFlowException(owner: Any) : CancellationException {
14+
val owner: Any
1415
}
1516

16-
internal fun AbortFlowException.checkOwnership(owner: FlowCollector<*>) {
17+
internal fun AbortFlowException.checkOwnership(owner: Any) {
1718
if (this.owner !== owner) throw this
1819
}
1920

kotlinx-coroutines-core/common/src/flow/operators/Limit.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T> = f
4646
public fun <T> Flow<T>.take(count: Int): Flow<T> {
4747
require(count > 0) { "Requested element count $count should be positive" }
4848
return flow {
49+
val ownershipMarker = Any()
4950
var consumed = 0
5051
try {
5152
collect { value ->
@@ -56,18 +57,18 @@ public fun <T> Flow<T>.take(count: Int): Flow<T> {
5657
if (++consumed < count) {
5758
return@collect emit(value)
5859
} else {
59-
return@collect emitAbort(value)
60+
return@collect emitAbort(value, ownershipMarker)
6061
}
6162
}
6263
} catch (e: AbortFlowException) {
63-
e.checkOwnership(owner = this)
64+
e.checkOwnership(owner = ownershipMarker)
6465
}
6566
}
6667
}
6768

68-
private suspend fun <T> FlowCollector<T>.emitAbort(value: T) {
69+
private suspend fun <T> FlowCollector<T>.emitAbort(value: T, ownershipMarker: Any) {
6970
emit(value)
70-
throw AbortFlowException(this)
71+
throw AbortFlowException(ownershipMarker)
7172
}
7273

7374
/**

kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,4 +308,90 @@ class OnCompletionTest : TestBase() {
308308
.take(1)
309309
.collect()
310310
}
311+
312+
/**
313+
* Tests that the operators that are used to limit the flow (like [take] and [zip]) faithfully propagate the
314+
* cancellation exception to the original owner.
315+
*/
316+
@Test
317+
fun testOnCompletionBetweenLimitingOperators() = runTest {
318+
// `zip` doesn't eat the exception thrown by `take`:
319+
flowOf(1, 2, 3)
320+
.zip(flowOf(4, 5)) { a, b -> a + b }
321+
.onCompletion {
322+
expect(2)
323+
assertNotNull(it)
324+
}
325+
.take(1)
326+
.collect {
327+
expect(1)
328+
}
329+
330+
// `take` doesn't eat the exception thrown by `zip`:
331+
flowOf(1, 2, 3)
332+
.take(2)
333+
.onCompletion {
334+
expect(4)
335+
assertNotNull(it)
336+
}
337+
.zip(flowOf(4)) { a, b -> a + b }
338+
.collect {
339+
expect(3)
340+
}
341+
342+
// `take` doesn't eat the exception thrown by `first`:
343+
flowOf(1, 2, 3)
344+
.take(2)
345+
.onCompletion {
346+
expect(5)
347+
assertNotNull(it)
348+
}
349+
.first()
350+
351+
// `zip` doesn't eat the exception thrown by `first`:
352+
flowOf(1, 2, 3)
353+
.zip(flowOf(4, 5)) { a, b -> a + b }
354+
.onCompletion {
355+
expect(6)
356+
assertNotNull(it)
357+
}
358+
.first()
359+
360+
// `take` doesn't eat the exception thrown by another `take`:
361+
flowOf(1, 2, 3)
362+
.take(2)
363+
.onCompletion {
364+
expect(8)
365+
assertNotNull(it)
366+
}
367+
.take(1)
368+
.collect {
369+
expect(7)
370+
}
371+
372+
// `zip` doesn't eat the exception thrown by another `zip`:
373+
flowOf(1, 2, 3)
374+
.zip(flowOf(4, 5)) { a, b -> a + b }
375+
.onCompletion {
376+
expect(10)
377+
assertNotNull(it)
378+
}
379+
.zip(flowOf(6)) { a, b -> a + b }
380+
.collect {
381+
expect(9)
382+
}
383+
384+
finish(11)
385+
}
386+
387+
/**
388+
* Tests that emitting new elements after completion doesn't overwrite the old elements.
389+
*/
390+
@Test
391+
fun testEmittingElementsAfterCancellation() = runTest {
392+
assertEquals(1, flowOf(1, 2, 3)
393+
.take(100)
394+
.onCompletion { emit(4) }
395+
.first())
396+
}
311397
}

kotlinx-coroutines-core/jsAndWasmShared/src/flow/internal/FlowExceptions.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ import kotlinx.coroutines.*
44
import kotlinx.coroutines.flow.*
55

66
internal actual class AbortFlowException actual constructor(
7-
actual val owner: FlowCollector<*>
7+
actual val owner: Any
88
) : CancellationException("Flow was aborted, no more elements needed")
99
internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled")

kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import kotlinx.coroutines.*
44
import kotlinx.coroutines.flow.*
55

66
internal actual class AbortFlowException actual constructor(
7-
@JvmField @Transient actual val owner: FlowCollector<*>
7+
@JvmField @Transient actual val owner: Any
88
) : CancellationException("Flow was aborted, no more elements needed") {
99

1010
override fun fillInStackTrace(): Throwable {

kotlinx-coroutines-core/native/src/flow/internal/FlowExceptions.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import kotlinx.coroutines.*
44
import kotlinx.coroutines.flow.*
55

66
internal actual class AbortFlowException actual constructor(
7-
actual val owner: FlowCollector<*>
7+
actual val owner: Any
88
) : CancellationException("Flow was aborted, no more elements needed")
99
internal actual class ChildCancelledException : CancellationException("Child of the scoped flow was cancelled")
1010

0 commit comments

Comments
 (0)