feat(kad): Limit number of active outbound streams (#3287)
Limit the number of active outbound streams so that it does not exceed the configured maximum (`MAX_NUM_SUBSTREAMS`, currently 32).

Resolves #3236.
nazar-pc authored Jan 24, 2023
1 parent 687fba8 commit 520523b
Showing 2 changed files with 47 additions and 36 deletions.
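
For readers skimming the diff: the change applies a standard bounded-queue pattern. Requests no longer open a substream directly; they are pushed onto a queue, a counter tracks upgrades in flight, and `poll` only emits a new `OutboundSubstreamRequest` while the combined count of active and in-flight streams is below the cap. The sketch below is a simplified, self-contained illustration of that pattern, not the handler's actual types; `BoundedRequester` and its field names are invented for this example.

```rust
use std::collections::VecDeque;

/// Upper bound on concurrently active or in-flight outbound streams,
/// mirroring the constant introduced by this commit.
const MAX_NUM_SUBSTREAMS: usize = 32;

/// Illustrative sketch of the pattern this commit applies; the shape
/// is simplified, not the handler's actual types.
struct BoundedRequester<R> {
    /// Requests waiting for a free slot (`requested_streams` in the diff).
    queued: VecDeque<R>,
    /// Upgrades emitted but not yet negotiated
    /// (`num_requested_outbound_streams` in the diff).
    in_flight: usize,
    /// Fully negotiated streams (`outbound_substreams.len()` in the diff).
    active: usize,
}

impl<R> BoundedRequester<R> {
    fn new() -> Self {
        Self {
            queued: VecDeque::new(),
            in_flight: 0,
            active: 0,
        }
    }

    /// Requests never open a stream directly; they always enter the queue.
    fn request(&mut self, request: R) {
        self.queued.push_back(request);
    }

    /// Called from `poll`: dequeue a request only while the combined
    /// count of active and in-flight streams is below the cap.
    fn next_to_open(&mut self) -> Option<R> {
        if self.active + self.in_flight < MAX_NUM_SUBSTREAMS {
            let request = self.queued.pop_front()?;
            self.in_flight += 1;
            return Some(request);
        }
        None
    }

    /// Called when an upgrade completes: the in-flight slot is released,
    /// and on success the stream is counted as active instead.
    fn upgrade_finished(&mut self, success: bool) {
        self.in_flight -= 1;
        if success {
            self.active += 1;
        }
    }
}
```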
3 changes: 3 additions & 0 deletions protocols/kad/CHANGELOG.md
@@ -6,9 +6,12 @@

- Remove lifetime from `RecordStore` and use GATs instead. See [PR 3239].

+- Limit number of active outbound streams to 32. See [PR 3287].
+
- Bump MSRV to 1.65.0.

[PR 3239]: https://github.com/libp2p/rust-libp2p/pull/3239
+[PR 3287]: https://github.com/libp2p/rust-libp2p/pull/3287

# 0.42.1

80 changes: 44 additions & 36 deletions protocols/kad/src/handler.rs
@@ -36,12 +36,13 @@ use libp2p_swarm::{
KeepAlive, NegotiatedSubstream, SubstreamProtocol,
};
use log::trace;
+use std::collections::VecDeque;
use std::task::Waker;
use std::{
error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration,
};

-const MAX_NUM_INBOUND_SUBSTREAMS: usize = 32;
+const MAX_NUM_SUBSTREAMS: usize = 32;

/// A prototype from which [`KademliaHandler`]s can be constructed.
pub struct KademliaHandlerProto<T> {
@@ -93,6 +94,14 @@ pub struct KademliaHandler<TUserData> {
/// List of active outbound substreams with the state they are in.
outbound_substreams: SelectAll<OutboundSubstreamState<TUserData>>,

+/// Number of outbound streams being upgraded right now.
+num_requested_outbound_streams: usize,
+
+/// List of outbound substreams that are waiting to become active next.
+/// Contains the request we want to send, and the user data if we expect an answer.
+requested_streams:
+    VecDeque<SubstreamProtocol<KademliaProtocolConfig, (KadRequestMsg, Option<TUserData>)>>,

/// List of active inbound substreams with the state they are in.
inbound_substreams: SelectAll<InboundSubstreamState<TUserData>>,

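Read together with `outbound_substreams`, these two new fields maintain a simple accounting rule: a queued entry costs nothing, while a requested (in-flight) or active stream each occupies one of the 32 slots; as the later hunks show, the counter is decremented on both successful negotiation and upgrade failure, so slots cannot leak. A hedged one-function restatement of that rule (illustrative, not part of the patch):

```rust
/// Illustrative helper, not from the patch: may the handler emit
/// another outbound substream request right now?
fn may_request_more(active: usize, requested: usize) -> bool {
    const MAX_NUM_SUBSTREAMS: usize = 32;
    // Active (negotiated) and requested (upgrade in flight) streams
    // each occupy a slot; entries still queued do not.
    active + requested < MAX_NUM_SUBSTREAMS
}
```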
@@ -139,9 +148,6 @@ pub struct KademliaHandlerConfig {

/// State of an active outbound substream.
enum OutboundSubstreamState<TUserData> {
-/// We haven't started opening the outgoing substream yet.
-/// Contains the request we want to send, and the user data if we expect an answer.
-PendingOpen(SubstreamProtocol<KademliaProtocolConfig, (KadRequestMsg, Option<TUserData>)>),
/// Waiting to send a message to the remote.
PendingSend(
KadOutStreamSink<NegotiatedSubstream>,
@@ -524,6 +530,8 @@ where
next_connec_unique_id: UniqueConnecId(0),
inbound_substreams: Default::default(),
outbound_substreams: Default::default(),
+num_requested_outbound_streams: 0,
+requested_streams: Default::default(),
keep_alive,
protocol_status: ProtocolStatus::Unconfirmed,
}
@@ -543,6 +551,7 @@
.push(OutboundSubstreamState::PendingSend(
protocol, msg, user_data,
));
+self.num_requested_outbound_streams -= 1;
if let ProtocolStatus::Unconfirmed = self.protocol_status {
// Upon the first successfully negotiated substream, we know that the
// remote is configured with the same protocol name and we want
@@ -572,7 +581,7 @@ where
self.protocol_status = ProtocolStatus::Confirmed;
}

-if self.inbound_substreams.len() == MAX_NUM_INBOUND_SUBSTREAMS {
+if self.inbound_substreams.len() == MAX_NUM_SUBSTREAMS {
if let Some(s) = self.inbound_substreams.iter_mut().find(|s| {
matches!(
s,
@@ -624,6 +633,7 @@ where
self.outbound_substreams
.push(OutboundSubstreamState::ReportError(error.into(), user_data));
}
+self.num_requested_outbound_streams -= 1;
}
}

@@ -667,23 +677,21 @@ where
}
KademliaHandlerIn::FindNodeReq { key, user_data } => {
let msg = KadRequestMsg::FindNode { key };
-self.outbound_substreams
-    .push(OutboundSubstreamState::PendingOpen(SubstreamProtocol::new(
-        self.config.protocol_config.clone(),
-        (msg, Some(user_data)),
-    )));
+self.requested_streams.push_back(SubstreamProtocol::new(
+    self.config.protocol_config.clone(),
+    (msg, Some(user_data)),
+));
}
KademliaHandlerIn::FindNodeRes {
closer_peers,
request_id,
} => self.answer_pending_request(request_id, KadResponseMsg::FindNode { closer_peers }),
KademliaHandlerIn::GetProvidersReq { key, user_data } => {
let msg = KadRequestMsg::GetProviders { key };
-self.outbound_substreams
-    .push(OutboundSubstreamState::PendingOpen(SubstreamProtocol::new(
-        self.config.protocol_config.clone(),
-        (msg, Some(user_data)),
-    )));
+self.requested_streams.push_back(SubstreamProtocol::new(
+    self.config.protocol_config.clone(),
+    (msg, Some(user_data)),
+));
}
KademliaHandlerIn::GetProvidersRes {
closer_peers,
@@ -698,27 +706,24 @@
),
KademliaHandlerIn::AddProvider { key, provider } => {
let msg = KadRequestMsg::AddProvider { key, provider };
-self.outbound_substreams
-    .push(OutboundSubstreamState::PendingOpen(SubstreamProtocol::new(
-        self.config.protocol_config.clone(),
-        (msg, None),
-    )));
+self.requested_streams.push_back(SubstreamProtocol::new(
+    self.config.protocol_config.clone(),
+    (msg, None),
+));
}
KademliaHandlerIn::GetRecord { key, user_data } => {
let msg = KadRequestMsg::GetValue { key };
-self.outbound_substreams
-    .push(OutboundSubstreamState::PendingOpen(SubstreamProtocol::new(
-        self.config.protocol_config.clone(),
-        (msg, Some(user_data)),
-    )));
+self.requested_streams.push_back(SubstreamProtocol::new(
+    self.config.protocol_config.clone(),
+    (msg, Some(user_data)),
+));
}
KademliaHandlerIn::PutRecord { record, user_data } => {
let msg = KadRequestMsg::PutValue { record };
-self.outbound_substreams
-    .push(OutboundSubstreamState::PendingOpen(SubstreamProtocol::new(
-        self.config.protocol_config.clone(),
-        (msg, Some(user_data)),
-    )));
+self.requested_streams.push_back(SubstreamProtocol::new(
+    self.config.protocol_config.clone(),
+    (msg, Some(user_data)),
+));
}
KademliaHandlerIn::GetRecordRes {
record,
@@ -775,6 +780,15 @@ where
return Poll::Ready(event);
}

+let num_in_progress_outbound_substreams =
+    self.outbound_substreams.len() + self.num_requested_outbound_streams;
+if num_in_progress_outbound_substreams < MAX_NUM_SUBSTREAMS {
+    if let Some(protocol) = self.requested_streams.pop_front() {
+        self.num_requested_outbound_streams += 1;
+        return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol });
+    }
+}

if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() {
// We destroyed all substreams in this function.
self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout);
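
The gate above emits at most one `OutboundSubstreamRequest` per `poll` call; because the swarm keeps polling the handler, the queue drains one slot at a time as upgrades complete. Using the hypothetical `BoundedRequester` sketch from the top of this page, the behaviour can be exercised like this (still illustrative, not a test from the patch):

```rust
fn main() {
    let mut requester: BoundedRequester<u32> = BoundedRequester::new();

    // Queue more requests than the cap allows.
    for i in 0..40 {
        requester.request(i);
    }

    // Only 32 upgrades can be started; the remaining 8 stay queued.
    let mut started = 0;
    while requester.next_to_open().is_some() {
        started += 1;
    }
    assert_eq!(started, 32);
    assert_eq!(requester.queued.len(), 8);

    // A failed upgrade releases its slot entirely, so the next queued
    // request proceeds on the following poll.
    requester.upgrade_finished(false);
    assert!(requester.next_to_open().is_some());
    assert_eq!(requester.queued.len(), 7);
}
```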
@@ -853,12 +867,6 @@ where

loop {
match std::mem::replace(this, OutboundSubstreamState::Poisoned) {
-OutboundSubstreamState::PendingOpen(protocol) => {
-    *this = OutboundSubstreamState::Done;
-    return Poll::Ready(Some(ConnectionHandlerEvent::OutboundSubstreamRequest {
-        protocol,
-    }));
-}
OutboundSubstreamState::PendingSend(mut substream, msg, user_data) => {
match substream.poll_ready_unpin(cx) {
Poll::Ready(Ok(())) => match substream.start_send_unpin(msg) {
