Skip to content

Commit

Permalink
chore: send result on disconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuef committed Jul 15, 2021
1 parent 408b52f commit 653350b
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 40 deletions.
3 changes: 2 additions & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
use super::{
bootstrap_cache::BootstrapCache,
config::{Config, SerialisableCertificate},
endpoint::{DisconnectionEvents, Endpoint, IncomingConnections, IncomingMessages},
connections::DisconnectionEvents,
endpoint::{Endpoint, IncomingConnections, IncomingMessages},
error::{Error, Result},
peer_config::{self, DEFAULT_IDLE_TIMEOUT_MSEC, DEFAULT_KEEP_ALIVE_INTERVAL_MSEC},
};
Expand Down
79 changes: 59 additions & 20 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use super::{
use bytes::Bytes;
use futures::{future, stream::StreamExt};
use std::net::SocketAddr;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::{timeout, Duration};
use tracing::{debug, error, trace, warn};

Expand All @@ -28,6 +28,19 @@ pub(crate) struct Connection {
remover: ConnectionRemover,
}

pub type DisconnectSender = Sender<(SocketAddr, Result<()>)>;
/// Disconnection events, and the result that led to disconnection.
pub struct DisconnectionEvents(pub Receiver<(SocketAddr, Result<()>)>);

/// Disconnection
impl DisconnectionEvents {
/// Blocks until there is a disconnection event and returns the address of the disconnected peer
pub async fn next(&mut self) -> Option<(SocketAddr, Result<()>)> {
self.0.recv().await
}
}

impl Connection {
pub(crate) fn new(quic_conn: quinn::Connection, remover: ConnectionRemover) -> Self {
Self { quic_conn, remover }
Expand Down Expand Up @@ -141,7 +154,7 @@ pub(super) fn listen_for_incoming_connections(
connection_pool: ConnectionPool,
message_tx: Sender<(SocketAddr, Bytes)>,
connection_tx: Sender<SocketAddr>,
disconnection_tx: Sender<SocketAddr>,
disconnection_tx: DisconnectSender,
endpoint: Endpoint,
) {
let _ = tokio::spawn(async move {
Expand Down Expand Up @@ -188,20 +201,26 @@ pub(super) fn listen_for_incoming_messages(
mut bi_streams: quinn::IncomingBiStreams,
remover: ConnectionRemover,
message_tx: Sender<(SocketAddr, Bytes)>,
disconnection_tx: Sender<SocketAddr>,
disconnection_tx: DisconnectSender,
endpoint: Endpoint,
) {
let src = *remover.remote_addr();
let _ = tokio::spawn(async move {
debug!("qp2p another incoming listerner spawned");
let _ = future::join(
match future::try_join(
read_on_uni_streams(&mut uni_streams, src, message_tx.clone()),
read_on_bi_streams(&mut bi_streams, src, message_tx, &endpoint),
)
.await;
.await
{
Ok(_) => {
let _ = disconnection_tx.send((src, Ok(()))).await;
}
Err(error) => {
let _ = disconnection_tx.send((src, Err(error))).await;
}
}

trace!("The connection to {:?} has been terminated.", src);
let _ = disconnection_tx.send(src).await;
remover.remove().await;
});
}
Expand All @@ -211,38 +230,43 @@ async fn read_on_uni_streams(
uni_streams: &mut quinn::IncomingUniStreams,
peer_addr: SocketAddr,
message_tx: Sender<(SocketAddr, Bytes)>,
) {
) -> Result<()> {
while let Some(result) = uni_streams.next().await {
match result {
Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
Err(quinn::ConnectionError::ApplicationClosed(frame)) => {
trace!("Connection terminated by peer {:?}.", peer_addr);
break;
return Err(Error::from(quinn::ConnectionError::ApplicationClosed(
frame,
)));
}
Err(err) => {
warn!(
"Failed to read incoming message on uni-stream for peer {:?} with error: {:?}",
peer_addr, err
);
break;
return Err(Error::from(err));
}
Ok(mut recv) => loop {
match read_bytes(&mut recv).await {
Ok(WireMsg::UserMsg(bytes)) => {
let _ = message_tx.send((peer_addr, bytes)).await;
}
Ok(msg) => error!("Unexpected message type: {:?}", msg),
Err(Error::StreamRead(quinn::ReadExactError::FinishedEarly)) => break,
Err(Error::StreamRead(quinn::ReadExactError::FinishedEarly)) => {
return Err(Error::StreamRead(quinn::ReadExactError::FinishedEarly))
}
Err(err) => {
error!(
"Failed reading from a uni-stream for peer {:?} with error: {:?}",
peer_addr, err
);
break;
return Err(err);
}
}
},
}
}
Ok(())
}

// Read messages sent by peer in a bidirectional stream.
Expand All @@ -251,20 +275,27 @@ async fn read_on_bi_streams(
peer_addr: SocketAddr,
message_tx: Sender<(SocketAddr, Bytes)>,
endpoint: &Endpoint,
) {
) -> Result<()> {
while let Some(result) = bi_streams.next().await {
match result {
Err(quinn::ConnectionError::ApplicationClosed { .. })
| Err(quinn::ConnectionError::ConnectionClosed { .. }) => {
Err(quinn::ConnectionError::ConnectionClosed(frame)) => {
trace!("Connection closed by peer {:?}.", peer_addr);
return Err(Error::from(quinn::ConnectionError::ConnectionClosed(
frame,
)));
}
Err(quinn::ConnectionError::ApplicationClosed(frame)) => {
trace!("Connection terminated by peer {:?}.", peer_addr);
break;
return Err(Error::from(quinn::ConnectionError::ApplicationClosed(
frame,
)));
}
Err(err) => {
warn!(
"Failed to read incoming message on bi-stream for peer {:?} with error: {:?}",
peer_addr, err
);
break;
return Err(Error::from(err));
}
Ok((mut send, mut recv)) => loop {
match read_bytes(&mut recv).await {
Expand All @@ -277,6 +308,8 @@ async fn read_on_bi_streams(
"Failed to handle Echo Request for peer {:?} with error: {:?}",
peer_addr, error
);

return Err(error);
}
}
Ok(WireMsg::EndpointVerificationReq(address_sent)) => {
Expand All @@ -289,6 +322,8 @@ async fn read_on_bi_streams(
.await
{
error!("Failed to handle Endpoint verification request for peer {:?} with error: {:?}", peer_addr, error);

return Err(error);
}
}
Ok(msg) => {
Expand All @@ -297,18 +332,22 @@ async fn read_on_bi_streams(
peer_addr, msg
);
}
Err(Error::StreamRead(quinn::ReadExactError::FinishedEarly)) => break,
Err(Error::StreamRead(quinn::ReadExactError::FinishedEarly)) => {
return Err(Error::StreamRead(quinn::ReadExactError::FinishedEarly))
}
Err(err) => {
error!(
"Failed reading from a bi-stream for peer {:?} with error: {:?}",
peer_addr, err
);
break;
return Err(err);
}
}
},
}
}

Ok(())
}

async fn handle_endpoint_echo_req(
Expand Down
17 changes: 3 additions & 14 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use super::{
connection_deduplicator::ConnectionDeduplicator,
connection_pool::ConnectionPool,
connections::{
listen_for_incoming_connections, listen_for_incoming_messages, Connection, RecvStream,
SendStream,
listen_for_incoming_connections, listen_for_incoming_messages, Connection,
DisconnectSender, DisconnectionEvents, RecvStream, SendStream,
},
error::Result,
Config,
Expand All @@ -25,7 +25,6 @@ use bytes::Bytes;
use std::{net::SocketAddr, time::Duration};
use tokio::sync::broadcast::{self, Receiver, Sender};
use tokio::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender};

use tokio::time::timeout;
use tracing::{debug, error, info, trace, warn};

Expand Down Expand Up @@ -64,16 +63,6 @@ impl IncomingConnections {
}
}

/// Disconnection
pub struct DisconnectionEvents(pub(crate) MpscReceiver<SocketAddr>);

impl DisconnectionEvents {
/// Blocks until there is a disconnection event and returns the address of the disconnected peer
pub async fn next(&mut self) -> Option<SocketAddr> {
self.0.recv().await
}
}

/// Endpoint instance which can be used to create connections to peers,
/// and listen to incoming messages from other peers.
#[derive(Clone)]
Expand All @@ -82,7 +71,7 @@ pub struct Endpoint {
public_addr: Option<SocketAddr>,
quic_endpoint: quinn::Endpoint,
message_tx: MpscSender<(SocketAddr, Bytes)>,
disconnection_tx: MpscSender<SocketAddr>,
disconnection_tx: DisconnectSender,
client_cfg: quinn::ClientConfig,
bootstrap_nodes: Vec<SocketAddr>,
qp2p_config: Config,
Expand Down
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
/// Error joining a thread
#[error("Problem spawning thread")]
ThreadError,
/// Error occurred when attempting to connect to any
/// of the peers provided as a list of contacts.
#[error("Network bootstrap failed")]
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ mod wire_msg;

pub use api::QuicP2p;
pub use config::Config;
pub use connections::{RecvStream, SendStream};
pub use endpoint::{DisconnectionEvents, Endpoint, IncomingConnections, IncomingMessages};
pub use connections::{DisconnectionEvents, RecvStream, SendStream};
pub use endpoint::{Endpoint, IncomingConnections, IncomingMessages};
pub use error::{Error, Result};
pub use quinn::ConnectionError;

Expand Down
6 changes: 3 additions & 3 deletions src/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,13 @@ async fn disconnection() -> Result<()> {
// After Alice disconnects from Bob both peers should receive the disconnected event.
alice.disconnect_from(&bob_addr).await?;

if let Some(disconnected_peer) = alice_disconnections.next().await {
if let Some((disconnected_peer, _res)) = alice_disconnections.next().await {
assert_eq!(disconnected_peer, bob_addr);
} else {
anyhow!("Missing disconnection event");
}

if let Some(disconnected_peer) = bob_disconnections.next().await {
if let Some((disconnected_peer, _res)) = bob_disconnections.next().await {
assert_eq!(disconnected_peer, alice_addr);
} else {
anyhow!("Missing disconnection event");
Expand Down Expand Up @@ -286,7 +286,7 @@ async fn simultaneous_incoming_and_outgoing_connections() -> Result<()> {
bob.disconnect_from(&alice_addr).await?;

// It should be closed on Alice's side too.
if let Some(disconnected_peer) = alice_disconnections.next().await {
if let Some((disconnected_peer, _)) = alice_disconnections.next().await {
assert_eq!(disconnected_peer, bob_addr);
} else {
anyhow!("Missing disconnection event");
Expand Down

0 comments on commit 653350b

Please sign in to comment.