Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce InflightProtocolDataQueue #4834

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ members = [
"misc/memory-connection-limits",
"misc/metrics",
"misc/multistream-select",
"misc/protocol-utils",
"misc/quick-protobuf-codec",
"misc/quickcheck-ext",
"misc/rw-stream-sink",
Expand Down Expand Up @@ -60,8 +61,8 @@ members = [
"transports/webrtc",
"transports/webrtc-websys",
"transports/websocket",
"transports/webtransport-websys",
"transports/websocket-websys",
"transports/webtransport-websys",
"wasm-tests/webtransport-tests",
]
resolver = "2"
Expand Down Expand Up @@ -94,6 +95,7 @@ libp2p-perf = { version = "0.3.0", path = "protocols/perf" }
libp2p-ping = { version = "0.44.0", path = "protocols/ping" }
libp2p-plaintext = { version = "0.41.0", path = "transports/plaintext" }
libp2p-pnet = { version = "0.24.0", path = "transports/pnet" }
libp2p-protocol-utils = { version = "0.1.0", path = "misc/protocol-utils" }
libp2p-quic = { version = "0.10.1", path = "transports/quic" }
libp2p-relay = { version = "0.17.1", path = "protocols/relay" }
libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" }
Expand Down
4 changes: 4 additions & 0 deletions misc/protocol-utils/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
## 0.1.0 - unreleased

- Initial release, offering `InflightProtocolDataQueue`.
See [PR 4834](https://github.com/libp2p/rust-libp2p/pull/4834).
14 changes: 14 additions & 0 deletions misc/protocol-utils/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "libp2p-protocol-utils"
version = "0.1.0"
edition = "2021"
description = "Utilities for implementing protocols for libp2p, via `NetworkBehaviour` and `ConnectionHandler`."
rust-version.workspace = true
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
keywords = []
categories = ["data-structures"]
publish = false # Temporary until we actually publish it.

[lints]
workspace = true
61 changes: 61 additions & 0 deletions misc/protocol-utils/src/ipd_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::collections::VecDeque;

/// Manages associated data of request-response protocols whilst they are in-flight.
///
/// The [`InflightProtocolDataQueue`] ensures that for each in-flight protocol, there is a corresponding piece of associated data.
/// We process the associated data in a FIFO order based on the incoming responses.
/// In other words, we assume that requests and their responses are either temporally ordered or it doesn't matter, which piece of data is paired with a particular response.
pub struct InflightProtocolDataQueue<D, Req, Res> {
data_of_inflight_requests: VecDeque<D>,
pending_requests: VecDeque<Req>,
received_responses: VecDeque<Res>,
}

impl<D, Req, Res> Default for InflightProtocolDataQueue<D, Req, Res> {
fn default() -> Self {
Self {
pending_requests: Default::default(),
received_responses: Default::default(),
data_of_inflight_requests: Default::default(),
}
}
}

impl<D, Req, Res> InflightProtocolDataQueue<D, Req, Res> {
/// Enqueues a new request along-side with the associated data.
///
/// The request will be returned again from [`InflightProtocolDataQueue::next_request`].
pub fn enqueue_request(&mut self, request: Req, data: D) {
self.pending_requests.push_back(request);
self.data_of_inflight_requests.push_back(data);
}

/// Submits a response to the queue.
///
/// A pair of response and data will be returned from [`InflightProtocolDataQueue::next_completed`].
pub fn submit_response(&mut self, res: Res) {
debug_assert!(
self.data_of_inflight_requests.len() > self.received_responses.len(),
"Expect to not provide more responses than requests were started"
);
self.received_responses.push_back(res);
}

/// How many protocols are currently in-flight.
pub fn num_inflight(&self) -> usize {
self.data_of_inflight_requests.len() - self.received_responses.len()
}

pub fn next_completed(&mut self) -> Option<(Res, D)> {
let res = self.received_responses.pop_front()?;
let data = self.data_of_inflight_requests.pop_front()?;

Some((res, data))
}

pub fn next_request(&mut self) -> Option<Req> {
let req = self.pending_requests.pop_front()?;

Some(req)
}
}
3 changes: 3 additions & 0 deletions misc/protocol-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod ipd_queue;

pub use ipd_queue::InflightProtocolDataQueue;
1 change: 1 addition & 0 deletions protocols/kad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ asynchronous-codec = { workspace = true }
futures = "0.3.29"
libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
libp2p-protocol-utils = { workspace = true }
quick-protobuf = "0.8"
quick-protobuf-codec = { workspace = true }
libp2p-identity = { workspace = true, features = ["rand"] }
Expand Down
Loading