Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

security: Rate limit GetAddr responses #7955

Merged
merged 26 commits into from
Nov 21, 2023
Merged
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
969819f
Updates ADDR_RESPONSE_LIMIT_DENOMINATOR to 4
arya2 Nov 16, 2023
ba0cfc6
Moves logic getting a fraction of Zebra's peers to a method in the ad…
arya2 Nov 16, 2023
70e0cc1
Adds and uses CachedPeerAddrs struct in inbound service
arya2 Nov 16, 2023
81cf5a5
moves and documents constant
arya2 Nov 16, 2023
f28c529
fixes test
arya2 Nov 16, 2023
a82a404
Apply suggestions from code review
arya2 Nov 17, 2023
78234ba
updates docs
arya2 Nov 17, 2023
a5d6b25
renames sanitized_window method
arya2 Nov 17, 2023
a431e1b
renames CachedPeerAddrs to CachedPeerAddrResponse
arya2 Nov 17, 2023
66e1cfc
updates test
arya2 Nov 17, 2023
788c620
moves try_refresh to per request
arya2 Nov 17, 2023
29f3aed
Make unused sanitization method pub(crate)
teor2345 Nov 19, 2023
2032361
Apply suggestions from code review
arya2 Nov 20, 2023
f2c2968
moves CachedPeerAddrResponse to a module
arya2 Nov 20, 2023
2ee9a20
updates unit test
arya2 Nov 20, 2023
4d9bc0f
fixes unit test
arya2 Nov 20, 2023
7c35da6
removes unnecessary condition
arya2 Nov 20, 2023
cca2857
clears cached getaddr response if it can't refresh for over a minute …
arya2 Nov 20, 2023
7c84398
tests that inbound service gives out the same addresses for every Pee…
arya2 Nov 20, 2023
f72a52b
Applies suggestion from code review
arya2 Nov 20, 2023
5327d17
fixes doc link
arya2 Nov 20, 2023
1e3fb0e
renames constant
arya2 Nov 20, 2023
a4a8963
Fix docs on new constant
arya2 Nov 20, 2023
f902adb
applies suggestion from code review
arya2 Nov 20, 2023
120b6a5
uses longer cache expiry time
arya2 Nov 20, 2023
48f419e
Adds code comments
arya2 Nov 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Adds and uses CachedPeerAddrs struct in inbound service
  • Loading branch information
arya2 committed Nov 20, 2023
commit 70e0cc1f2e4377c569a5407f2f9e171c8337499b
153 changes: 93 additions & 60 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,15 @@ use std::{
collections::HashSet,
future::Future,
pin::Pin,
sync::{Arc, TryLockError},
sync::{Arc, Mutex, TryLockError},
task::{Context, Poll},
time::Duration,
time::{Duration, Instant},
};

use chrono::Utc;
use futures::{
future::{FutureExt, TryFutureExt},
stream::Stream,
};
use num_integer::div_ceil;
use tokio::sync::oneshot::{self, error::TryRecvError};
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt};

Expand All @@ -32,11 +30,9 @@ use zebra_chain::{
transaction::UnminedTxId,
};
use zebra_consensus::router::RouterError;
use zebra_network::{
constants::{ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
AddressBook, InventoryResponse,
};
use zebra_network::{AddressBook, InventoryResponse};
use zebra_node_services::mempool;
use zn::types::MetaAddr;

use crate::BoxError;

Expand Down Expand Up @@ -111,6 +107,75 @@ pub struct InboundSetupData {
pub latest_chain_tip: zs::LatestChainTip,
}

/// How long a cached list of peer addresses is served before it is due for a refresh
/// (10 minutes).
///
/// `CachedPeerAddrs` adds this interval to the current time whenever it fills its cache,
/// and `CachedPeerAddrs::try_refresh` leaves the cache untouched until that deadline passes.
const INBOUND_CACHED_ADDRS_REFRESH_INTERVAL: Duration = Duration::from_secs(10 * 60);

/// Caches and refreshes a partial list of peer addresses to be returned as a `GetAddr` response.
///
/// The cache is filled from the shared address book when the struct is created, and is
/// refilled by `try_refresh` once `refresh_time` has passed.
pub struct CachedPeerAddrs {
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
/// A shared list of peer addresses.
///
/// Locked to copy out a new `sanitized_window()` of addresses into `cached_addrs`.
address_book: Arc<Mutex<zn::AddressBook>>,

/// An owned list of peer addresses used as a `GetAddr` response.
cached_addrs: Vec<MetaAddr>,

/// Instant after which `cached_addrs` should be refreshed.
///
/// Set to the time the cache was last filled plus `INBOUND_CACHED_ADDRS_REFRESH_INTERVAL`.
refresh_time: Instant,
}

impl CachedPeerAddrs {
/// Builds a [`CachedPeerAddrs`] whose cache is filled immediately from `address_book`.
///
/// Waits for the address book lock, copies out its `sanitized_window()`, and schedules the
/// first refresh `INBOUND_CACHED_ADDRS_REFRESH_INTERVAL` after the addresses were copied.
///
/// # Panics
///
/// If the address book lock is poisoned.
fn new(address_book: Arc<Mutex<AddressBook>>) -> Self {
    // The guard only lives inside this block, so the lock is released
    // before the refresh deadline is computed and `Self` is built.
    let cached_addrs = {
        let book = address_book
            .lock()
            .expect("previous thread panicked while holding the address book lock");

        book.sanitized_window()
    };

    Self {
        refresh_time: Instant::now() + INBOUND_CACHED_ADDRS_REFRESH_INTERVAL,
        cached_addrs,
        address_book,
    }
}

/// Refreshes `cached_addrs` from the address book once the current time has passed
/// `refresh_time`.
///
/// The address book is locked with `try_lock`, so this method does not wait for the lock.
/// If the lock is busy, `cached_addrs` and `refresh_time` are left unchanged, so the next
/// call tries again.
///
/// # Panics
///
/// If the address book lock is poisoned.
fn try_refresh(&mut self) {
let now = Instant::now();

// return early if the cached addresses are still fresh
if now < self.refresh_time {
return;
}
arya2 marked this conversation as resolved.
Show resolved Hide resolved

// try getting a lock on the address book if it's time to refresh the cached addresses
match self.address_book.try_lock() {
// got the lock: replace the cache and schedule the next refresh one interval from now
Ok(address_book) => {
self.cached_addrs = address_book.sanitized_window();
self.refresh_time = now + INBOUND_CACHED_ADDRS_REFRESH_INTERVAL;
}

// the lock is held elsewhere: keep the existing cache and deadline
Err(TryLockError::WouldBlock) => {
let next_refresh_time = self.refresh_time + INBOUND_CACHED_ADDRS_REFRESH_INTERVAL;

// only warn once the cache is more than one full interval overdue
if now > next_refresh_time {
warn!("getaddrs response hasn't been refreshed in some time");
};
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
}
Err(TryLockError::Poisoned(_)) => {
panic!("previous thread panicked while holding the address book lock")
}
};
}
}

/// Gives read-only access to the cached peer addresses, so callers can use
/// `Vec<MetaAddr>` methods directly on a [`CachedPeerAddrs`].
impl std::ops::Deref for CachedPeerAddrs {
    type Target = Vec<MetaAddr>;

    /// Borrows the cached address list; the address book is not locked or read.
    fn deref(&self) -> &Vec<MetaAddr> {
        let Self { cached_addrs, .. } = self;
        cached_addrs
    }
}
teor2345 marked this conversation as resolved.
Show resolved Hide resolved

/// Tracks the internal state of the [`Inbound`] service during setup.
pub enum Setup {
/// Waiting for service setup to complete.
Expand All @@ -135,8 +200,12 @@ pub enum Setup {
Initialized {
// Services
//
/// A shared list of peer addresses.
address_book: Arc<std::sync::Mutex<zn::AddressBook>>,
/// An owned partial list of peer addresses used as a `GetAddr` response, and
/// a shared list of peer addresses used to periodically refresh the partial list.
///
/// Refreshed from the address book in the `poll_ready` method,
/// once [`INBOUND_CACHED_ADDRS_REFRESH_INTERVAL`] has elapsed.
cached_peer_addrs: CachedPeerAddrs,

/// A `futures::Stream` that downloads and verifies gossiped blocks.
block_downloads: Pin<Box<GossipedBlockDownloads>>,
Expand Down Expand Up @@ -261,6 +330,8 @@ impl Service<zn::Request> for Inbound {
latest_chain_tip,
} = setup_data;

let cached_peer_addrs = CachedPeerAddrs::new(address_book);

let block_downloads = Box::pin(BlockDownloads::new(
full_verify_concurrency_limit,
Timeout::new(block_download_peer_set, BLOCK_DOWNLOAD_TIMEOUT),
Expand All @@ -271,7 +342,7 @@ impl Service<zn::Request> for Inbound {

result = Ok(());
Setup::Initialized {
address_book,
cached_peer_addrs,
block_downloads,
mempool,
state,
Expand Down Expand Up @@ -306,7 +377,7 @@ impl Service<zn::Request> for Inbound {
}
// Clean up completed download tasks, ignoring their results
Setup::Initialized {
address_book,
mut cached_peer_addrs,
mut block_downloads,
mempool,
state,
Expand All @@ -317,11 +388,12 @@ impl Service<zn::Request> for Inbound {
// If we returned Pending here, and there were no waiting block downloads,
// then inbound requests would wait for the next block download, and hang forever.
while let Poll::Ready(Some(_)) = block_downloads.as_mut().poll_next(cx) {}
cached_peer_addrs.try_refresh();
teor2345 marked this conversation as resolved.
Show resolved Hide resolved

result = Ok(());

Setup::Initialized {
address_book,
cached_peer_addrs,
block_downloads,
mempool,
state,
Expand Down Expand Up @@ -352,13 +424,13 @@ impl Service<zn::Request> for Inbound {
/// and will cause callers to disconnect from the remote peer.
#[instrument(name = "inbound", skip(self, req))]
fn call(&mut self, req: zn::Request) -> Self::Future {
let (address_book, block_downloads, mempool, state) = match &mut self.setup {
let (cached_peer_addrs, block_downloads, mempool, state) = match &mut self.setup {
Setup::Initialized {
address_book,
cached_peer_addrs,
block_downloads,
mempool,
state,
} => (address_book, block_downloads, mempool, state),
} => (cached_peer_addrs, block_downloads, mempool, state),
_ => {
debug!("ignoring request from remote peer during setup");
return async { Ok(zn::Response::Nil) }.boxed();
Expand All @@ -377,53 +449,14 @@ impl Service<zn::Request> for Inbound {
//
// If the address book is busy, try again inside the future. If it can't be locked
// twice, ignore the request.
let address_book = address_book.clone();

let get_peers = move || match address_book.try_lock() {
Ok(address_book) => Some(address_book.clone()),
Err(TryLockError::WouldBlock) => None,
Err(TryLockError::Poisoned(_)) => panic!("previous thread panicked while holding the address book lock"),
};

let peers = get_peers();
let peers = cached_peer_addrs.clone();

async move {
// Correctness: get the current time inside the future.
//
// This time is used to filter outdated peers, so it doesn't matter much
// if we get it when the future is created, or when it starts running.
let now = Utc::now();

// If we didn't get the peers when the future was created, wait for other tasks
// to run, then try again when the future first runs.
if peers.is_none() {
tokio::task::yield_now().await;
}
let peers = peers.or_else(get_peers);
let is_busy = peers.is_none();

// Send a sanitized response
let mut peers = peers.map_or_else(Vec::new, |peers| peers.sanitized(now));

// Truncate the list
let address_limit = div_ceil(peers.len(), ADDR_RESPONSE_LIMIT_DENOMINATOR);
let address_limit = MAX_ADDRS_IN_MESSAGE.min(address_limit);
peers.truncate(address_limit);

if peers.is_empty() {
// Sometimes we don't know if the peer response will be empty until we've
// sanitized them.
if is_busy {
info!(
"ignoring `Peers` request from remote peer because our address \
book is busy"
);
} else {
debug!(
"ignoring `Peers` request from remote peer because our address \
book has no available peers"
);
}
debug!(
"ignoring `Peers` request from remote peer because our address \
book has no available peers"
arya2 marked this conversation as resolved.
Show resolved Hide resolved
);

Ok(zn::Response::Nil)
} else {
Expand Down