Skip to content

Commit

Permalink
fix: Mitigate issue MystenLabs#706 (MystenLabs#715)
Browse files Browse the repository at this point in the history
Context:
The `BatchLoader` is establishing a primary->worker communication using a public address, which is adding latency and competing with outbound traffic.
The proper fix us to use the `BlockWaiter`.

The issue:
We would like a mitigation to be deployed and effective sooner.

The fix:
We inspect the worker addresses used by the `BatchLoader`, and rewrite their hostname to localhost when it matches the local primary.
  • Loading branch information
huitseeker authored Aug 8, 2022
1 parent d764061 commit 04a9081
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 4 deletions.
1 change: 1 addition & 0 deletions narwhal/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev =

store = { git = "https://github.com/mystenlabs/mysten-infra.git", package = "typed-store", rev = "d965a5a795dcdb4d1c7964acf556bc249fdc58aa" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
match_opt = "0.1.2"

[dev-dependencies]
indexmap = { version = "1.9.1", features = ["serde"] }
Expand Down
110 changes: 106 additions & 4 deletions narwhal/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod fixtures;
mod execution_state;

pub use errors::{ExecutionStateError, SubscriberError, SubscriberResult};
use multiaddr::{Multiaddr, Protocol};
pub use state::ExecutionIndices;

use crate::{batch_loader::BatchLoader, core::Core, subscriber::Subscriber};
Expand All @@ -23,7 +24,13 @@ use config::SharedCommittee;
use consensus::ConsensusOutput;
use crypto::PublicKey;
use serde::de::DeserializeOwned;
use std::{fmt::Debug, sync::Arc};
use std::{
borrow::Cow,
collections::HashMap,
fmt::Debug,
net::{Ipv4Addr, Ipv6Addr},
sync::Arc,
};
use store::Store;
use tokio::{
sync::{
Expand Down Expand Up @@ -131,17 +138,41 @@ impl Executor {
);

// Spawn the batch loader.
let worker_addresses = committee
let mut worker_addresses: HashMap<u32, Multiaddr> = committee
.load()
.authorities
.iter()
.find(|(x, _)| *x == &name)
.map(|(_, authority)| authority)
.find_map(|v| match_opt::match_opt!(v, (x, authority) if *x == name => authority))
.expect("Our public key is not in the committee")
.workers
.iter()
.map(|(id, x)| (*id, x.worker_to_worker.clone()))
.collect();
////////////////////////////////////////////////////////////////
// TODO: remove this hack once #706 is fixed
////////////////////////////////////////////////////////////////

// retrieve our primary address
let our_primary_to_primary_address = committee
.load()
.primary(&name)
.expect("Out public key is not in the committee!")
.primary_to_primary;
// extract the hostname portion
let our_primary_hostname = our_primary_to_primary_address
.into_iter()
.flat_map(move |x| match x {
p @ Protocol::Ip4(_) | p @ Protocol::Ip6(_) | p @ Protocol::Dns(_) => Some(p),
_ => None,
})
.next()
.expect("Could not find hostname in our primary address!");
// Modify the worker addresses that we are about to use : would we talk better using a loopback address?
for worker_address in worker_addresses.values_mut() {
replace_distant_by_localhost(worker_address, &our_primary_hostname);
}
////////////////////////////////////////////////////////////////

let batch_loader_handle = BatchLoader::spawn(
store,
tx_reconfigure.subscribe(),
Expand All @@ -158,3 +189,74 @@ impl Executor {
])
}
}

fn replace_distant_by_localhost(target: &mut Multiaddr, hostname_pattern: &Protocol) {
// does the hostname match our pattern exactly?
if target.iter().next() == Some(hostname_pattern.clone()) {
if let Some(replacement) = target.replace(0, move |x| match x {
Protocol::Ip4(_) => Some(Protocol::Ip4(Ipv4Addr::LOCALHOST)),
Protocol::Ip6(_) => Some(Protocol::Ip6(Ipv6Addr::LOCALHOST)),
Protocol::Dns(_) => Some(Protocol::Dns(Cow::Owned("localhost".to_owned()))),
_ => None,
}) {
tracing::debug!("Address for worker {} replaced by {}", target, replacement);
*target = replacement;
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use multiaddr::{multiaddr, Protocol};
use std::net::Ipv4Addr;

#[test]
fn test_replace_distant_by_localhost() {
// IPV4 positive
let non_local: Ipv4Addr = "8.8.8.8".parse().unwrap();
let mut addr1 = multiaddr!(Ip4(non_local), Tcp(10000u16));

replace_distant_by_localhost(&mut addr1, &Protocol::Ip4(non_local));
assert_eq!(addr1, multiaddr!(Ip4(Ipv4Addr::LOCALHOST), Tcp(10000u16)));

// IPV4 negative
let other_target: Ipv4Addr = "8.8.8.4".parse().unwrap();
let addr1 = multiaddr!(Ip4(non_local), Tcp(10000u16));
let mut addr2 = multiaddr!(Ip4(non_local), Tcp(10000u16));

replace_distant_by_localhost(&mut addr2, &Protocol::Ip4(other_target));
assert_eq!(addr2, addr1);

// IPV6 positive
let non_local: Ipv6Addr = "2607:f0d0:1002:51::4".parse().unwrap();
let mut addr1 = multiaddr!(Ip6(non_local), Tcp(10000u16));

replace_distant_by_localhost(&mut addr1, &Protocol::Ip6(non_local));
assert_eq!(addr1, multiaddr!(Ip6(Ipv6Addr::LOCALHOST), Tcp(10000u16)));

// IPV6 negative
let other_target: Ipv6Addr = "2607:f0d0:1002:50::4".parse().unwrap();
let addr1 = multiaddr!(Ip6(non_local), Tcp(10000u16));
let mut addr2 = multiaddr!(Ip6(non_local), Tcp(10000u16));

replace_distant_by_localhost(&mut addr2, &Protocol::Ip6(other_target));
assert_eq!(addr2, addr1);

// DNS positive
let non_local: Cow<str> = Cow::Owned("google.com".to_owned());
let mut addr1 = multiaddr!(Dns(non_local.clone()), Tcp(10000u16));

replace_distant_by_localhost(&mut addr1, &Protocol::Dns(non_local.clone()));
let localhost: Cow<str> = Cow::Owned("localhost".to_owned());
assert_eq!(addr1, multiaddr!(Dns(localhost), Tcp(10000u16)));

// DNS negative
let other_target: Cow<str> = Cow::Owned("apple.com".to_owned());
let addr1 = multiaddr!(Dns(non_local.clone()), Tcp(10000u16));
let mut addr2 = multiaddr!(Dns(non_local), Tcp(10000u16));

replace_distant_by_localhost(&mut addr2, &Protocol::Dns(other_target));
assert_eq!(addr2, addr1);
}
}

0 comments on commit 04a9081

Please sign in to comment.