Skip to content

Commit

Permalink
refactor!: remove Endpoint::try_send_*
Browse files Browse the repository at this point in the history
The `try_send_*` methods exist to send messages using a connection that
already exists in the pool. This is useful for sending messages to
clients, since it would be impossible to connect to them in case a
connection is not in the pool, so using `connect_to` would retry until
failure.

This use-case is now covered by the `Endpoint::get_connection_by_addr`
method.

BREAKING CHANGE: `Endpoint::try_send_message` and
`Endpoint::try_send_message_with` have been removed. Use
`Endpoint::get_connection_by_addr` and `Connection::send_*` instead.
  • Loading branch information
Chris Connelly authored and connec committed Sep 27, 2021
1 parent 44ceb57 commit 3e1bff4
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 87 deletions.
74 changes: 2 additions & 72 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
use super::igd::{forward_port, IgdError};
use super::wire_msg::WireMsg;
use super::{
config::{Config, InternalConfig, RetryConfig},
config::{Config, InternalConfig},
connection_deduplicator::{ConnectionDeduplicator, DedupHandle},
connection_pool::{ConnId, ConnectionPool, ConnectionRemover},
connections::{
listen_for_incoming_connections, listen_for_incoming_messages, Connection,
DisconnectionEvents, RecvStream, SendStream,
},
error::{
ClientEndpointError, ConnectionError, EndpointError, RecvError, RpcError, SendError,
ClientEndpointError, ConnectionError, EndpointError, RecvError, RpcError,
SerializationError,
},
};
Expand Down Expand Up @@ -401,76 +401,6 @@ impl<I: ConnId> Endpoint<I> {
connection.open_bi(priority).await
}

/// Send a message to a peer over an existing connection.
///
/// # Priority
///
/// Locally buffered data from streams with higher priority will be transmitted before data from
/// streams with lower priority. Changing the priority of a stream with pending data may only
/// take effect after that data has been transmitted. Using many different priority levels per
/// connection may have a negative impact on performance.
///
/// `0` is a sensible default for 'normal' priority.
///
/// # Connection pooling
///
/// Note that unlike most methods on `Endpoint`, this **will not** use the connection pool. This
/// method is intended to be used when it's necessary to send a message on an existing
/// connection only.
///
/// # Errors
///
/// If a connection with `dest` exists in the pool but the message fails to send,
/// `Err(Some(_))` will be returned. If there's no connection with `dest` in the pool, then
/// `Err(None)` will be returned.
pub async fn try_send_message(
&self,
msg: Bytes,
dest: &SocketAddr,
priority: i32,
) -> Result<(), Option<SendError>> {
self.try_send_message_with(msg, dest, priority, None).await
}

/// Send a message to a peer over an existing connection using given retry configuration.
///
/// The given `retries`, if any, will override the [`Config::retry_config`] used to create the
/// endpoint.
///
/// # Priority
///
/// Locally buffered data from streams with higher priority will be transmitted before data from
/// streams with lower priority. Changing the priority of a stream with pending data may only
/// take effect after that data has been transmitted. Using many different priority levels per
/// connection may have a negative impact on performance.
///
/// `0` is a sensible default for 'normal' priority.
///
/// # Errors
///
/// If a connection with `dest` exists in the pool but the message fails to send,
/// `Err(Some(_))` will be returned. If there's no connection with `dest` in the pool, then
/// `Err(None)` will be returned.
pub async fn try_send_message_with(
&self,
msg: Bytes,
dest: &SocketAddr,
priority: i32,
retries: Option<&RetryConfig>,
) -> Result<(), Option<SendError>> {
if let Some((conn, guard)) = self.connection_pool.get_by_addr(dest).await {
trace!("Connection exists in the connection pool: {}", dest);
let connection = self.wrap_connection(conn, guard);
retries
.unwrap_or(&self.config.retry_config)
.retry(|| async { Ok(connection.send_uni(msg.clone(), priority).await?) })
.await?;
Ok(())
} else {
Err(None)
}
}

/// Close all the connections of this endpoint immediately and stop accepting new connections.
pub fn close(&self) {
let _ = self.termination_tx.send(());
Expand Down
30 changes: 15 additions & 15 deletions src/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ async fn multiple_concurrent_connects_to_the_same_peer() -> Result<()> {
new_endpoint().await?;
let alice_addr = alice.public_addr();

let (bob, _, mut bob_incoming_messages, _, _) = new_endpoint().await?;
let (bob, mut bob_incoming_connections, mut bob_incoming_messages, _, _) =
new_endpoint().await?;
let bob_addr = bob.public_addr();

// Try to establish two connections to the same peer at the same time.
Expand All @@ -324,17 +325,19 @@ async fn multiple_concurrent_connects_to_the_same_peer() -> Result<()> {
// Send two messages, one from each end
let msg0 = random_msg(1024);
alice
.try_send_message(msg0.clone(), &bob_addr, 0)
.await
.map_err(|error| {
error
.map(|error| eyre!(error))
.unwrap_or_else(|| eyre!("lost connection from alice to bob"))
})?;
.connect_to(&bob_addr)
.await?
.send(msg0.clone())
.await?;

let msg1 = random_msg(1024);
b_to_a.send(msg1.clone()).await?;

// Bob did not get a new incoming connection
if let Ok(Some(_)) = timeout(Duration::from_secs(2), bob_incoming_connections.next()).await {
eyre!("Unexpected incoming connection from alice to bob");
}

// Both messages are received at the other end
if let Some((src, message)) = alice_incoming_messages.next().await {
assert_eq!(src, bob_addr);
Expand Down Expand Up @@ -706,13 +709,10 @@ async fn client() -> Result<()> {
assert_eq!(&message[..], b"hello");

server
.try_send_message(b"world"[..].into(), &client.public_addr(), 0)
.await
.map_err(|error| {
error
.map(|error| eyre!(error))
.unwrap_or_else(|| eyre!("no longer connected to client"))
})?;
.connect_to(&client.public_addr())
.await?
.send(b"world"[..].into())
.await?;
let (sender, message) = client_messages
.next()
.await
Expand Down

0 comments on commit 3e1bff4

Please sign in to comment.