Skip to content

Commit

Permalink
refactor!: remove connection pooling
Browse files Browse the repository at this point in the history
This is a significant breaking change to the library's behaviour. In
particular:

- Methods on `Endpoint` to remove or fetch connections from the
  connection pool have been removed.
- All connection-returning methods on `Endpoint` will always return a
  new connection (on success).
- Connections are not held open automatically. If the
  `IncomingConnections` struct is dropped when creating an `Endpoint`,
  any connections handled by the endpoint will be dropped immediately
  (in future this should probably drop the underlying `quinn::Incoming`
  as well, meaning connections will be rejected outright).

The upshot of this is change is that it's now necessary for callers to
apply their own resource management for connection handles. A naive
migration would likely lead to many more connections being made, which
could be detrimental to performance. However, this simplifies `qp2p` and
removes a lot of edge-cases, and is expected to be removing a lot of
undiscovered bugs. Callers can now use a connection management strategy
that makes the most sense for them, which is likely to be much simpler
than the general-purpose approach that had to be taken here.

BREAKING CHANGE:

- The `ConnId` trait has been removed.
- `Endpoint`, `IncomingConnections`, `Connection`, and
  `ConnectionIncoming` no longer have a generic type parameter.
- `Endpoint::disconnect_from`, `Endpoint::get_connection_by_addr`, and
  `Endpoint::get_connection_by_id` have been removed.
- `Connection::id` has been removed.
- `Endpoint::new`, `Endpoint::connect_to`, and
  `Endpoint::connect_to_any` now return
  `(Connection, ConnectionIncoming)`, rather than `(Connection,
  Option<ConnectionIncoming>)`.
- `Connection::open_bi` no longer takes a `priority` argument. This can
  be set with `SendStream::set_priority` instead.
- Semantically, all calls to `Endpoint::connect_to` and
  `Endpoint::connect_to_any` will establish and return new connections.
  There is no connection reuse.
  • Loading branch information
Chris Connelly authored and joshuef committed Oct 27, 2021
1 parent 0385601 commit fd19094
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 771 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ tiny-keccak = "2.0.2"
tokio = { version = "1.2.0", features = ["sync"] }
tracing = "~0.1.26"
webpki = "~0.21.3"
xor_name = "3.1.0"

[dev-dependencies]
color-eyre = "0.5.11"
Expand Down
10 changes: 2 additions & 8 deletions examples/p2p_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use bytes::Bytes;
use color_eyre::eyre::Result;
use qp2p::{Config, ConnId, Endpoint};
use qp2p::{Config, Endpoint};
use std::{
env,
net::{Ipv4Addr, SocketAddr},
Expand All @@ -24,12 +24,6 @@ use std::{
#[derive(Default, Ord, PartialEq, PartialOrd, Eq, Clone, Copy)]
struct XId(pub [u8; 32]);

impl ConnId for XId {
fn generate(_socket_addr: &SocketAddr) -> Self {
XId(rand::random())
}
}

#[tokio::main]
async fn main() -> Result<()> {
color_eyre::install()?;
Expand All @@ -41,7 +35,7 @@ async fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();

// create an endpoint for us to listen on and send from.
let (node, mut incoming_conns, _contact) = Endpoint::<XId>::new(
let (node, mut incoming_conns, _contact) = Endpoint::new(
SocketAddr::from((Ipv4Addr::LOCALHOST, 0)),
&[],
Config {
Expand Down
18 changes: 9 additions & 9 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const ENDPOINT_VERIFICATION_TIMEOUT: Duration = Duration::from_secs(30);

/// The sending API for a connection.
#[derive(Clone)]
pub(crate) struct Connection {
pub struct Connection {
inner: quinn::Connection,
default_retry_config: Option<Arc<RetryConfig>>,

Expand Down Expand Up @@ -67,7 +67,7 @@ impl Connection {
}

/// The address of the remote peer.
pub(crate) fn remote_address(&self) -> SocketAddr {
pub fn remote_address(&self) -> SocketAddr {
self.inner.remote_address()
}

Expand All @@ -80,14 +80,14 @@ impl Connection {
/// [`Config`](crate::Config) that was used to construct the [`Endpoint`] this connection
/// belongs to. See [`send_with`](Self::send_with) if you want to send a message with specific
/// configuration.
pub(crate) async fn send(&self, msg: Bytes) -> Result<(), SendError> {
pub async fn send(&self, msg: Bytes) -> Result<(), SendError> {
self.send_with(msg, 0, None).await
}

/// Send a message to the peer using the given configuration.
///
/// See [`send`](Self::send) if you want to send with the default configuration.
pub(crate) async fn send_with(
pub async fn send_with(
&self,
msg: Bytes,
priority: i32,
Expand Down Expand Up @@ -117,7 +117,7 @@ impl Connection {
/// Open a unidirection stream to the peer.
///
/// Messages sent over the stream will arrive at the peer in the order they were sent.
pub(crate) async fn open_uni(&self) -> Result<SendStream, ConnectionError> {
pub async fn open_uni(&self) -> Result<SendStream, ConnectionError> {
let send_stream = self.inner.open_uni().await?;
Ok(SendStream::new(send_stream))
}
Expand All @@ -128,7 +128,7 @@ impl Connection {
/// automatically correlate response messages, for example.
///
/// Messages sent over the stream will arrive at the peer in the order they were sent.
pub(crate) async fn open_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
let (send_stream, recv_stream) = self.inner.open_bi().await?;
Ok((SendStream::new(send_stream), RecvStream::new(recv_stream)))
}
Expand All @@ -138,7 +138,7 @@ impl Connection {
/// This is not a graceful close - pending operations will fail immediately with
/// [`ConnectionError::Closed`]`(`[`Close::Local`]`)`, and data on unfinished streams is not
/// guaranteed to be delivered.
pub(crate) fn close(&self) {
pub fn close(&self) {
self.inner.close(0u8.into(), b"");
}

Expand Down Expand Up @@ -240,7 +240,7 @@ impl fmt::Debug for RecvStream {
}

/// The receiving API for a connection.
pub(crate) struct ConnectionIncoming {
pub struct ConnectionIncoming {
message_rx: mpsc::Receiver<Result<Bytes, RecvError>>,
_alive_tx: Arc<watch::Sender<()>>,
}
Expand Down Expand Up @@ -274,7 +274,7 @@ impl ConnectionIncoming {
}

/// Get the next message sent by the peer, over any stream.
pub(crate) async fn next(&mut self) -> Result<Option<Bytes>, RecvError> {
pub async fn next(&mut self) -> Result<Option<Bytes>, RecvError> {
self.message_rx.recv().await.transpose()
}
}
Expand Down
172 changes: 0 additions & 172 deletions src/connection_deduplicator.rs

This file was deleted.

Loading

0 comments on commit fd19094

Please sign in to comment.