API: channel::send_all? #486

Open
yoshuawuyts opened this issue Nov 9, 2019 · 6 comments
Labels
api design Open design questions

Comments

@yoshuawuyts
Contributor

yoshuawuyts commented Nov 9, 2019

In #212 (comment) I mentioned it'd be interesting to investigate broadcasting channels. I initially thought we might want to create a new type for this, possibly in a separate library, but I think at least API-wise we could integrate this into the existing channels module.

The way we could do this is by introducing a new API: Sender::send_all. When an item is passed into Sender::send_all it's cloned and sent to every available Receiver.
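To make the intended semantics concrete, here is a minimal synchronous sketch. It is not async-std's implementation. It assumes a hypothetical `Hub` type that keeps one `std::sync::mpsc` queue per subscriber, so `send_all` can clone the message once per live receiver and skip receivers that have been dropped.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical broadcast hub (not async-std's API): one std mpsc queue
/// per subscriber, so every subscriber sees every message.
pub struct Hub<T> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> Hub<T> {
    pub fn new() -> Self {
        Hub { subscribers: Vec::new() }
    }

    /// Registers a new receiver.
    pub fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Clones `msg` once per subscriber, drops subscribers whose Receiver
    /// is gone, and returns how many copies were delivered.
    pub fn send_all(&mut self, msg: T) -> usize {
        self.subscribers.retain(|tx| tx.send(msg.clone()).is_ok());
        self.subscribers.len()
    }
}
```

With three subscribers, one of which has been dropped, `send_all(String::from("chashu"))` returns 2 and each remaining receiver gets its own copy.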

Now, a huge caveat here is that this may not be performant at all, and/or may complicate the channels design to such a degree that a separate type is warranted. But I figured that sharing what seems to be an ideal solution from an end-user point of view would at least give us a starting point to assess this further.

Related #436, cc/ @stjepang

API

impl<T: Clone> Sender<T> {
    /// Proposed: clone `msg` and send one copy to every active Receiver.
    pub async fn send_all(&self, msg: T) { /* ... */ }
}

Example

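use async_std::prelude::*;
use async_std::sync::channel;
use async_std::task;

let (s, r) = channel(1);

// Give each task its own Receiver handle.
let (r1, r2, r3) = (r.clone(), r.clone(), r);
let a = task::spawn(async move { dbg!(r1.recv().await) });
let b = task::spawn(async move { dbg!(r2.recv().await) });
let c = task::spawn(async move { dbg!(r3.recv().await) });

// Proposed API: clone the message and deliver one copy to every Receiver.
s.send_all(String::from("chashu")).await;

a.join(b).join(c).await;
// prints: "chashu", "chashu", "chashu"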
@yoshuawuyts yoshuawuyts added the api design Open design questions label Nov 9, 2019
@dignifiedquire
Member

I think it would be surprising to have broadcast and unicast channels share the same Receiver and Sender types. I would much rather be able to distinguish between the two based on those types.

@dignifiedquire
Member

For cases where I don't care about the source, I can always use T: Stream<Item = Message> instead of Receiver<Message>.
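A rough std-only analogue of this point: code written against a generic source of items, rather than a concrete receiver type, does not care whether the items come from a channel. The sketch below uses `IntoIterator<Item = u32>` as a blocking stand-in for `Stream<Item = Message>`; the function name `total` is hypothetical.

```rust
/// Sums every message from any source of `u32`s: an mpsc `Receiver`,
/// a `Vec`, a range, ... The caller decides where the messages come from.
pub fn total<I: IntoIterator<Item = u32>>(source: I) -> u32 {
    source.into_iter().sum()
}
```

`std::sync::mpsc::Receiver<u32>` implements `IntoIterator<Item = u32>`, so a receiver (once all its senders are dropped) and a plain `vec![1, 2, 3]` can both be passed to `total`.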

@yoshuawuyts
Contributor Author

@dignifiedquire Can you share more why you'd want to distinguish between the two?

To share some more of my thinking here: MPMC channels (like the ones in this crate) already support having multiple senders and multiple receivers. The proposal here would be to add a single method so that a message can be sent to all active receivers (rather than to only one at a time). From an API perspective this feels like a relatively natural extension.


We can actually look at the broader ecosystem to see how alternative models would play out. Tokio recently introduced tokio::sync::broadcast. They started off in 0.1 by providing SPSC and MPSC channels (oneshot and mpsc channels respectively). But they probably noticed people had requirements for other topologies as well. Currently they provide:

  • tokio::sync::mpsc - multi-producer, single-consumer.
  • tokio::sync::broadcast - multi-producer, multi-consumer. broadcast only.
  • tokio::sync::oneshot - single-producer, single-consumer. Can send at most one message.
  • tokio::sync::watch - single-producer, multi-consumer. broadcast only.

Because async-std's channels are already multi-producer, multi-consumer, we're already positioned to cover all these topologies (and more). Downstream consumers can use async-std's channels as the core building block to create specialized versions using newtypes (e.g. enforcing single use by consuming self, or single-producer, multi-consumer, etc.).
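As an illustration of the newtype approach, the sketch below enforces a single-use, single-producer sender: `send` takes `self` by value, so a second call is a compile error, and the wrapper does not implement `Clone`, so there is only ever one producer. It wraps `std::sync::mpsc::sync_channel` as a stand-in for an async-std channel; `OneshotSender` and `oneshot` are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical newtype: a sender that can be used at most once.
/// Not `Clone`, so there is exactly one producer.
pub struct OneshotSender<T>(SyncSender<T>);

pub fn oneshot<T>() -> (OneshotSender<T>, Receiver<T>) {
    // Capacity 1 is enough because at most one message is ever sent.
    let (tx, rx) = sync_channel(1);
    (OneshotSender(tx), rx)
}

impl<T> OneshotSender<T> {
    /// Consumes the sender; returns the message back if the receiver is gone.
    pub fn send(self, msg: T) -> Result<(), T> {
        self.0.send(msg).map_err(|e| e.0)
    }
}
```

Calling `tx.send(...)` twice on the same `tx` fails to compile with "use of moved value", which is the guarantee the newtype adds on top of the underlying channel.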

Because of the way async-std's channels have been designed, we can already fast-path simpler topologies, so there's not even a performance argument for adding different channel kinds. All we're missing is a way to send a message to all active receivers, which is what this issue proposes we add.

@austinabell

austinabell commented Sep 26, 2020

Aren't there enough differences and restrictions to argue against folding the broadcast ability into the async-std channel?

It would require Clone for the generic type T as well as either storing a secondary data structure to store the sends to all channels or block the sender/other receivers on reading the event? Even if you can avoid blocking the other senders when one receiver does not read the event, how do you plan to handle the channel filling up?

For example:

use async_std::sync::channel;

let (s, r1) = channel(1);

// Clone the receiver, then broadcast one event to both receivers
let r2 = r1.clone();
s.send_all(String::from("chashu")).await;

assert_eq!(r1.recv().await.unwrap(), "chashu");

// This blocks: r2 has not received "chashu" yet, so the capacity-1 buffer is still full, even though r1 is ready for the next event
s.send(String::from("test")).await;

The tokio broadcast channel returns an error only to the receiver that is lagging, and after that error it lets the receiver continue with the most recent events still held in the buffer. Is it intended that this would block the sender on a full channel buffer (blocked by the slowest receiver) or this behaviour?
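The lagging behaviour can be modelled with a fixed-size ring buffer plus a per-receiver cursor. This is a simplified, single-threaded sketch of the semantics described above, not tokio's implementation; `Ring` and `RecvError` are hypothetical names. The sender never blocks: when the buffer is full it overwrites the oldest message, and a receiver whose cursor points at an overwritten slot gets `Lagged(n)` once, then resumes at the oldest message still held.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
pub enum RecvError {
    /// The receiver fell behind; this many messages were dropped for it.
    Lagged(u64),
    /// No new message is available yet.
    Empty,
}

/// Hypothetical model of a lagging broadcast buffer.
pub struct Ring<T> {
    buf: VecDeque<T>,
    cap: usize,
    /// Sequence number of the oldest message still in `buf`.
    head: u64,
}

impl<T: Clone> Ring<T> {
    pub fn new(cap: usize) -> Self {
        Ring { buf: VecDeque::with_capacity(cap), cap, head: 0 }
    }

    /// Never blocks: when full, the oldest message is overwritten.
    pub fn send(&mut self, msg: T) {
        if self.buf.len() == self.cap {
            self.buf.pop_front();
            self.head += 1;
        }
        self.buf.push_back(msg);
    }

    /// `cursor` is the sequence number this receiver expects next.
    pub fn recv(&self, cursor: &mut u64) -> Result<T, RecvError> {
        if *cursor < self.head {
            // Report the gap once, then jump to the oldest retained message.
            let missed = self.head - *cursor;
            *cursor = self.head;
            return Err(RecvError::Lagged(missed));
        }
        let idx = (*cursor - self.head) as usize;
        match self.buf.get(idx) {
            Some(msg) => {
                *cursor += 1;
                Ok(msg.clone())
            }
            None => Err(RecvError::Empty),
        }
    }
}
```

With capacity 2, sending "a", "b", "c" overwrites "a"; a receiver that never read "a" first gets `Lagged(1)` and then "b", while a receiver that kept up sees every message.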

@yoshuawuyts
Contributor Author

yoshuawuyts commented Sep 26, 2020

It would require Clone for the generic type T as well as either storing a secondary data structure to store the sends to all channels or block the sender/other receivers on reading the event?

Clone would only be required for messages that are broadcast, not for the other methods. And adding broadcast functionality does not require an extra data structure; we know for a fact that broadcast / try_broadcast can share the majority of their implementation with send / try_send.
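One way to scope the `Clone` requirement to broadcasting only is to put `send_all` in its own `impl<T: Clone>` block, so `send` keeps working for non-`Clone` messages. The sketch is hypothetical: it fans out over one `std::sync::mpsc` queue per receiver and sends unicast messages round-robin, which is not how a shared MPMC queue would distribute them. In this layout each receiver owns its copy, so receiving never needs `Clone`.

```rust
use std::sync::mpsc::{self, Receiver};

/// Hypothetical fan-out sender over one std mpsc queue per receiver.
pub struct Sender<T> {
    queues: Vec<mpsc::Sender<T>>,
    next: usize,
}

pub fn channel<T>(receivers: usize) -> (Sender<T>, Vec<Receiver<T>>) {
    let mut queues = Vec::new();
    let mut rxs = Vec::new();
    for _ in 0..receivers {
        let (tx, rx) = mpsc::channel();
        queues.push(tx);
        rxs.push(rx);
    }
    (Sender { queues, next: 0 }, rxs)
}

// Unicast: no bound on T, so non-Clone messages still work.
impl<T> Sender<T> {
    pub fn send(&mut self, msg: T) {
        if self.queues.is_empty() {
            return;
        }
        // Round-robin to a single receiver; send errors ignored for brevity.
        let i = self.next % self.queues.len();
        self.next += 1;
        let _ = self.queues[i].send(msg);
    }
}

// Broadcast: only this block requires T: Clone.
impl<T: Clone> Sender<T> {
    pub fn send_all(&self, msg: T) {
        for q in &self.queues {
            let _ = q.send(msg.clone());
        }
    }
}
```

A type without `Clone` can still go through `send`; trying to call `send_all` with it is rejected at compile time because the `T: Clone` bound is not met.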

Is it intended that this would block the sender on a full channel buffer (blocked by the slowest receiver) or this behaviour?

Absolutely; if you don't want to be blocked on the slowest reader, either using unbounded semantics or a large queue size should provide enough flexibility. Though unbounded doesn't exist in async-std today, it does in async-channel which is the impl we want to adopt for async-std's channel stabilization. Also if slow reads are a problem, attaching a per-receiver timeout is an option available to end-users as well.
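The blocking-on-the-slowest-reader behaviour shows up with bounded per-receiver queues. This hypothetical sketch uses `std::sync::mpsc::sync_channel` and a non-blocking `try_send_all` that reports how many queues were full; a blocking version would wait on exactly those queues. Queues that had room keep their copy, which is a design choice of this sketch, not of any existing library. On the receiver side, std's `recv_timeout` plays the role of a per-receiver timeout.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical broadcaster with one bounded queue per receiver.
pub struct BoundedHub<T> {
    queues: Vec<SyncSender<T>>,
}

impl<T: Clone> BoundedHub<T> {
    pub fn new() -> Self {
        BoundedHub { queues: Vec::new() }
    }

    /// Adds a receiver whose queue holds at most `cap` messages.
    pub fn subscribe(&mut self, cap: usize) -> Receiver<T> {
        let (tx, rx) = sync_channel(cap);
        self.queues.push(tx);
        rx
    }

    /// Offers `msg` to every queue without blocking and returns how many
    /// queues were full. Partial deliveries are not rolled back, and
    /// disconnected queues are ignored.
    pub fn try_send_all(&self, msg: T) -> usize {
        let mut full = 0;
        for q in &self.queues {
            if let Err(TrySendError::Full(_)) = q.try_send(msg.clone()) {
                full += 1;
            }
        }
        full
    }
}
```

Replaying austinabell's example with two capacity-1 subscribers: after `r1` reads "chashu" but `r2` does not, a second `try_send_all` reports one full queue. A blocking broadcast would be stuck at that point until `r2` catches up.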

@austinabell

Clone would only be required for messages that are broadcast, not for the other methods.

I'm having a little trouble thinking about how that could be used, because wouldn't the receive function then need T to be bound by Clone as well? I'm also operating under the assumption that the channel buffer is shared between receivers and that items are cloned off on receive; is that an incorrect premise?

Absolutely; if you don't want to be blocked on the slowest reader, either using unbounded semantics or a large queue size should provide enough flexibility. Though unbounded doesn't exist in async-std today, it does in async-channel which is the impl we want to adopt for async-std's channel stabilization. Also if slow reads are a problem, attaching a per-receiver timeout is an option available to end-users as well.

Hmm, yeah, my intuition is that unbounded receivers aren't really practical for this, and I would think timeouts add unnecessary overhead. I'm also biased by a specific use case and don't have an extensive understanding of how this channel's internals work.
