Skip to content

refactor!: rename spsc module to mpsc #34

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions examples/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
use anyhow::bail;
use futures_buffered::BufferedStreamExt;
use irpc::{
channel::{oneshot, spsc},
channel::{mpsc, oneshot},
rpc::{listen, Handler},
rpc_requests,
util::{make_client_endpoint, make_server_endpoint},
Expand Down Expand Up @@ -61,11 +61,11 @@ enum ComputeRequest {
enum ComputeProtocol {
#[rpc(tx=oneshot::Sender<u128>)]
Sqr(Sqr),
#[rpc(rx=spsc::Receiver<i64>, tx=oneshot::Sender<i64>)]
#[rpc(rx=mpsc::Receiver<i64>, tx=oneshot::Sender<i64>)]
Sum(Sum),
#[rpc(tx=spsc::Sender<u64>)]
#[rpc(tx=mpsc::Sender<u64>)]
Fibonacci(Fibonacci),
#[rpc(rx=spsc::Receiver<u64>, tx=spsc::Sender<u64>)]
#[rpc(rx=mpsc::Receiver<u64>, tx=mpsc::Sender<u64>)]
Multiply(Multiply),
}

Expand Down Expand Up @@ -200,11 +200,11 @@ impl ComputeApi {
}
}

pub async fn sum(&self) -> anyhow::Result<(spsc::Sender<i64>, oneshot::Receiver<i64>)> {
pub async fn sum(&self) -> anyhow::Result<(mpsc::Sender<i64>, oneshot::Receiver<i64>)> {
let msg = Sum;
match self.inner.request().await? {
Request::Local(request) => {
let (num_tx, num_rx) = spsc::channel(10);
let (num_tx, num_rx) = mpsc::channel(10);
let (sum_tx, sum_rx) = oneshot::channel();
request.send((msg, sum_tx, num_rx)).await?;
Ok((num_tx, sum_rx))
Expand All @@ -216,11 +216,11 @@ impl ComputeApi {
}
}

pub async fn fibonacci(&self, max: u64) -> anyhow::Result<spsc::Receiver<u64>> {
pub async fn fibonacci(&self, max: u64) -> anyhow::Result<mpsc::Receiver<u64>> {
let msg = Fibonacci { max };
match self.inner.request().await? {
Request::Local(request) => {
let (tx, rx) = spsc::channel(128);
let (tx, rx) = mpsc::channel(128);
request.send((msg, tx)).await?;
Ok(rx)
}
Expand All @@ -234,12 +234,12 @@ impl ComputeApi {
pub async fn multiply(
&self,
initial: u64,
) -> anyhow::Result<(spsc::Sender<u64>, spsc::Receiver<u64>)> {
) -> anyhow::Result<(mpsc::Sender<u64>, mpsc::Receiver<u64>)> {
let msg = Multiply { initial };
match self.inner.request().await? {
Request::Local(request) => {
let (in_tx, in_rx) = spsc::channel(128);
let (out_tx, out_rx) = spsc::channel(128);
let (in_tx, in_rx) = mpsc::channel(128);
let (out_tx, out_rx) = mpsc::channel(128);
request.send((msg, out_tx, in_rx)).await?;
Ok((in_tx, out_rx))
}
Expand Down
10 changes: 5 additions & 5 deletions examples/derive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{

use anyhow::{Context, Result};
use irpc::{
channel::{oneshot, spsc},
channel::{mpsc, oneshot},
rpc::Handler,
rpc_requests,
util::{make_client_endpoint, make_server_endpoint},
Expand Down Expand Up @@ -55,9 +55,9 @@ enum StorageProtocol {
Get(Get),
#[rpc(tx=oneshot::Sender<()>)]
Set(Set),
#[rpc(tx=oneshot::Sender<u64>, rx=spsc::Receiver<(String, String)>)]
#[rpc(tx=oneshot::Sender<u64>, rx=mpsc::Receiver<(String, String)>)]
SetMany(SetMany),
#[rpc(tx=spsc::Sender<String>)]
#[rpc(tx=mpsc::Sender<String>)]
List(List),
}

Expand Down Expand Up @@ -152,7 +152,7 @@ impl StorageApi {
self.inner.rpc(Get { key }).await
}

pub async fn list(&self) -> irpc::Result<spsc::Receiver<String>> {
pub async fn list(&self) -> irpc::Result<mpsc::Receiver<String>> {
self.inner.server_streaming(List, 16).await
}

Expand All @@ -162,7 +162,7 @@ impl StorageApi {

pub async fn set_many(
&self,
) -> irpc::Result<(spsc::Sender<(String, String)>, oneshot::Receiver<u64>)> {
) -> irpc::Result<(mpsc::Sender<(String, String)>, oneshot::Receiver<u64>)> {
self.inner.client_streaming(SetMany, 4).await
}
}
Expand Down
8 changes: 4 additions & 4 deletions examples/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{

use anyhow::bail;
use irpc::{
channel::{none::NoReceiver, oneshot, spsc},
channel::{mpsc, none::NoReceiver, oneshot},
rpc::{listen, Handler},
util::{make_client_endpoint, make_server_endpoint},
Channels, Client, LocalSender, Request, Service, WithChannels,
Expand Down Expand Up @@ -36,7 +36,7 @@ struct List;

impl Channels<StorageService> for List {
type Rx = NoReceiver;
type Tx = spsc::Sender<String>;
type Tx = mpsc::Sender<String>;
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -157,11 +157,11 @@ impl StorageApi {
}
}

pub async fn list(&self) -> anyhow::Result<spsc::Receiver<String>> {
pub async fn list(&self) -> anyhow::Result<mpsc::Receiver<String>> {
let msg = List;
match self.inner.request().await? {
Request::Local(request) => {
let (tx, rx) = spsc::channel(10);
let (tx, rx) = mpsc::channel(10);
request.send((msg, tx)).await?;
Ok(rx)
}
Expand Down
8 changes: 4 additions & 4 deletions irpc-iroh/examples/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ mod storage {
Endpoint,
};
use irpc::{
channel::{oneshot, spsc},
channel::{mpsc, oneshot},
Client, Service, WithChannels,
};
// Import the macro
Expand Down Expand Up @@ -122,9 +122,9 @@ mod storage {
Get(Get),
#[rpc(tx=oneshot::Sender<()>)]
Set(Set),
#[rpc(tx=oneshot::Sender<u64>, rx=spsc::Receiver<(String, String)>)]
#[rpc(tx=oneshot::Sender<u64>, rx=mpsc::Receiver<(String, String)>)]
SetMany(SetMany),
#[rpc(tx=spsc::Sender<String>)]
#[rpc(tx=mpsc::Sender<String>)]
List(List),
}

Expand Down Expand Up @@ -265,7 +265,7 @@ mod storage {
self.inner.rpc(Get { key }).await
}

pub async fn list(&self) -> Result<spsc::Receiver<String>, irpc::Error> {
pub async fn list(&self) -> Result<mpsc::Receiver<String>, irpc::Error> {
self.inner.server_streaming(List, 10).await
}

Expand Down
6 changes: 3 additions & 3 deletions irpc-iroh/examples/derive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ mod storage {
use anyhow::{Context, Result};
use iroh::{protocol::ProtocolHandler, Endpoint};
use irpc::{
channel::{oneshot, spsc},
channel::{mpsc, oneshot},
rpc::Handler,
rpc_requests, Client, LocalSender, Service, WithChannels,
};
Expand Down Expand Up @@ -97,7 +97,7 @@ mod storage {
Get(Get),
#[rpc(tx=oneshot::Sender<()>)]
Set(Set),
#[rpc(tx=spsc::Sender<String>)]
#[rpc(tx=mpsc::Sender<String>)]
List(List),
}

Expand Down Expand Up @@ -190,7 +190,7 @@ mod storage {
self.inner.rpc(Get { key }).await
}

pub async fn list(&self) -> irpc::Result<spsc::Receiver<String>> {
pub async fn list(&self) -> irpc::Result<mpsc::Receiver<String>> {
self.inner.server_streaming(List, 10).await
}

Expand Down
57 changes: 28 additions & 29 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ pub trait Receiver: Debug + Sealed {}

/// Trait to specify channels for a message and service
pub trait Channels<S: Service> {
/// The sender type, can be either spsc, oneshot or none
/// The sender type, can be either mpsc, oneshot or none
type Tx: Sender;
/// The receiver type, can be either spsc, oneshot or none
/// The receiver type, can be either mpsc, oneshot or none
///
/// For many services, the receiver is not needed, so it can be set to [`NoReceiver`].
type Rx: Receiver;
Expand Down Expand Up @@ -315,14 +315,14 @@ pub mod channel {

/// SPSC channel, similar to tokio's mpsc channel
///
/// For the rpc case, the send side can not be cloned, hence spsc instead of mpsc.
pub mod spsc {
/// For the rpc case, the send side can not be cloned, hence mpsc instead of mpsc.
pub mod mpsc {
use std::{fmt::Debug, future::Future, io, pin::Pin, sync::Arc};

use super::{RecvError, SendError};
use crate::RpcMessage;

/// Create a local spsc sender and receiver pair, with the given buffer size.
/// Create a local mpsc sender and receiver pair, with the given buffer size.
///
/// This is currently using a tokio channel pair internally.
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
Expand Down Expand Up @@ -582,7 +582,7 @@ pub mod channel {
impl crate::Receiver for NoReceiver {}
}

/// Error when sending a oneshot or spsc message. For local communication,
/// Error when sending a oneshot or mpsc message. For local communication,
/// the only thing that can go wrong is that the receiver has been dropped.
///
/// For rpc communication, there can be any number of errors, so this is a
Expand All @@ -608,7 +608,7 @@ pub mod channel {
}
}

/// Error when receiving a oneshot or spsc message. For local communication,
/// Error when receiving a oneshot or mpsc message. For local communication,
/// the only thing that can go wrong is that the sender has been closed.
///
/// For rpc communication, there can be any number of errors, so this is a
Expand Down Expand Up @@ -871,24 +871,24 @@ impl<M, R, S> Client<M, R, S> {
}
}

/// Performs a request for which the server returns a spsc receiver.
/// Performs a request for which the server returns a mpsc receiver.
pub fn server_streaming<Req, Res>(
&self,
msg: Req,
local_response_cap: usize,
) -> impl Future<Output = Result<channel::spsc::Receiver<Res>>> + Send + 'static
) -> impl Future<Output = Result<channel::mpsc::Receiver<Res>>> + Send + 'static
where
S: Service,
M: From<WithChannels<Req, S>> + Send + Sync + Unpin + 'static,
R: From<Req> + Serialize + Send + Sync + 'static,
Req: Channels<S, Tx = channel::spsc::Sender<Res>, Rx = NoReceiver> + Send + 'static,
Req: Channels<S, Tx = channel::mpsc::Sender<Res>, Rx = NoReceiver> + Send + 'static,
Res: RpcMessage,
{
let request = self.request();
async move {
let recv: channel::spsc::Receiver<Res> = match request.await? {
let recv: channel::mpsc::Receiver<Res> = match request.await? {
Request::Local(request) => {
let (tx, rx) = channel::spsc::channel(local_response_cap);
let (tx, rx) = channel::mpsc::channel(local_response_cap);
request.send((msg, tx)).await?;
rx
}
Expand All @@ -911,26 +911,26 @@ impl<M, R, S> Client<M, R, S> {
local_update_cap: usize,
) -> impl Future<
Output = Result<(
channel::spsc::Sender<Update>,
channel::mpsc::Sender<Update>,
channel::oneshot::Receiver<Res>,
)>,
>
where
S: Service,
M: From<WithChannels<Req, S>> + Send + Sync + Unpin + 'static,
R: From<Req> + Serialize + 'static,
Req: Channels<S, Tx = channel::oneshot::Sender<Res>, Rx = channel::spsc::Receiver<Update>>,
Req: Channels<S, Tx = channel::oneshot::Sender<Res>, Rx = channel::mpsc::Receiver<Update>>,
Update: RpcMessage,
Res: RpcMessage,
{
let request = self.request();
async move {
let (update_tx, res_rx): (
channel::spsc::Sender<Update>,
channel::mpsc::Sender<Update>,
channel::oneshot::Receiver<Res>,
) = match request.await? {
Request::Local(request) => {
let (req_tx, req_rx) = channel::spsc::channel(local_update_cap);
let (req_tx, req_rx) = channel::mpsc::channel(local_update_cap);
let (res_tx, res_rx) = channel::oneshot::channel();
request.send((msg, res_tx, req_rx)).await?;
(req_tx, res_rx)
Expand All @@ -947,32 +947,32 @@ impl<M, R, S> Client<M, R, S> {
}
}

/// Performs a request for which the client can send updates, and the server returns a spsc receiver.
/// Performs a request for which the client can send updates, and the server returns a mpsc receiver.
pub fn bidi_streaming<Req, Update, Res>(
&self,
msg: Req,
local_update_cap: usize,
local_response_cap: usize,
) -> impl Future<Output = Result<(channel::spsc::Sender<Update>, channel::spsc::Receiver<Res>)>>
) -> impl Future<Output = Result<(channel::mpsc::Sender<Update>, channel::mpsc::Receiver<Res>)>>
+ Send
+ 'static
where
S: Service,
M: From<WithChannels<Req, S>> + Send + Sync + Unpin + 'static,
R: From<Req> + Serialize + Send + 'static,
Req: Channels<S, Tx = channel::spsc::Sender<Res>, Rx = channel::spsc::Receiver<Update>>
Req: Channels<S, Tx = channel::mpsc::Sender<Res>, Rx = channel::mpsc::Receiver<Update>>
+ Send
+ 'static,
Update: RpcMessage,
Res: RpcMessage,
{
let request = self.request();
async move {
let (update_tx, res_rx): (channel::spsc::Sender<Update>, channel::spsc::Receiver<Res>) =
let (update_tx, res_rx): (channel::mpsc::Sender<Update>, channel::mpsc::Receiver<Res>) =
match request.await? {
Request::Local(request) => {
let (update_tx, update_rx) = channel::spsc::channel(local_update_cap);
let (res_tx, res_rx) = channel::spsc::channel(local_response_cap);
let (update_tx, update_rx) = channel::mpsc::channel(local_update_cap);
let (res_tx, res_rx) = channel::mpsc::channel(local_response_cap);
request.send((msg, res_tx, update_rx)).await?;
(update_tx, res_rx)
}
Expand Down Expand Up @@ -1118,10 +1118,9 @@ pub mod rpc {

use crate::{
channel::{
mpsc::{self, DynReceiver, DynSender},
none::NoSender,
oneshot,
spsc::{self, DynReceiver, DynSender},
RecvError, SendError,
oneshot, RecvError, SendError,
},
util::{now_or_never, AsyncReadVarintExt, WriteVarintExt},
RequestError, RpcMessage,
Expand Down Expand Up @@ -1290,9 +1289,9 @@ pub mod rpc {
}
}

impl<T: RpcMessage> From<quinn::RecvStream> for spsc::Receiver<T> {
impl<T: RpcMessage> From<quinn::RecvStream> for mpsc::Receiver<T> {
fn from(read: quinn::RecvStream) -> Self {
spsc::Receiver::Boxed(Box::new(QuinnReceiver {
mpsc::Receiver::Boxed(Box::new(QuinnReceiver {
recv: read,
_marker: PhantomData,
}))
Expand Down Expand Up @@ -1320,9 +1319,9 @@ pub mod rpc {
}
}

impl<T: RpcMessage> From<quinn::SendStream> for spsc::Sender<T> {
impl<T: RpcMessage> From<quinn::SendStream> for mpsc::Sender<T> {
fn from(write: quinn::SendStream) -> Self {
spsc::Sender::Boxed(Arc::new(QuinnSender(tokio::sync::Mutex::new(
mpsc::Sender::Boxed(Arc::new(QuinnSender(tokio::sync::Mutex::new(
QuinnSenderState::Open(QuinnSenderInner {
send: write,
buffer: SmallVec::new(),
Expand Down
Loading