refactor: rename connections -> connection_handle
This is an internal refactor only, in preparation for replacing most of
the connection machinery with an unpooled implementation.
Chris Connelly authored and lionel-faber committed Oct 13, 2021
1 parent 6b79435 commit cc6e9fc
Showing 3 changed files with 38 additions and 33 deletions.
20 changes: 10 additions & 10 deletions src/connections.rs → src/connection_handle.rs
@@ -31,7 +31,7 @@ use tracing::{trace, warn};
/// [`Endpoint`]: crate::Endpoint
/// [`Endpoint::connect_to`]: crate::Endpoint::connect_to
#[derive(Clone)]
pub struct Connection<I: ConnId> {
pub struct ConnectionHandle<I: ConnId> {
quic_conn: quinn::Connection,
default_retry_config: Arc<RetryConfig>,
remover: ConnectionRemover<I>,
@@ -49,7 +49,7 @@ impl DisconnectionEvents {
}
}

impl<I: ConnId> Connection<I> {
impl<I: ConnId> ConnectionHandle<I> {
pub(crate) fn new(
quic_conn: quinn::Connection,
default_retry_config: Arc<RetryConfig>,
@@ -239,8 +239,8 @@ async fn send_msg(mut send_stream: &mut quinn::SendStream, msg: Bytes) -> Result
pub(super) fn listen_for_incoming_connections<I: ConnId>(
mut quinn_incoming: quinn::Incoming,
connection_pool: ConnectionPool<I>,
message_tx: Sender<(Connection<I>, Bytes)>,
connection_tx: Sender<Connection<I>>,
message_tx: Sender<(ConnectionHandle<I>, Bytes)>,
connection_tx: Sender<ConnectionHandle<I>>,
disconnection_tx: Sender<SocketAddr>,
endpoint: Endpoint<I>,
) {
@@ -284,10 +284,10 @@ pub(super) fn listen_for_incoming_connections<I: ConnId>(
}

pub(super) fn listen_for_incoming_messages<I: ConnId>(
connection: Connection<I>,
connection: ConnectionHandle<I>,
mut uni_streams: quinn::IncomingUniStreams,
mut bi_streams: quinn::IncomingBiStreams,
message_tx: Sender<(Connection<I>, Bytes)>,
message_tx: Sender<(ConnectionHandle<I>, Bytes)>,
disconnection_tx: Sender<SocketAddr>,
endpoint: Endpoint<I>,
) {
@@ -324,9 +324,9 @@ enum StreamError {

// Read messages sent by peer in an unidirectional stream.
async fn read_on_uni_streams<I: ConnId>(
connection: &Connection<I>,
connection: &ConnectionHandle<I>,
uni_streams: &mut quinn::IncomingUniStreams,
message_tx: Sender<(Connection<I>, Bytes)>,
message_tx: Sender<(ConnectionHandle<I>, Bytes)>,
) -> Result<(), StreamError> {
let peer_addr = connection.remote_address();
while let Some(result) = uni_streams.next().await {
@@ -371,9 +371,9 @@ async fn read_on_uni_streams<I: ConnId>(

// Read messages sent by peer in a bidirectional stream.
async fn read_on_bi_streams<I: ConnId>(
connection: &Connection<I>,
connection: &ConnectionHandle<I>,
bi_streams: &mut quinn::IncomingBiStreams,
message_tx: Sender<(Connection<I>, Bytes)>,
message_tx: Sender<(ConnectionHandle<I>, Bytes)>,
endpoint: &Endpoint<I>,
) -> Result<(), StreamError> {
let peer_addr = connection.remote_address();
45 changes: 24 additions & 21 deletions src/endpoint.rs
@@ -13,11 +13,11 @@ use super::wire_msg::WireMsg;
use super::{
config::{Config, InternalConfig, RetryConfig, SERVER_NAME},
connection_deduplicator::{ConnectionDeduplicator, DedupHandle},
connection_pool::{ConnId, ConnectionPool, ConnectionRemover},
connections::{
listen_for_incoming_connections, listen_for_incoming_messages, Connection,
connection_handle::{
listen_for_incoming_connections, listen_for_incoming_messages, ConnectionHandle,
DisconnectionEvents, RecvStream, SendStream,
},
connection_pool::{ConnId, ConnectionPool, ConnectionRemover},
error::{
ClientEndpointError, ConnectionError, EndpointError, RecvError, RpcError,
SerializationError,
@@ -48,24 +48,24 @@ const STANDARD_CHANNEL_SIZE: usize = 10000;

/// Channel on which incoming messages can be listened to
#[derive(Debug)]
pub struct IncomingMessages<I: ConnId>(pub(crate) MpscReceiver<(Connection<I>, Bytes)>);
pub struct IncomingMessages<I: ConnId>(pub(crate) MpscReceiver<(ConnectionHandle<I>, Bytes)>);

impl<I: ConnId> IncomingMessages<I> {
/// Blocks and returns the next incoming message and the connection it arrived on.
///
/// **Note:** holding on to `Connection`
pub async fn next(&mut self) -> Option<(Connection<I>, Bytes)> {
pub async fn next(&mut self) -> Option<(ConnectionHandle<I>, Bytes)> {
self.0.recv().await
}
}

/// Channel on which incoming connections are notified on
pub struct IncomingConnections<I: ConnId>(pub(crate) MpscReceiver<Connection<I>>);
pub struct IncomingConnections<I: ConnId>(pub(crate) MpscReceiver<ConnectionHandle<I>>);

impl<I: ConnId> IncomingConnections<I> {
/// Blocks until there is an incoming connection and returns the address of the
/// connecting peer
pub async fn next(&mut self) -> Option<Connection<I>> {
pub async fn next(&mut self) -> Option<ConnectionHandle<I>> {
self.0.recv().await
}
}
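For context on how these channels are consumed, here is a minimal sketch of a receive loop over `IncomingMessages`. Obtaining the channel from one of the `Endpoint` constructors is assumed and not shown, and calling `remote_address()` on the handle is assumed to be available to callers, as it is used on the handle elsewhere in this diff.

```rust
use qp2p::{ConnId, IncomingMessages};

// Sketch only: drain the message channel until the sending side is dropped.
async fn print_messages<I: ConnId>(mut incoming: IncomingMessages<I>) {
    while let Some((connection, bytes)) = incoming.next().await {
        // `connection` is the renamed ConnectionHandle, still re-exported as `Connection`.
        println!("{} bytes from {}", bytes.len(), connection.remote_address());
    }
}
```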
@@ -78,7 +78,7 @@ pub struct Endpoint<I: ConnId> {
quic_endpoint: quinn::Endpoint,
retry_config: Arc<RetryConfig>,

message_tx: MpscSender<(Connection<I>, Bytes)>,
message_tx: MpscSender<(ConnectionHandle<I>, Bytes)>,
disconnection_tx: MpscSender<SocketAddr>,
termination_tx: Sender<()>,
connection_pool: ConnectionPool<I>,
@@ -133,7 +133,7 @@ impl<I: ConnId> Endpoint<I> {
IncomingConnections<I>,
IncomingMessages<I>,
DisconnectionEvents,
Option<Connection<I>>,
Option<ConnectionHandle<I>>,
),
EndpointError,
> {
@@ -311,7 +311,7 @@ impl<I: ConnId> Endpoint<I> {
pub async fn connect_to(
&self,
node_addr: &SocketAddr,
) -> Result<Connection<I>, ConnectionError> {
) -> Result<ConnectionHandle<I>, ConnectionError> {
self.get_or_connect_to(node_addr).await
}

/// method will check the pool before opening each connection. If a new connection is opened, it
/// will be added to the pool. Note that already pooled connections will have a higher chance of
/// 'winning' the race, and being the selected peer.
pub async fn connect_to_any(&self, peer_addrs: &[SocketAddr]) -> Option<Connection<I>> {
pub async fn connect_to_any(&self, peer_addrs: &[SocketAddr]) -> Option<ConnectionHandle<I>> {
trace!("Connecting to any of {:?}", peer_addrs);
if peer_addrs.is_empty() {
return None;
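The doc comment above describes `connect_to_any` as a race between the candidate peers, biased towards connections that are already in the pool. A hedged sketch of a call site follows; the already-built `Endpoint` and the peer list are assumed inputs, not part of this diff.

```rust
use std::net::SocketAddr;
use qp2p::{ConnId, Endpoint};

// Sketch only: race connections to a set of candidate peers and keep whichever
// wins (an already-pooled connection has the best odds, per the doc comment).
async fn connect_to_fastest<I: ConnId>(endpoint: &Endpoint<I>, peers: &[SocketAddr]) {
    match endpoint.connect_to_any(peers).await {
        Some(connection) => println!("connected to {}", connection.remote_address()),
        None => println!("no peer could be reached"),
    }
}
```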
@@ -391,15 +391,15 @@ impl<I: ConnId> Endpoint<I> {
}

/// Get the existing `Connection` for a `SocketAddr`.
pub async fn get_connection_by_addr(&self, addr: &SocketAddr) -> Option<Connection<I>> {
pub async fn get_connection_by_addr(&self, addr: &SocketAddr) -> Option<ConnectionHandle<I>> {
self.connection_pool
.get_by_addr(addr)
.await
.map(|(connection, remover)| self.wrap_connection(connection, remover))
}

/// Get the existing `Connection` for the given ID.
pub async fn get_connection_by_id(&self, id: &I) -> Option<Connection<I>> {
pub async fn get_connection_by_id(&self, id: &I) -> Option<ConnectionHandle<I>> {
self.connection_pool
.get_by_id(id)
.await
@@ -442,7 +442,7 @@ impl<I: ConnId> Endpoint<I> {
pub(crate) async fn get_or_connect_to(
&self,
addr: &SocketAddr,
) -> Result<Connection<I>, ConnectionError> {
) -> Result<ConnectionHandle<I>, ConnectionError> {
let completion = loop {
if let Some((conn, remover)) = self.connection_pool.get_by_addr(addr).await {
trace!("We are already connected to this peer: {}", addr);
@@ -531,7 +531,7 @@ impl<I: ConnId> Endpoint<I> {
&mut self,
config_external_ip: Option<IpAddr>,
config_external_port: Option<u16>,
contact: Option<&Connection<I>>,
contact: Option<&ConnectionHandle<I>>,
) -> Result<SocketAddr, EndpointError> {
let mut public_addr = self.local_addr;

@@ -604,7 +604,7 @@ impl<I: ConnId> Endpoint<I> {
}

/// Perform the endpoint echo RPC with the given contact.
async fn endpoint_echo(&self, contact: &Connection<I>) -> Result<SocketAddr, RpcError> {
async fn endpoint_echo(&self, contact: &ConnectionHandle<I>) -> Result<SocketAddr, RpcError> {
let (mut send, mut recv) = contact.open_bi(0).await?;

send.send(WireMsg::EndpointEchoReq).await?;
/// Perform the endpoint verification RPC with the given peer.
async fn endpoint_verification(
&self,
contact: &Connection<I>,
contact: &ConnectionHandle<I>,
public_addr: SocketAddr,
) -> Result<bool, RpcError> {
let (mut send, mut recv) = contact.open_bi(0).await?;
&self,
connection: quinn::Connection,
remover: ConnectionRemover<I>,
) -> Connection<I> {
Connection::new(connection, self.retry_config.clone(), remover)
) -> ConnectionHandle<I> {
ConnectionHandle::new(connection, self.retry_config.clone(), remover)
}
}

// a private helper struct for passing a bunch of channel-related things
type Msg<I> = (Connection<I>, Bytes);
type Msg<I> = (ConnectionHandle<I>, Bytes);
struct Channels<I: ConnId> {
connection: (MpscSender<Connection<I>>, MpscReceiver<Connection<I>>),
connection: (
MpscSender<ConnectionHandle<I>>,
MpscReceiver<ConnectionHandle<I>>,
),
message: (MpscSender<Msg<I>>, MpscReceiver<Msg<I>>),
disconnection: (MpscSender<SocketAddr>, MpscReceiver<SocketAddr>),
termination: (Sender<()>, broadcast::Receiver<()>),
6 changes: 4 additions & 2 deletions src/lib.rs
@@ -48,8 +48,8 @@

pub mod config;
mod connection_deduplicator;
mod connection_handle;
mod connection_pool;
mod connections;
mod endpoint;
mod error;
#[cfg(feature = "igd")]
@@ -58,8 +58,10 @@ mod utils;
mod wire_msg;

pub use config::{Config, ConfigError, RetryConfig};
pub use connection_handle::{
ConnectionHandle as Connection, DisconnectionEvents, RecvStream, SendStream,
};
pub use connection_pool::ConnId;
pub use connections::{Connection, DisconnectionEvents, RecvStream, SendStream};
pub use endpoint::{Endpoint, IncomingConnections, IncomingMessages};
#[cfg(feature = "igd")]
pub use error::UpnpError;
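Because `lib.rs` re-exports the renamed type under its old name (`ConnectionHandle as Connection`), the crate's public API should be unchanged by this commit, which matches the "internal refactor only" note in the commit message. A hedged sketch of downstream code that keeps compiling as-is; the crate path `qp2p` and the caller's generic bound are assumptions for illustration.

```rust
use qp2p::{ConnId, Connection};

// `Connection` here resolves to connection_handle::ConnectionHandle via the
// re-export above, so pre-existing call sites need no changes.
fn log_peer<I: ConnId>(conn: &Connection<I>) {
    println!("talking to {}", conn.remote_address());
}
```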
