Introduce broadcast() extension to turn ReceiveChannel into BroadcastChannel #280
Should the |
We are talking about hot channels here, so it makes sense to start sending them immediately. It would be an interesting design question for |
I agree: with a hot channel there is no reason not to start immediately. But for a cold channel, I think it is more arguable. Even if I would be OK with starting immediately, I think some users could find it useful to at least decide how it behaves depending on the use case. With Rx for example the user may use However, may I ask what you would think about the following implementation?
fun <E> ReceiveChannel<E>.broadcast(capacity: Int): BroadcastChannel<E> {
    val broadcast = BroadcastChannel<E>(capacity)
    val job = launch(Unconfined) {
        try {
            consumeEach { broadcast.send(it) }
        } finally {
            broadcast.close()
        }
    }
    return object : BroadcastChannel<E> by broadcast {
        override fun close(cause: Throwable?): Boolean {
            job.cancel(cause)
            return broadcast.close(cause)
        }
    }
} |
@jcornaz you should propagate close cause to It also seems better to also implement a And it will be good for now. For the future, though, note that all current builders (like |
I'll do it myself. |
@sdeleuze Can you please elaborate a bit on your use cases? I'm slightly worried that the existing implementations of broadcast channel may not cover your needs, so we may need to add some new ones. In particular, I cannot figure out what kind of What kind of behavior is desired? What should happen to the produced items before the first invocation of |
Couldn't it be a capacity argument, capable of accepting |
The use case is to multicast elements from a hot stream to multiple clients, for example we can imagine a Coroutine based HTTP client requesting a remote JSON streaming endpoint on server-side and broadcasting to 100+ SSE endpoints, that's what we do with |
@sdeleuze Thanks. That is the same conclusion I came to -- mimic The tradeoff in this behavior is that the producer of the stream will be suspended (backpressure) until the first subscriber appears. I really hope we are choosing the right default. Maybe we should have no default at all, but always require an explicit specification of what to do. |
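The "producer is suspended until a receiver appears" behavior mentioned above can be observed on a plain rendezvous channel. The following is only a minimal sketch of that backpressure effect, not the `broadcast()` implementation discussed in this issue; the function name `rendezvousTrace` and the event strings are hypothetical and exist only for illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: records the order of events when a producer sends
// into a rendezvous channel (capacity 0) before any receiver is present.
fun rendezvousTrace(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val channel = Channel<Int>() // rendezvous: send() suspends until a receive()

    val producer = launch {
        events += "producer: sending 1"
        channel.send(1)          // suspends here while nobody is receiving
        events += "producer: sent 1"
        channel.close()
    }

    yield()                      // let the producer run up to its send()
    events += "consumer: about to receive"
    val value = channel.receive()
    events += "consumer: received $value"
    producer.join()
    events
}

fun main() {
    rendezvousTrace().forEach(::println)
}
```

In this sketch the producer cannot record "sent" before the consumer has started receiving, which is the same tradeoff described above for a lazily started broadcast: no items are lost, at the cost of holding the producer back.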
Why not just have |
support `BroadcastChannel.cancel` method to drop the buffer. Fixes #280
@dave08 We still have to figure out what Since we are talking about hot streams here, it does matter what we do. It is not so much of a concern for cold, Rx-style streams. |
Let me clarify why I think "lazy start, suspend producer" is a good default. It is a good default because it is consistent with all the other channel operations we have. If you write:
then the |
I see your point. I just felt that the learning curve of the current implementation is steeper, since the concepts are mostly new ones with little correlation to existing popular frameworks. It doesn't have to be all the same, just similar enough, whenever possible, so as not to have to relearn everything. We're still switching from Rx because of all the advantages of working with coroutines, but I think that taking the general concepts and the way they work, and adapting them to coroutines as much as it makes sense, would make things much easier to learn and to build on, and would reuse battle-tested patterns that currently deal with a wider range of use cases. |
Correct me if I'm wrong, but it is my understanding that coroutine channels themselves are not attempting to comply with the reactive-streams initiative, but do provide a toolkit from which a reactive-stream implementation would greatly benefit. Although many of the concepts are somewhat aligned, the implementations aren't attempting to mirror each other, and an abstraction on top of channels would be the best place to offer reactive-stream compliant functionality for ease of use by devs already familiar with Rx, etc. |
@caleb-allen Channels are quite an orthogonal concept to reactive streams. Channels come from the CSP (Communicating Sequential Processes) world. In that world, channels are things that you are supposed to share between communicating processes. Channels are like queues -- send at one end and receive on the other end. They are very much unlike reactive streams in many respects. However, when we start expanding the repertoire of channels that we support (like the broadcast channels discussed here), we inevitably encroach somewhat onto the territory of reactive streams, which sometimes results in confusion. Channels are not something that reactive streams might readily benefit from, but Kotlin coroutines, and suspending functions in particular, are something to watch for. See, the reactive streams specification is quite involved precisely because it is designed to work around the lack of suspending functions in Java. Reactive streams is a clever hack to support asynchronous data streams in a language that does not support asynchrony natively. That is why Kotlin's only interest in reactive streams is from an interop standpoint. We have all sorts of adapters for reactive streams, but there is not much benefit in using them in Kotlin as is. |
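As a rough illustration of "channels are like queues shared between communicating processes", here is a small sketch in which two producer coroutines send into one channel and a single consumer receives from the other end. The function name `sumViaChannel`, the buffer capacity, and the sent values are assumptions made up for this example; they do not come from the thread.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: two producers share one channel, one consumer drains it.
fun sumViaChannel(): Int = runBlocking {
    val channel = Channel<Int>(capacity = 4) // small buffer; send() suspends when it is full

    // Producer 0 sends 1, 2, 3 and producer 1 sends 11, 12, 13.
    val producers = List(2) { id ->
        launch {
            for (i in 1..3) channel.send(id * 10 + i)
        }
    }

    // Close the channel once every producer has finished,
    // so that the consumer loop below terminates.
    launch {
        producers.joinAll()
        channel.close()
    }

    var sum = 0
    for (value in channel) sum += value // receive on the other end until closed
    sum
}

fun main() {
    println(sumViaChannel())
}
```

Each element is delivered to exactly one receiver here, which is the point-to-point queue behavior that a broadcast channel, as requested in this issue, is meant to complement.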
Ah, that makes sense! Still wrapping my head around how it all fits (or does not fit) together. So if I'm understanding this correctly, what reactive streams attempts to achieve for Java (asynchronous streams following the reactive pattern) is something that is inherent in Kotlin's design because of coroutines? And that's why it may be useless to try to implement reactive-streams for Kotlin, because it would be bypassing a core language feature. Do I have that right? |
@caleb-allen For asynchronous functions returning a single result we have a robust language feature (suspending functions). For asynchronous streams of values we still need something in the library. Given that asynchronous functions are natively supported in the language, this library's design will necessarily be different from reactive streams (see #254). |
support `BroadcastChannel.cancel` method to drop the buffer; Introduce ReceiveChannel.broadcast() extension. Fixes #280
I want |
We need a `broadcast` operator that turns a `ReceiveChannel` into a `BroadcastChannel`.