Skip to content

Commit

Permalink
refactor(relay): move stream-handling away from {In,Out}boundUpgrade
Browse files Browse the repository at this point in the history
Fixes: #4075.

Pull-Request: #4275.
  • Loading branch information
dgarus authored Sep 20, 2023
1 parent 5d740a8 commit efea490
Show file tree
Hide file tree
Showing 18 changed files with 1,284 additions and 832 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ members = [
"interop-tests",
"misc/allow-block-list",
"misc/connection-limits",
"misc/futures-bounded",
"misc/keygen",
"misc/memory-connection-limits",
"misc/metrics",
Expand Down Expand Up @@ -69,6 +70,7 @@ resolver = "2"
rust-version = "1.65.0"

[workspace.dependencies]
futures-bounded = { version = "0.1.0", path = "misc/futures-bounded" }
libp2p = { version = "0.52.3", path = "libp2p" }
libp2p-allow-block-list = { version = "0.2.0", path = "misc/allow-block-list" }
libp2p-autonat = { version = "0.11.0", path = "protocols/autonat" }
Expand Down
3 changes: 3 additions & 0 deletions misc/futures-bounded/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## 0.1.0 - unreleased

Initial release.
20 changes: 20 additions & 0 deletions misc/futures-bounded/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "futures-bounded"
version = "0.1.0"
edition = "2021"
rust-version.workspace = true
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
keywords = ["futures", "async", "backpressure"]
categories = ["data-structures", "asynchronous"]
description = "Utilities for bounding futures in size and time."
publish = false # TEMP FIX until https://github.com/obi1kenobi/cargo-semver-checks-action/issues/53 is fixed.

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures-util = { version = "0.3.28" }
futures-timer = "3.0.2"

[dev-dependencies]
tokio = { version = "1.29.1", features = ["macros", "rt"] }
28 changes: 28 additions & 0 deletions misc/futures-bounded/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
mod map;
mod set;

pub use map::{FuturesMap, PushError};
pub use set::FuturesSet;
use std::fmt;
use std::fmt::Formatter;
use std::time::Duration;

/// A future failed to complete within the given timeout.
#[derive(Debug)]
pub struct Timeout {
limit: Duration,
}

impl Timeout {
fn new(duration: Duration) -> Self {
Self { limit: duration }
}
}

impl fmt::Display for Timeout {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "future failed to complete within {:?}", self.limit)
}
}

impl std::error::Error for Timeout {}
268 changes: 268 additions & 0 deletions misc/futures-bounded/src/map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
use std::future::Future;
use std::hash::Hash;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::time::Duration;

use futures_timer::Delay;
use futures_util::future::BoxFuture;
use futures_util::stream::FuturesUnordered;
use futures_util::{FutureExt, StreamExt};

use crate::Timeout;

/// Represents a map of [`Future`]s.
///
/// Each future must finish within the specified time and the map never outgrows its capacity.
pub struct FuturesMap<ID, O> {
timeout: Duration,
capacity: usize,
inner: FuturesUnordered<TaggedFuture<ID, TimeoutFuture<BoxFuture<'static, O>>>>,
empty_waker: Option<Waker>,
full_waker: Option<Waker>,
}

/// Error of a future pushing
#[derive(PartialEq, Debug)]
pub enum PushError<F> {
/// The length of the set is equal to the capacity
BeyondCapacity(F),
/// The set already contains the given future's ID
ReplacedFuture(F),
}

impl<ID, O> FuturesMap<ID, O> {
pub fn new(timeout: Duration, capacity: usize) -> Self {
Self {
timeout,
capacity,
inner: Default::default(),
empty_waker: None,
full_waker: None,
}
}
}

impl<ID, O> FuturesMap<ID, O>
where
ID: Clone + Hash + Eq + Send + Unpin + 'static,
{
/// Push a future into the map.
///
/// This method inserts the given future with defined `future_id` to the set.
/// If the length of the map is equal to the capacity, this method returns [PushError::BeyondCapacity],
/// that contains the passed future. In that case, the future is not inserted to the map.
/// If a future with the given `future_id` already exists, then the old future will be replaced by a new one.
/// In that case, the returned error [PushError::ReplacedFuture] contains the old future.
pub fn try_push<F>(&mut self, future_id: ID, future: F) -> Result<(), PushError<BoxFuture<O>>>
where
F: Future<Output = O> + Send + 'static,
{
if self.inner.len() >= self.capacity {
return Err(PushError::BeyondCapacity(future.boxed()));
}

if let Some(waker) = self.empty_waker.take() {
waker.wake();
}

match self.inner.iter_mut().find(|tagged| tagged.tag == future_id) {
None => {
self.inner.push(TaggedFuture {
tag: future_id,
inner: TimeoutFuture {
inner: future.boxed(),
timeout: Delay::new(self.timeout),
},
});

Ok(())
}
Some(existing) => {
let old_future = mem::replace(
&mut existing.inner,
TimeoutFuture {
inner: future.boxed(),
timeout: Delay::new(self.timeout),
},
);

Err(PushError::ReplacedFuture(old_future.inner))
}
}
}

pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}

#[allow(unknown_lints, clippy::needless_pass_by_ref_mut)] // &mut Context is idiomatic.
pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> {
if self.inner.len() < self.capacity {
return Poll::Ready(());
}

self.full_waker = Some(cx.waker().clone());

Poll::Pending
}

pub fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<(ID, Result<O, Timeout>)> {
let maybe_result = futures_util::ready!(self.inner.poll_next_unpin(cx));

match maybe_result {
None => {
self.empty_waker = Some(cx.waker().clone());
Poll::Pending
}
Some((id, Ok(output))) => Poll::Ready((id, Ok(output))),
Some((id, Err(_timeout))) => Poll::Ready((id, Err(Timeout::new(self.timeout)))),
}
}
}

struct TimeoutFuture<F> {
inner: F,
timeout: Delay,
}

impl<F> Future for TimeoutFuture<F>
where
F: Future + Unpin,
{
type Output = Result<F::Output, ()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.timeout.poll_unpin(cx).is_ready() {
return Poll::Ready(Err(()));
}

self.inner.poll_unpin(cx).map(Ok)
}
}

struct TaggedFuture<T, F> {
tag: T,
inner: F,
}

impl<T, F> Future for TaggedFuture<T, F>
where
T: Clone + Unpin,
F: Future + Unpin,
{
type Output = (T, F::Output);

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let output = futures_util::ready!(self.inner.poll_unpin(cx));

Poll::Ready((self.tag.clone(), output))
}
}

#[cfg(test)]
mod tests {
use std::future::{pending, poll_fn, ready};
use std::pin::Pin;
use std::time::Instant;

use super::*;

#[test]
fn cannot_push_more_than_capacity_tasks() {
let mut futures = FuturesMap::new(Duration::from_secs(10), 1);

assert!(futures.try_push("ID_1", ready(())).is_ok());
matches!(
futures.try_push("ID_2", ready(())),
Err(PushError::BeyondCapacity(_))
);
}

#[test]
fn cannot_push_the_same_id_few_times() {
let mut futures = FuturesMap::new(Duration::from_secs(10), 5);

assert!(futures.try_push("ID", ready(())).is_ok());
matches!(
futures.try_push("ID", ready(())),
Err(PushError::ReplacedFuture(_))
);
}

#[tokio::test]
async fn futures_timeout() {
let mut futures = FuturesMap::new(Duration::from_millis(100), 1);

let _ = futures.try_push("ID", pending::<()>());
Delay::new(Duration::from_millis(150)).await;
let (_, result) = poll_fn(|cx| futures.poll_unpin(cx)).await;

assert!(result.is_err())
}

// Each future causes a delay, `Task` only has a capacity of 1, meaning they must be processed in sequence.
// We stop after NUM_FUTURES tasks, meaning the overall execution must at least take DELAY * NUM_FUTURES.
#[tokio::test]
async fn backpressure() {
const DELAY: Duration = Duration::from_millis(100);
const NUM_FUTURES: u32 = 10;

let start = Instant::now();
Task::new(DELAY, NUM_FUTURES, 1).await;
let duration = start.elapsed();

assert!(duration >= DELAY * NUM_FUTURES);
}

struct Task {
future: Duration,
num_futures: usize,
num_processed: usize,
inner: FuturesMap<u8, ()>,
}

impl Task {
fn new(future: Duration, num_futures: u32, capacity: usize) -> Self {
Self {
future,
num_futures: num_futures as usize,
num_processed: 0,
inner: FuturesMap::new(Duration::from_secs(60), capacity),
}
}
}

impl Future for Task {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while this.num_processed < this.num_futures {
if let Poll::Ready((_, result)) = this.inner.poll_unpin(cx) {
if result.is_err() {
panic!("Timeout is great than future delay")
}

this.num_processed += 1;
continue;
}

if let Poll::Ready(()) = this.inner.poll_ready_unpin(cx) {
// We push the constant future's ID to prove that user can use the same ID
// if the future was finished
let maybe_future = this.inner.try_push(1u8, Delay::new(this.future));
assert!(maybe_future.is_ok(), "we polled for readiness");

continue;
}

return Poll::Pending;
}

Poll::Ready(())
}
}
}
Loading

0 comments on commit efea490

Please sign in to comment.