Introduce broadcast() extension to turn ReceiveChannel into BroadcastChannel #280

Closed
elizarov opened this issue Mar 12, 2018 · 18 comments

@elizarov
Contributor

We need a broadcast operator that turns a ReceiveChannel into BroadcastChannel.

@jcornaz
Contributor

jcornaz commented Mar 12, 2018

Should the broadcast operator directly start to send elements? Or should it wait for the call on a method, like connect of the Rx publish operator?

@elizarov
Contributor Author

We are talking about hot channels here, so it makes sense to start sending elements immediately. It would be an interesting design question for broadcast() (and for produce()) operators on cold streams (see #254), but I would argue that for cold channels, too, the very invocation of broadcast() is a signal that you want to "materialize" the source (start running it).

@jcornaz
Contributor

jcornaz commented Mar 12, 2018

I agree, with a hot channel there is no point in not starting immediately.

But for cold channels, I think it is more debatable. Even though I would be OK with starting immediately, I think some users could find it useful to at least decide how it behaves depending on the use case. With Rx, for example, the user may use connect, autoConnect, refCount and share to manage it.

However, may I ask what you would think about the following implementation?

fun <E> ReceiveChannel<E>.broadcast(capacity: Int): BroadcastChannel<E> {
  val broadcast = BroadcastChannel<E>(capacity)

  val job = launch(Unconfined) {
    try {
      consumeEach { broadcast.send(it) }
    } finally {
      broadcast.close()
    }
  }

  return object : BroadcastChannel<E> by broadcast {
    override fun close(cause: Throwable?): Boolean {
      job.cancel(cause)
      return broadcast.close(cause)
    }
  }
}

@elizarov
Contributor Author

elizarov commented Mar 12, 2018

@jcornaz you should propagate the close cause to broadcast.close, that is, add catch (e: Throwable) before finally to remember the cause.

It also seems better to implement a broadcast { ... } builder for general use that performs this binding of a broadcast channel to a job, and then implement the .broadcast() operator via that broadcast { ... } builder in a straightforward way.

And that will be good for now.

For the future, though, note that all current builders (like produce and actor) return a new object that delegates all the channel methods to the underlying channel (like by broadcast in your implementation). This is a suboptimal solution. Actually, if we introduce a linked job in every channel implementation (see discussion in #260), then channels returned by produce, actor, and broadcast will be more efficient as they will not require any delegation (because every channel will natively support a linked job).
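
A minimal sketch of the "remember the cause" pattern suggested above, written against plain Channel primitives rather than BroadcastChannel. The helper name pipe is hypothetical and is not part of kotlinx.coroutines; it only illustrates the catch-before-finally shape, not the implementation that was eventually shipped.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper (not a library API): forwards every element of `source`
// to `sink` and closes `sink` with whatever terminated the forwarding loop.
fun <E> CoroutineScope.pipe(source: ReceiveChannel<E>, sink: SendChannel<E>): Job =
    launch {
        var cause: Throwable? = null
        try {
            // Iteration rethrows the close cause if `source` was closed with one.
            for (element in source) sink.send(element)
        } catch (e: Throwable) {
            cause = e // remember why forwarding stopped
        } finally {
            sink.close(cause) // a null cause means normal completion
        }
    }
```

With this shape, a receiver on the sink side sees the same failure that closed the source instead of a plain "channel closed" signal.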

@elizarov
Contributor Author

I'll do it myself.

@elizarov elizarov self-assigned this Mar 13, 2018
@elizarov
Contributor Author

elizarov commented Mar 14, 2018

@sdeleuze Can you please elaborate a bit on your use cases? I'm slightly worried that the existing implementations of broadcast channel may not cover your needs, so we may need to add some new ones. In particular, I cannot figure out what kind of BroadcastChannel we shall create in broadcast() by default.

What kind of behavior is desired? What should happen to the produced items before the first invocation of openSubscription? Shall they be dropped or buffered? If buffered, then up to what limit? If they are buffered, then what should happen after the first openSubscription? Shall they be dropped or buffered (basically forever) for all the subsequent subscribers?

@jcornaz
Contributor

jcornaz commented Mar 14, 2018

> I cannot figure out what kind of BroadcastChannel we shall create in broadcast() by default

Couldn't it be a capacity argument, capable of accepting Channel.CONFLATED for example?

@sdeleuze
Contributor

The use case is to multicast elements from a hot stream to multiple clients. For example, we can imagine a coroutine-based HTTP client requesting a remote JSON streaming endpoint on the server side and broadcasting to 100+ SSE endpoints; that's what we do with Flux + share(). No need to buffer, items could be lost for late subscribers.

@elizarov
Contributor Author

@sdeleuze Thanks. That is the same conclusion I came to -- mimic share by default. So, by default broadcast() is going to use CoroutineStart.LAZY and start only when the first subscriber comes in. That guarantees that the first subscriber never loses elements. Late subscribers may lose them, though.

The tradeoff in this behavior is that the producer of the stream will be suspended (backpressure) until the first subscriber appears. I really hope we are choosing the right default. Maybe we should have no default at all, but always require an explicit specification of what to do.
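
The "lazy start, first subscriber loses nothing" behavior can be sketched with plain channels and a job launched with CoroutineStart.LAZY. The class LazyFanOut below is hypothetical (JVM-only because of CopyOnWriteArrayList) and is not how the library implements broadcast(). It uses unlimited per-subscriber buffers for brevity and does not handle subscribers that arrive after the source has finished.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical class (not a library API): fans a source channel out to subscribers,
// but does not touch the source until the first subscription is opened.
class LazyFanOut<E>(scope: CoroutineScope, source: ReceiveChannel<E>) {
    private val subscribers = CopyOnWriteArrayList<Channel<E>>()

    // LAZY: the forwarding loop is created here but runs only after job.start().
    private val job = scope.launch(start = CoroutineStart.LAZY) {
        var cause: Throwable? = null
        try {
            for (element in source) {
                // Only channels subscribed at this moment see the element,
                // so late subscribers miss everything sent earlier.
                for (subscriber in subscribers) subscriber.send(element)
            }
        } catch (e: Throwable) {
            cause = e
        } finally {
            for (subscriber in subscribers) subscriber.close(cause)
        }
    }

    fun openSubscription(): ReceiveChannel<E> {
        val channel = Channel<E>(Channel.UNLIMITED)
        subscribers.add(channel)
        job.start() // starts the loop on the first call, no-op afterwards
        return channel
    }
}
```

Until openSubscription() is called, nothing receives from the source, so a producer sending into a rendezvous source channel stays suspended. That is the tradeoff described above.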

@dave08

dave08 commented Mar 14, 2018

Why not just have .share() instead of .broadcast() for that? It's not top level, and it mimics existing Rx behavior, so there will be less confusion. Then the top-level broadcast would be like a ConnectableObservable, where connect == openSubscription. It's a big advantage to have people able to reuse knowledge from other frameworks rather than having to learn another one, and those used to existing ones could get confused by conflicting behaviors of similarly working components. It also eases documenting to have existing comparisons.

elizarov added a commit that referenced this issue Mar 14, 2018
support `BroadcastChannel.cancel` method to drop the buffer.

Fixes #280
@elizarov
Contributor Author

elizarov commented Mar 14, 2018

@dave08 We still have to figure out what share and/or broadcast does by default regardless of how we name it. We can have different methods with different behavior, but that does not make it any simpler. How would I remember which one does what?

Since we are talking about hot streams here, it does matter what we do. It is not so much of a concern for cold, Rx-style streams.

@elizarov
Contributor Author

elizarov commented Mar 14, 2018

Let me clarify why I think "lazy start, suspend producer" is a good default. It is a good default because it is consistent with all the other channel operations we have. If you write:

val dest = source.filter { blah-blah }

then the source is generally suspended until you start consuming dest (because of backpressure propagation). It seems logical to have the same behavior (by default) with broadcast.
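
A small sketch of that backpressure propagation, assuming rendezvous channels on both sides. The function filtered is a hypothetical stand-in for the filter operator in the snippet above, not the library's implementation.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical stand-in for `source.filter { ... }` (not a library API).
fun <E> CoroutineScope.filtered(
    source: ReceiveChannel<E>,
    predicate: (E) -> Boolean
): ReceiveChannel<E> {
    // Rendezvous channel: send() suspends until a consumer calls receive().
    val dest = Channel<E>()
    launch {
        var cause: Throwable? = null
        try {
            for (element in source) {
                // While this send is suspended, nothing receives from `source`,
                // so the producer's next send() on `source` suspends as well.
                if (predicate(element)) dest.send(element)
            }
        } catch (e: Throwable) {
            cause = e
        } finally {
            dest.close(cause)
        }
    }
    return dest
}
```

If nobody consumes dest, the forwarding coroutine parks on its first dest.send, and a producer writing into a rendezvous source gets at most one element through before it is suspended too.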

@dave08

dave08 commented Mar 14, 2018

I see your point, I just felt that the learning curve of the current implementation is bigger since the concepts are mostly new ones, with little correlation with existing popular frameworks.

It shouldn't have to be all the same, just similar enough, whenever possible, so as not to have to relearn everything.

We're still switching from Rx because of all the advantages of working with coroutines, but I think that taking the general concepts and the way they work, and adapting them to coroutines as much as it makes sense, would make things much easier to learn and to build on, and would reuse battle-tested patterns that currently deal with a wider range of use cases.

@caleb-allen

Correct me if I'm wrong, but it is my understanding that coroutine channels themselves are not attempting to comply with the reactive-streams initiative but do provide a toolkit for which a reactive-stream implementation would greatly benefit. Although many of the concepts are somewhat aligned, the implementations aren't attempting to mirror each other and an abstraction on top of channels would be the best place to offer reactive-stream compliant functionality for ease of use by devs already familiar with Rx, etc.

@elizarov
Contributor Author

@caleb-allen Channels are quite an orthogonal concept to reactive streams. Channels come from the CSP (Communicating Sequential Processes) world. In that world, channels are things that you are supposed to share between communicating processes. Channels are like queues -- send at one end and receive on the other end. They are very much unlike reactive streams in many respects.

However, when we start expanding the repertoire of channels that we support (like broadcast channels discussed here) we inevitably somewhat encroach onto the territory of reactive streams which sometimes results in confusion.

Channels are not something that reactive streams might readily benefit from, but Kotlin coroutines, and suspending functions in particular, are something to watch for. See, the reactive streams specification is quite involved precisely because it is designed to work around the lack of suspending functions in Java. Reactive streams is a clever hack to support asynchronous data streams in a language that does not support asynchrony natively. That is why Kotlin's only interest in reactive streams is from an interop standpoint. We have all sorts of adapters for reactive streams, but there is not much benefit in using them in Kotlin as is.

@caleb-allen

Ah, that makes sense! Still wrapping my head around how it all fits (or does not fit) together.

So if I'm understanding this correctly, what reactive streams attempts to achieve (asynchronous streams following the reactive pattern) for Java is something that is inherent in Kotlin's design because of coroutines? And that's why it may be useless to try to implement reactive-streams for Kotlin, because it would be bypassing a core language feature. Do I have that right?

@elizarov
Contributor Author

@caleb-allen For asynchronous functions returning a single result we have a robust language feature (suspending functions). For asynchronous streams of values we still need something in the library. Given that asynchronous functions are natively supported in the language, this library's design will necessarily be different from reactive streams (see #254).

elizarov added a commit that referenced this issue Apr 11, 2018
support `BroadcastChannel.cancel` method to drop the buffer.

Fixes #280
elizarov added a commit that referenced this issue May 16, 2018
support `BroadcastChannel.cancel` method to drop the buffer;
Introduce ReceiveChannel.broadcast() extension.

Fixes #280
elizarov added a commit that referenced this issue May 17, 2018
support `BroadcastChannel.cancel` method to drop the buffer;
Introduce ReceiveChannel.broadcast() extension.

Fixes #280
elizarov added a commit that referenced this issue May 18, 2018
support `BroadcastChannel.cancel` method to drop the buffer;
Introduce ReceiveChannel.broadcast() extension.

Fixes #280
@ZakTaccardi

I want broadcast() to have ConflatedBroadcastChannel-like behavior, where the latest emission is replayed to new subscribers. Is this possible?

6 participants