Skip to content

Commit

Permalink
refactor!: remove IncomingMessages
Browse files Browse the repository at this point in the history
This is part of the move to a fully connection-oriented API. The
per-connection `ConnectionIncoming` streams must be used instead.

Since `ConnectionIncoming` cannot be cloned (since the inner
`mpsc::Receiver` cannot be cloned), methods that use the connection pool
return an `Option<ConnectionIncoming>`. This also reveals one of the
inconsistencies with the connection pool, in that incoming connections
always come with a `ConnectionIncoming`, since they replace anything
already in the pool.

As with `Connection`, it's currently a connection-pool-specific wrapper
that's returned (`ConnectionIncomingHandle`). This is necessary to
ensure that lost connections are removed from the pool. These wrappers
will go away when the connection pool is removed, and the API should
remain unchanged.

BREAKING CHANGE: `IncomingMessages` has been removed, and is no longer
returned by `Endpoint::new` or `Endpoint::new_client`.
  • Loading branch information
Chris Connelly authored and lionel-faber committed Oct 20, 2021
1 parent 578936d commit c0a6a20
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 318 deletions.
24 changes: 14 additions & 10 deletions examples/p2p_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();

// create an endpoint for us to listen on and send from.
let (node, _incoming_conns, mut incoming_messages, _contact) = Endpoint::<XId>::new(
let (node, mut incoming_conns, _contact) = Endpoint::<XId>::new(
SocketAddr::from((Ipv4Addr::LOCALHOST, 0)),
&[],
Config {
Expand All @@ -59,7 +59,7 @@ async fn main() -> Result<()> {
.expect("Invalid SocketAddr. Use the form 127.0.0.1:1234");
let msg = Bytes::from(MSG_MARCO);
println!("Sending to {:?} --> {:?}\n", peer, msg);
node.connect_to(&peer).await?.send(msg.clone()).await?;
node.connect_to(&peer).await?.0.send(msg.clone()).await?;
}

println!(
Expand All @@ -72,16 +72,20 @@ async fn main() -> Result<()> {
println!("Listening on: {:?}", node.public_addr());
println!("---\n");

// loop over incoming messages
while let Some((connection, bytes)) = incoming_messages.next().await {
// loop over incoming connections
while let Some((connection, mut incoming_messages)) = incoming_conns.next().await {
let src = connection.remote_address();
println!("Received from {:?} --> {:?}", src, bytes);
if bytes == *MSG_MARCO {
let reply = Bytes::from(MSG_POLO);
connection.send(reply.clone()).await?;
println!("Replied to {:?} --> {:?}", src, reply);

// loop over incoming messages
while let Some(bytes) = incoming_messages.next().await? {
println!("Received from {:?} --> {:?}", src, bytes);
if bytes == *MSG_MARCO {
let reply = Bytes::from(MSG_POLO);
connection.send(reply.clone()).await?;
println!("Replied to {:?} --> {:?}", src, reply);
}
println!();
}
println!();
}

Ok(())
Expand Down
67 changes: 29 additions & 38 deletions src/connection_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

use super::{
connection_pool::{ConnId, ConnectionPool, ConnectionRemover},
error::{ConnectionError, SendError},
error::{ConnectionError, RecvError, SendError},
};
use crate::{
connection::{Connection, ConnectionIncoming, RecvStream, SendStream},
Expand Down Expand Up @@ -106,11 +106,33 @@ impl<I: ConnId> ConnectionHandle<I> {
}
}

/// The receiving API for a connection.
pub struct ConnectionIncomingHandle<I: ConnId> {
inner: ConnectionIncoming,
remover: ConnectionRemover<I>,
}

impl<I: ConnId> ConnectionIncomingHandle<I> {
pub(crate) fn new(inner: ConnectionIncoming, remover: ConnectionRemover<I>) -> Self {
Self { inner, remover }
}

/// Get the next message sent by the peer, over any stream.
pub async fn next(&mut self) -> Result<Option<Bytes>, RecvError> {
let result = self.inner.next().await;

if let Err(RecvError::ConnectionLost(_)) = &result {
self.remover.remove().await;
}

result
}
}

pub(super) fn listen_for_incoming_connections<I: ConnId>(
mut quinn_incoming: quinn::Incoming,
connection_pool: ConnectionPool<I>,
message_tx: Sender<(ConnectionHandle<I>, Bytes)>,
connection_tx: Sender<ConnectionHandle<I>>,
connection_tx: Sender<(ConnectionHandle<I>, ConnectionIncomingHandle<I>)>,
endpoint: Endpoint<I>,
quic_endpoint: quinn::Endpoint,
retry_config: Arc<RetryConfig>,
Expand All @@ -131,13 +153,10 @@ pub(super) fn listen_for_incoming_connections<I: ConnId>(
let pool_handle = connection_pool
.insert(id, peer_address, connection.clone())
.await;
let connection = endpoint.wrap_connection(connection, pool_handle);
let _ = connection_tx.send(connection.clone()).await;
listen_for_incoming_messages(
connection,
connection_incoming,
message_tx.clone(),
);
let connection = endpoint.wrap_connection(connection, pool_handle.clone());
let connection_incoming =
ConnectionIncomingHandle::new(connection_incoming, pool_handle);
let _ = connection_tx.send((connection, connection_incoming)).await;
}
Err(err) => {
warn!("An incoming connection failed because of: {:?}", err);
Expand All @@ -151,31 +170,3 @@ pub(super) fn listen_for_incoming_connections<I: ConnId>(
}
});
}

pub(super) fn listen_for_incoming_messages<I: ConnId>(
connection: ConnectionHandle<I>,
mut connection_incoming: ConnectionIncoming,
message_tx: Sender<(ConnectionHandle<I>, Bytes)>,
) {
let _ = tokio::spawn(async move {
let src = connection.remote_address();
loop {
match connection_incoming.next().await {
Ok(Some(msg)) => {
let _ = message_tx.send((connection.clone(), msg)).await;
}
Ok(None) => {
break;
}
Err(error) => {
trace!("Issue on stream reading from {}: {:?}", src, error);
break;
}
}
}

connection.remover.remove().await;

trace!("The connection to {} has terminated", src);
});
}
Loading

0 comments on commit c0a6a20

Please sign in to comment.