Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(network): Avoid initiating outbound handshakes with IPs for which Zebra already has an active peer. #7029

Merged
merged 20 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 116 additions & 3 deletions zebra-network/src/address_book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

teor2345 marked this conversation as resolved.
Show resolved Hide resolved
use std::{
cmp::Reverse,
collections::HashMap,
iter::Extend,
net::SocketAddr,
net::{IpAddr, SocketAddr},
sync::{Arc, Mutex},
time::Instant,
};
Expand Down Expand Up @@ -72,6 +73,14 @@ pub struct AddressBook {
/// [`OrderedMap`] sorts in descending order.
by_addr: OrderedMap<PeerSocketAddr, MetaAddr, Reverse<MetaAddr>>,

/// The address with a last_connection_state of [`PeerAddrState::Responded`] and
/// the most recent `last_response` time by IP.
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
///
/// This is used to avoid initiating outbound connections past [`Config::max_connections_per_ip`](crate::config::Config), and
/// currently only supports a `max_connections_per_ip` of 1, and must be `None` when used with a greater `max_connections_per_ip`.
// TODO: Replace with `by_ip: HashMap<IpAddr, BTreeMap<DateTime32, MetaAddr>>` to support configured `max_connections_per_ip` greater than 1
most_recent_by_ip: Option<HashMap<IpAddr, MetaAddr>>,

/// The local listener address.
local_listener: SocketAddr,

Expand Down Expand Up @@ -130,7 +139,12 @@ impl AddressBook {
/// Construct an [`AddressBook`] with the given `local_listener` on `network`.
///
/// Uses the supplied [`tracing::Span`] for address book operations.
pub fn new(local_listener: SocketAddr, network: Network, span: Span) -> AddressBook {
pub fn new(
local_listener: SocketAddr,
network: Network,
max_connections_per_ip: usize,
span: Span,
) -> AddressBook {
let constructor_span = span.clone();
let _guard = constructor_span.enter();

Expand All @@ -141,6 +155,8 @@ impl AddressBook {
// and it gets replaced by `update_metrics` anyway.
let (address_metrics_tx, _address_metrics_rx) = watch::channel(AddressMetrics::default());

// Avoid initiating outbound handshakes when max_connections_per_ip is 1.
let should_limit_outbound_conns_per_ip = max_connections_per_ip == 1;
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
let mut new_book = AddressBook {
by_addr: OrderedMap::new(|meta_addr| Reverse(*meta_addr)),
local_listener: canonical_socket_addr(local_listener),
Expand All @@ -149,6 +165,7 @@ impl AddressBook {
span,
address_metrics_tx,
last_address_log: None,
most_recent_by_ip: should_limit_outbound_conns_per_ip.then(HashMap::new),
};

new_book.update_metrics(instant_now, chrono_now);
Expand All @@ -170,6 +187,7 @@ impl AddressBook {
pub fn new_with_addrs(
local_listener: SocketAddr,
network: Network,
max_connections_per_ip: usize,
addr_limit: usize,
span: Span,
addrs: impl IntoIterator<Item = MetaAddr>,
Expand All @@ -183,7 +201,7 @@ impl AddressBook {
// The maximum number of addresses should be always greater than 0
assert!(addr_limit > 0);

let mut new_book = AddressBook::new(local_listener, network, span);
let mut new_book = AddressBook::new(local_listener, network, max_connections_per_ip, span);
new_book.addr_limit = addr_limit;

let addrs = addrs
Expand All @@ -198,6 +216,14 @@ impl AddressBook {
for (socket_addr, meta_addr) in addrs {
// overwrite any duplicate addresses
new_book.by_addr.insert(socket_addr, meta_addr);
// Add the address to `most_recent_by_ip` if it has responded
if new_book.should_update_most_recent_by_ip(meta_addr) {
new_book
.most_recent_by_ip
.as_mut()
.expect("should be some when should_update_most_recent_by_ip is true")
.insert(socket_addr.ip(), meta_addr);
}
// exit as soon as we get enough addresses
if new_book.by_addr.len() >= addr_limit {
break;
Expand Down Expand Up @@ -314,6 +340,45 @@ impl AddressBook {
meta_addr
}

/// Returns true if `updated` needs to be applied to the recent outbound peer connection IP cache.
///
/// Checks if there are no existing entries in the address book with this IP,
/// or if `updated` has a more recent `last_response` requiring the outbound connector to wait
/// longer before initiating handshakes with peers at this IP.
///
/// This code only needs to check a single cache entry, rather than the entire address book,
/// because other code maintains these invariants:
/// - `last_response` times for an entry can only increase.
/// - this is the only field checked by `has_connection_recently_responded()`
///
/// See [`AddressBook::is_ready_for_connection_attempt_with_ip`] for more details.
fn should_update_most_recent_by_ip(&self, updated: MetaAddr) -> bool {
let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
return false
};

if let Some(previous) = most_recent_by_ip.get(&updated.addr.ip()) {
updated.last_connection_state == PeerAddrState::Responded
&& updated.last_response() > previous.last_response()
} else {
updated.last_connection_state == PeerAddrState::Responded
}
}

/// Returns true if `addr` is the latest entry for its IP, which is stored in `most_recent_by_ip`.
/// The entry is checked for an exact match to the IP and port of `addr`.
fn should_remove_most_recent_by_ip(&self, addr: PeerSocketAddr) -> bool {
let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
return false
};

if let Some(previous) = most_recent_by_ip.get(&addr.ip()) {
previous.addr == addr
} else {
false
}
}

/// Apply `change` to the address book, returning the updated `MetaAddr`,
/// if the change was valid.
///
Expand Down Expand Up @@ -373,6 +438,15 @@ impl AddressBook {

self.by_addr.insert(updated.addr, updated);

// Add the address to `most_recent_by_ip` if it sent the most recent
// response Zebra has received from this IP.
if self.should_update_most_recent_by_ip(updated) {
self.most_recent_by_ip
.as_mut()
.expect("should be some when should_update_most_recent_by_ip is true")
.insert(updated.addr.ip(), updated);
}

debug!(
?change,
?updated,
Expand All @@ -397,6 +471,15 @@ impl AddressBook {

self.by_addr.remove(&surplus_peer.addr);

// Check if this surplus peer's addr matches that in `most_recent_by_ip`
// for this the surplus peer's ip to remove it there as well.
if self.should_remove_most_recent_by_ip(surplus_peer.addr) {
self.most_recent_by_ip
.as_mut()
.expect("should be some when should_remove_most_recent_by_ip is true")
.remove(&surplus_peer.addr.ip());
}

debug!(
surplus = ?surplus_peer,
?updated,
Expand Down Expand Up @@ -435,6 +518,14 @@ impl AddressBook {
);

if let Some(entry) = self.by_addr.remove(&removed_addr) {
// Check if this surplus peer's addr matches that in `most_recent_by_ip`
// for this the surplus peer's ip to remove it there as well.
if self.should_remove_most_recent_by_ip(entry.addr) {
if let Some(most_recent_by_ip) = self.most_recent_by_ip.as_mut() {
most_recent_by_ip.remove(&entry.addr.ip());
}
}

std::mem::drop(_guard);
self.update_metrics(instant_now, chrono_now);
Some(entry)
Expand Down Expand Up @@ -463,6 +554,26 @@ impl AddressBook {
self.by_addr.descending_values().cloned()
}

/// Is this IP ready for a new outbound connection attempt?
/// Checks if the outbound connection with the most recent response at this IP has recently responded.
///
/// Note: last_response times may remain live for a long time if the local clock is changed to an earlier time.
fn is_ready_for_connection_attempt_with_ip(
&self,
ip: &IpAddr,
chrono_now: chrono::DateTime<Utc>,
) -> bool {
let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
// if we're not checking IPs, any connection is allowed
return true;
};
let Some(same_ip_peer) = most_recent_by_ip.get(ip) else {
// If there's no entry for this IP, any connection is allowed
return true;
};
!same_ip_peer.has_connection_recently_responded(chrono_now)
}

/// Return an iterator over peers that are due for a reconnection attempt,
/// in reconnection attempt order.
pub fn reconnection_peers(
Expand All @@ -478,6 +589,7 @@ impl AddressBook {
.descending_values()
.filter(move |peer| {
peer.is_ready_for_connection_attempt(instant_now, chrono_now, self.network)
&& self.is_ready_for_connection_attempt_with_ip(&peer.addr.ip(), chrono_now)
})
.cloned()
}
Expand Down Expand Up @@ -699,6 +811,7 @@ impl Clone for AddressBook {
span: self.span.clone(),
address_metrics_tx,
last_address_log: None,
most_recent_by_ip: self.most_recent_by_ip.clone(),
}
}
}
6 changes: 5 additions & 1 deletion zebra-network/src/address_book/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing::Span;
use zebra_chain::{parameters::Network::*, serialization::Duration32};

use crate::{
constants::{MAX_ADDRS_IN_ADDRESS_BOOK, MAX_PEER_ACTIVE_FOR_GOSSIP},
constants::{DEFAULT_MAX_CONNS_PER_IP, MAX_ADDRS_IN_ADDRESS_BOOK, MAX_PEER_ACTIVE_FOR_GOSSIP},
meta_addr::{arbitrary::MAX_META_ADDR, MetaAddr, MetaAddrChange},
AddressBook,
};
Expand All @@ -30,6 +30,7 @@ proptest! {
let address_book = AddressBook::new_with_addrs(
local_listener,
Mainnet,
DEFAULT_MAX_CONNS_PER_IP,
MAX_ADDRS_IN_ADDRESS_BOOK,
Span::none(),
addresses
Expand Down Expand Up @@ -59,6 +60,7 @@ proptest! {
let address_book = AddressBook::new_with_addrs(
local_listener,
Mainnet,
DEFAULT_MAX_CONNS_PER_IP,
MAX_ADDRS_IN_ADDRESS_BOOK,
Span::none(),
addresses
Expand Down Expand Up @@ -97,6 +99,7 @@ proptest! {
let mut address_book = AddressBook::new_with_addrs(
local_listener,
Mainnet,
DEFAULT_MAX_CONNS_PER_IP,
addr_limit,
Span::none(),
initial_addrs.clone(),
Expand All @@ -119,6 +122,7 @@ proptest! {
let mut address_book = AddressBook::new_with_addrs(
local_listener,
Mainnet,
DEFAULT_MAX_CONNS_PER_IP,
addr_limit,
Span::none(),
initial_addrs,
Expand Down
78 changes: 75 additions & 3 deletions zebra-network/src/address_book/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,21 @@ use zebra_chain::{
};

use crate::{
constants::MAX_ADDRS_IN_ADDRESS_BOOK, meta_addr::MetaAddr,
protocol::external::types::PeerServices, AddressBook,
constants::{DEFAULT_MAX_CONNS_PER_IP, MAX_ADDRS_IN_ADDRESS_BOOK},
meta_addr::MetaAddr,
protocol::external::types::PeerServices,
AddressBook,
};

/// Make sure an empty address book is actually empty.
#[test]
fn address_book_empty() {
let address_book = AddressBook::new("0.0.0.0:0".parse().unwrap(), Mainnet, Span::current());
let address_book = AddressBook::new(
"0.0.0.0:0".parse().unwrap(),
Mainnet,
DEFAULT_MAX_CONNS_PER_IP,
Span::current(),
);

assert_eq!(
address_book
Expand Down Expand Up @@ -48,6 +55,7 @@ fn address_book_peer_order() {
let address_book = AddressBook::new_with_addrs(
"0.0.0.0:0".parse().unwrap(),
Mainnet,
DEFAULT_MAX_CONNS_PER_IP,
MAX_ADDRS_IN_ADDRESS_BOOK,
Span::current(),
addrs,
Expand All @@ -64,6 +72,7 @@ fn address_book_peer_order() {
let address_book = AddressBook::new_with_addrs(
"0.0.0.0:0".parse().unwrap(),
Mainnet,
DEFAULT_MAX_CONNS_PER_IP,
MAX_ADDRS_IN_ADDRESS_BOOK,
Span::current(),
addrs,
Expand All @@ -83,6 +92,7 @@ fn address_book_peer_order() {
let address_book = AddressBook::new_with_addrs(
"0.0.0.0:0".parse().unwrap(),
Mainnet,
DEFAULT_MAX_CONNS_PER_IP,
MAX_ADDRS_IN_ADDRESS_BOOK,
Span::current(),
addrs,
Expand All @@ -99,6 +109,7 @@ fn address_book_peer_order() {
let address_book = AddressBook::new_with_addrs(
"0.0.0.0:0".parse().unwrap(),
Mainnet,
DEFAULT_MAX_CONNS_PER_IP,
MAX_ADDRS_IN_ADDRESS_BOOK,
Span::current(),
addrs,
Expand All @@ -110,3 +121,64 @@ fn address_book_peer_order() {
Some(meta_addr2),
);
}

/// Check that `reconnection_peers` skips addresses with IPs for which
/// Zebra already has recently updated outbound peers.
#[test]
fn reconnection_peers_skips_recently_updated_ip() {
// tests that reconnection_peers() skips addresses where there's a connection at that IP with a recent:
// - `last_response`
test_reconnection_peers_skips_recently_updated_ip(true, |addr| {
MetaAddr::new_responded(addr, &PeerServices::NODE_NETWORK)
});

// tests that reconnection_peers() *does not* skip addresses where there's a connection at that IP with a recent:
// - `last_attempt`
test_reconnection_peers_skips_recently_updated_ip(false, MetaAddr::new_reconnect);
// - `last_failure`
test_reconnection_peers_skips_recently_updated_ip(false, |addr| {
MetaAddr::new_errored(addr, PeerServices::NODE_NETWORK)
});
}

fn test_reconnection_peers_skips_recently_updated_ip<
M: Fn(crate::PeerSocketAddr) -> crate::meta_addr::MetaAddrChange,
>(
should_skip_ip: bool,
make_meta_addr_change: M,
) {
let addr1 = "127.0.0.1:1".parse().unwrap();
let addr2 = "127.0.0.1:2".parse().unwrap();

let meta_addr1 = make_meta_addr_change(addr1).into_new_meta_addr(
Instant::now(),
Utc::now().try_into().expect("will succeed until 2038"),
);
let meta_addr2 = MetaAddr::new_gossiped_meta_addr(
addr2,
PeerServices::NODE_NETWORK,
DateTime32::MIN.saturating_add(Duration32::from_seconds(1)),
);

// The second address should be skipped because the first address has a
// recent `last_response` time and the two addresses have the same IP.
let addrs = vec![meta_addr1, meta_addr2];
let address_book = AddressBook::new_with_addrs(
"0.0.0.0:0".parse().unwrap(),
Mainnet,
DEFAULT_MAX_CONNS_PER_IP,
MAX_ADDRS_IN_ADDRESS_BOOK,
Span::current(),
addrs,
);

let next_reconnection_peer = address_book
.reconnection_peers(Instant::now(), Utc::now())
.next();

if should_skip_ip {
assert_eq!(next_reconnection_peer, None,);
} else {
assert_ne!(next_reconnection_peer, None,);
}
}
1 change: 1 addition & 0 deletions zebra-network/src/address_book_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl AddressBookUpdater {
let address_book = AddressBook::new(
local_listener,
config.network,
config.max_connections_per_ip,
span!(Level::TRACE, "address book"),
);
let address_metrics = address_book.address_metrics_watcher();
Expand Down
Loading