consume: the unreliable way to cancel upstream sources for channel operators #279
Comments
In my opinion solution 2 is better, especially because it makes custom operator writing even easier and less error-prone. But I would personally keep
Is it possible to address this issue using

```kotlin
fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
    produce(context, parent = job) {
        for (element in this@filter) {
            if (predicate(element)) send(element)
        }
    }.also { produce ->
        produce.job.invokeOnCompletion {
            this@filter.job.cancel(it)
        }
    }
```

or something similar...
@fvasco Solution via an explicit
So, the solution with
@elizarov Personally, I don't like the syntax `task taskX(dependsOn: ':projectB:taskY')`
By the way, I'd like to take the opportunity to ask: Why is there a I think it is redundant, because with any Or am I missing something?
@fvasco I'm trying to make more declarative by providing out-of-the-box completion handlers. In the current draft version I'm working on the implementation of
I'm also open to alternative approaches, like What I like about
So, I've further played with different names and so far I like best the following:
For operations on multiple source channels (like
The reason for this particular naming is that in the future we'd like to fail-fast when multiple consumers are trying to work with the same channel (see #167), so
I hope this new `onCompletion` parameter (a kind of callback function) will not lead to an "async callback style" use of coroutines, with the `onCompletion` lambda used for subsequent statements.
Background
Currently, all built-in operations on `ReceiveChannel` are implemented using basically the same code pattern. Take a look at the implementation of `filter`, for example:
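The original snippet did not survive extraction; based on the kotlinx.coroutines sources of that era, the implementation followed roughly this shape (a sketch, not the verbatim code):

```kotlin
// Sketch of the era's filter operator: produce a new channel and
// drain the upstream channel inside the producing coroutine.
fun <E> ReceiveChannel<E>.filter(
    context: CoroutineContext = Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E> =
    produce(context) {
        this@filter.consumeEach { element ->
            if (predicate(element)) send(element)
        }
    }
```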
Under the hood, `consumeEach` uses `consume` to make sure that every item from the upstream channel is consumed even when the `filter` coroutine crashes or is cancelled, and the definition of `consume` is likewise simple:
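The definitions were lost in extraction; simplified from the library sources of that era (exact modifiers and signatures may differ), they were roughly:

```kotlin
// Sketch: consumeEach delegates to consume, which cancels the
// channel in a finally block so the upstream is always released.
public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit) =
    consume {
        for (element in this) action(element)
    }

public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R =
    try {
        block()
    } finally {
        cancel() // close the upstream channel even on failure or cancellation
    }
```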
This design ensures that when you have a chain of operators applied to some source channel, like in `val res = src.filter { ... }`, closing the resulting `res` channel forwards that cancellation upstream and closes the source `src` channel too, thus making sure that any resources used by the source are promptly released.

Problem
It all works great until we have an unhappy combination of factors. Consider the following code:
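The original snippet was lost in extraction; a minimal sketch of the kind of repro described (experimental-era API, names and details illustrative):

```kotlin
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

fun main(args: Array<String>) = runBlocking<Unit> {
    // A "hot" source backed by a resource whose cleanup must always run.
    val src = produce<Int>(coroutineContext) {
        try {
            var i = 0
            while (true) send(i++)
        } finally {
            println("Source closed") // must run under all circumstances
        }
    }
    val res = src.filter { it > 0 } // default Unconfined context
    res.cancel() // immediately cancel the resulting channel
    yield() // give scheduled coroutines a chance to run
}
```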
Run the code and see that it behaves as expected: cancellation propagates upstream and the source channel is closed.
Now, replace the definition of the `res` channel with a new one, adding a `coroutineContext` parameter to the `filter` invocation (for example, `val res = src.filter(coroutineContext) { ... }`), so that the filtering coroutine runs in the context of the `runBlocking` dispatcher. Running this code again shows that the source channel is never closed.
What is going on here? The problem is in the combination of three factors: the `produce(context) { consume { ... } }` pattern, our `runBlocking` dispatcher, and the fact that the resulting channel is immediately cancelled.

When `produce` is invoked it does not immediately start running the coroutine, but schedules its execution using the specified `context`. In the above code we've passed the `runBlocking` context, so it gets scheduled to the main thread for execution instead of being executed right away. Now, if we cancel the `produce` coroutine while it is still scheduled for execution, it does not run at all, so it does not execute `consume` and does not cancel the source channel.

It works without problems when we don't specify the context explicitly, because, by default, the `Unconfined` context is used, which starts executing its code immediately and runs until the next suspension point.

Solution 1: CoroutineStart.ATOMIC
We can use `produce(context, start = CoroutineStart.ATOMIC) { ... }` in the implementation of all the operators. It makes sure that the coroutine is not cancellable until it starts to execute. This change provides immediate relief for this problem, but it has a slight unintended side effect: the "non-cancellable" period extends until the first suspension point in the coroutine, which might be too long. It also feels extremely fragile: it is quite easy to forget the `start` parameter.

Solution 2: Abolish consume, provide onCompletion
We can add an additional optional parameter `onCompletion: (Throwable?) -> Unit` to `produce` and other similar coroutine builders and completely change the implementation pattern for `filter` and other operations like this:
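The proposed pattern can be sketched as follows (the exact shape of `onCompletion` follows the proposal text here, not a released API):

```kotlin
// Sketch: no consume block; the builder itself guarantees that the
// completion handler runs and cancels the upstream channel.
fun <E> ReceiveChannel<E>.filter(
    context: CoroutineContext = Unconfined,
    predicate: suspend (E) -> Boolean
): ReceiveChannel<E> =
    produce(context, onCompletion = { cause -> this@filter.cancel(cause) }) {
        for (element in this@filter) {
            if (predicate(element)) send(element)
        }
    }
```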
The advantage of this pattern is that it scales better to operators on multiple channels and makes them less error-prone to write. For example, the current implementation of `zip` looks like this:
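The snippet was lost in extraction; the consume-based version from the library sources of that era looked roughly like (a sketch, details may differ):

```kotlin
// Sketch of the consume-based zip: nested consume/consumeEach blocks
// implicitly close both source channels.
fun <E, R, V> ReceiveChannel<E>.zip(
    other: ReceiveChannel<R>,
    context: CoroutineContext = Unconfined,
    transform: suspend (E, R) -> V
): ReceiveChannel<V> =
    produce(context) {
        other.consume {
            val otherIterator = other.iterator()
            this@zip.consumeEach { element1 ->
                if (!otherIterator.hasNext()) return@consumeEach
                val element2 = otherIterator.next()
                send(transform(element1, element2))
            }
        }
    }
```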
It is quite nested and relies on the implicit magic of `consume` and `consumeEach` to close the source channels. With `onCompletion` we can write instead:
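With `onCompletion`, the same operator might be sketched as (hypothetical shape following the proposal, not a released API):

```kotlin
// Sketch: both upstream channels are cancelled by an explicit
// completion handler instead of nested consume blocks.
fun <E, R, V> ReceiveChannel<E>.zip(
    other: ReceiveChannel<R>,
    context: CoroutineContext = Unconfined,
    transform: suspend (E, R) -> V
): ReceiveChannel<V> =
    produce(context, onCompletion = { cause ->
        this@zip.cancel(cause)
        other.cancel(cause)
    }) {
        val otherIterator = other.iterator()
        for (element1 in this@zip) {
            if (!otherIterator.hasNext()) break
            send(transform(element1, otherIterator.next()))
        }
    }
```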
Here the fact that the source channels are closed on completion is much more explicit in the code instead of being hidden, and multiple source channels do not produce additional nesting of `consume` blocks.

However, that leaves an open question of what to do with `consume` and `consumeEach`. One approach is to deprecate `consume` for its error-proneness. However, we should keep the `finally { close() }` behavior in `consumeEach`, because `consumeEach` is a terminal operator that should always make sure that the original channel is fully consumed.

Discussion
Making sure that upstream channels are properly closed is a bane and inherent complexity of "hot" channels, especially when they are backed by actual resources, like network streams. It is not a problem with "cold" channels (see #254), which, by definition, do not allocate any resources until subscribed to. However, just switching to cold channels everywhere does not provide any relief if the domain problem at hand actually calls for a hot channel. Cold channels that are backed by hot channels suffer from all the same problems: their underlying hot channels have to be somehow closed when they are no longer needed.