Skip to content

Commit

Permalink
narwhal: fix how primary and workers are notified of reconfig events
Browse files Browse the repository at this point in the history
Prior to this change we would send outside reconfig events to primaries
via the WorkerToPrimary interface. This was sort of a hack due to not
having an admin interface. Now that we have an admin interface we can
leverage that to notify primaries and then have primaries be responsible
for sending these events to all of its own workers.
  • Loading branch information
bmwill committed Oct 20, 2022
1 parent 02d5109 commit 3d7ad1e
Show file tree
Hide file tree
Showing 15 changed files with 306 additions and 383 deletions.
2 changes: 2 additions & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ status-level = "skip"
fail-fast = false
# Retry failing tests in order to not block builds on flaky tests
retries = 5
# Timeout tests after 4 minutes
slow-timeout = { period = "60s", terminate-after = 4 }

[profile.ci.junit]
path = "junit.xml"
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 24 additions & 4 deletions narwhal/network/src/admin.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use axum::routing::post;
use axum::{extract::Extension, http::StatusCode, routing::get, Json, Router};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tokio::sync::watch;
use tracing::info;
use types::metered_channel::Sender;
use types::ReconfigureNotification;

pub fn start_admin_server(
port: u16,
network: anemo::Network,
mut rx_reconfigure: watch::Receiver<ReconfigureNotification>,
tx_state_handler: Option<Sender<ReconfigureNotification>>,
) {
let app = Router::new()
let mut router = Router::new()
.route("/peers", get(get_peers))
.route("/known_peers", get(get_known_peers))
.layer(Extension(network));
.route("/known_peers", get(get_known_peers));

// Primaries will have this service enabled
if let Some(tx_state_handler) = tx_state_handler {
let r = Router::new()
.route("/reconfigure", post(reconfigure))
.layer(Extension(tx_state_handler));
router = router.merge(r);
}

router = router.layer(Extension(network));

let socket_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
info!(
Expand All @@ -40,7 +52,7 @@ pub fn start_admin_server(
tokio::spawn(async move {
axum_server::bind(socket_address)
.handle(shutdown_handle)
.serve(app.into_make_service())
.serve(router.into_make_service())
.await
.unwrap();
});
Expand Down Expand Up @@ -70,3 +82,11 @@ async fn get_known_peers(
),
)
}

async fn reconfigure(
Extension(tx_state_handler): Extension<Sender<ReconfigureNotification>>,
Json(reconfigure_notification): Json<ReconfigureNotification>,
) -> StatusCode {
let _ = tx_state_handler.send(reconfigure_notification).await;
StatusCode::OK
}
36 changes: 2 additions & 34 deletions narwhal/network/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use types::{
Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse, PrimaryMessage,
PrimaryToPrimaryClient, PrimaryToWorkerClient, RequestBatchRequest, WorkerBatchMessage,
WorkerBatchRequest, WorkerBatchResponse, WorkerDeleteBatchesMessage, WorkerOthersBatchMessage,
WorkerOurBatchMessage, WorkerPrimaryMessage, WorkerReconfigureMessage,
WorkerSynchronizeMessage, WorkerToPrimaryClient, WorkerToWorkerClient,
WorkerOurBatchMessage, WorkerReconfigureMessage, WorkerSynchronizeMessage,
WorkerToPrimaryClient, WorkerToWorkerClient,
};

fn default_executor() -> BoundedExecutor {
Expand Down Expand Up @@ -298,38 +298,6 @@ impl ReliableNetwork<WorkerSynchronizeMessage> for P2pNetwork {
// Worker-to-Primary
//

impl UnreliableNetwork<WorkerPrimaryMessage> for P2pNetwork {
type Response = ();
fn unreliable_send(
&mut self,
peer: NetworkPublicKey,
message: &WorkerPrimaryMessage,
) -> Result<JoinHandle<Result<anemo::Response<()>>>> {
let message = message.to_owned();
let f =
move |peer| async move { WorkerToPrimaryClient::new(peer).send_message(message).await };
self.unreliable_send(peer, f)
}
}

#[async_trait]
impl ReliableNetwork<WorkerPrimaryMessage> for P2pNetwork {
type Response = ();
async fn send(
&mut self,
peer: NetworkPublicKey,
message: &WorkerPrimaryMessage,
) -> CancelOnDropHandler<Result<anemo::Response<()>>> {
let message = message.to_owned();
let f = move |peer| {
let message = message.clone();
async move { WorkerToPrimaryClient::new(peer).send_message(message).await }
};

self.send(peer, f).await
}
}

#[async_trait]
impl ReliableNetwork<WorkerOurBatchMessage> for P2pNetwork {
type Response = ();
Expand Down
1 change: 1 addition & 0 deletions narwhal/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ workspace-hack.workspace = true
eyre = "0.6.8"

anemo.workspace = true
reqwest = { version = "0.11.12", features = ["json"] }

[dev-dependencies]
hex = "0.4.3"
Expand Down
4 changes: 1 addition & 3 deletions narwhal/node/src/generate_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{fs::File, io::Write};
use structopt::{clap::arg_enum, StructOpt};
use types::{
Batch, BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest,
ReconfigureNotification, WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerPrimaryMessage,
ReconfigureNotification, WorkerOthersBatchMessage, WorkerOurBatchMessage,
WorkerReconfigureMessage, WorkerSynchronizeMessage,
};

Expand Down Expand Up @@ -145,8 +145,6 @@ fn get_registry() -> Result<Registry> {
tracer.trace_type::<HeaderDigest>(&samples)?;
tracer.trace_type::<CertificateDigest>(&samples)?;

// The final entry points that we must document
tracer.trace_type::<WorkerPrimaryMessage>(&samples)?;
tracer.registry()
}

Expand Down
71 changes: 14 additions & 57 deletions narwhal/node/src/restarter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use crypto::{KeyPair, NetworkKeyPair};
use executor::ExecutionState;
use fastcrypto::traits::KeyPair as _;
use futures::future::join_all;
use network::{P2pNetwork, ReliableNetwork};
use prometheus::Registry;
use std::{path::PathBuf, sync::Arc};
use tokio::sync::mpsc::Receiver;
use types::{ReconfigureNotification, WorkerPrimaryMessage, WorkerReconfigureMessage};
use types::ReconfigureNotification;

// Module to start a node (primary, workers and default consensus), keep it running, and restarting it
/// every time the committee changes.
Expand Down Expand Up @@ -44,20 +43,7 @@ impl NodeRestarter {
let mut worker_ids_and_keypairs = worker_ids_and_keypairs;
let mut committee = committee.clone();

// construct a p2p network that we can use to send reconfigure messages to our primary and
// workers. We generate a random key simply to construct the network. Also, ideally this
// would be done via a different interface.
let mut handles = Vec::new();
let network = anemo::Network::bind("127.0.0.1:0")
.server_name("narwhal")
.private_key(
NetworkKeyPair::generate(&mut rand::rngs::OsRng)
.private()
.0
.to_bytes(),
)
.start(anemo::Router::new())
.unwrap();

// Listen for new committees.
loop {
Expand Down Expand Up @@ -110,49 +96,20 @@ impl NodeRestarter {
tracing::info!("Starting reconfiguration with committee {committee}");

// Shutdown all relevant components.
// TODO: shutdown message should probably be sent in a better way than by injecting
// it through the networking stack.
let address = network::multiaddr_to_address(
&committee
.primary(&name)
.expect("Our key is not in the committee"),
)
.unwrap();
let network_key = committee
.network_key(&name)
.expect("Our key is not in the committee");
let mut primary_network =
P2pNetwork::new_for_single_address(network_key.to_owned(), address).await;
let message = WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::Shutdown);
let primary_cancel_handle =
primary_network.send(network_key.to_owned(), &message).await;

let message = WorkerReconfigureMessage {
message: ReconfigureNotification::Shutdown,
};
let mut worker_names = Vec::new();
for worker in worker_cache
.load()
.our_workers(&name)
.expect("Our key is not in the worker cache")
{
let address = network::multiaddr_to_address(&worker.worker_address).unwrap();
let peer_id = anemo::PeerId(worker.name.0.to_bytes());
network
.connect_with_peer_id(address, peer_id)
.await
.unwrap();
worker_names.push(worker.name);
}
let worker_cancel_handles = P2pNetwork::new(network.clone())
.broadcast(worker_names, &message)
.await;

// Ensure the message has been received.
primary_cancel_handle
// Send shutdown message to the primary, who will forward it to its workers
let client = reqwest::Client::new();
client
.post(format!(
"http://127.0.0.1:{}/reconfigure",
parameters
.network_admin_server
.primary_network_admin_server_port
))
.json(&ReconfigureNotification::Shutdown)
.send()
.await
.expect("Failed to notify primary");
join_all(worker_cancel_handles).await;
.unwrap();

tracing::debug!("Committee reconfiguration message successfully sent");

// Wait for the components to shut down.
Expand Down
99 changes: 39 additions & 60 deletions narwhal/node/tests/reconfigure.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

#![allow(clippy::mutable_key_type)]

use arc_swap::ArcSwap;
use bytes::Bytes;
use config::{Committee, Parameters, SharedWorkerCache, WorkerCache, WorkerId};
Expand All @@ -9,19 +12,18 @@ use executor::{ExecutionIndices, ExecutionState};
use fastcrypto::traits::KeyPair as _;
use futures::future::join_all;
use narwhal_node as node;
use network::{P2pNetwork, ReliableNetwork};
use node::{restarter::NodeRestarter, Node, NodeStorage};
use prometheus::Registry;
use std::sync::{Arc, Mutex};
use test_utils::{random_network, CommitteeFixture};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use test_utils::CommitteeFixture;
use tokio::{
sync::mpsc::{channel, Receiver, Sender},
time::{interval, sleep, Duration, MissedTickBehavior},
};
use types::{
ReconfigureNotification, TransactionProto, TransactionsClient, WorkerPrimaryMessage,
WorkerReconfigureMessage,
};
use types::{ReconfigureNotification, TransactionProto, TransactionsClient};

/// A simple/dumb execution engine.
struct SimpleExecutionState {
Expand Down Expand Up @@ -255,11 +257,19 @@ async fn epoch_change() {
let fixture = CommitteeFixture::builder().randomize_ports(true).build();
let committee = fixture.committee();
let worker_cache = fixture.shared_worker_cache();
let parameters = Parameters {
batch_size: 200,
max_header_num_of_batches: 1,
..Parameters::default()
};
let parameters = fixture
.authorities()
.map(|a| {
(
a.public_key(),
Parameters {
batch_size: 200,
header_num_of_batches_threshold: 1, // One batch digest
..Parameters::default()
},
)
})
.collect::<HashMap<_, _>>();

// Spawn the nodes.
let mut rx_nodes = Vec::new();
Expand All @@ -281,65 +291,34 @@ async fn epoch_change() {
));

// Start a task that will broadcast the committee change signal.
let name_clone = name.clone();
let worker_cache_clone = worker_cache.clone();
let parameters_clone = parameters.get(&name).unwrap().clone();
tokio::spawn(async move {
let network = random_network();
let client = reqwest::Client::new();

while let Some((_, _, committee, _, _)) = rx_node_reconfigure.recv().await {
// TODO: shutdown message should probably be sent in a better way than by injecting
// it through the networking stack.
let address = network::multiaddr_to_address(
&committee
.primary(&name_clone)
.expect("Our key is not in the committee"),
)
.unwrap();
let network_key = committee
.network_key(&name_clone)
.expect("Our key is not in the committee");
let mut primary_network =
P2pNetwork::new_for_single_address(network_key.to_owned(), address).await;
let message = WorkerPrimaryMessage::Reconfigure(ReconfigureNotification::NewEpoch(
committee.clone(),
));
let primary_cancel_handle =
primary_network.send(network_key.to_owned(), &message).await;

let message = WorkerReconfigureMessage {
message: ReconfigureNotification::NewEpoch(committee.clone()),
};
let mut worker_names = Vec::new();
for worker in worker_cache_clone
.load()
.our_workers(&name_clone)
.expect("Our key is not in the worker cache")
{
let address = network::multiaddr_to_address(&worker.worker_address).unwrap();
let peer_id = anemo::PeerId(worker.name.0.to_bytes());
network
.connect_with_peer_id(address, peer_id)
.await
.unwrap();
worker_names.push(worker.name);
}
let worker_cancel_handles = P2pNetwork::new(network.clone())
.broadcast(worker_names, &message)
.await;

// Ensure the message has been received.
primary_cancel_handle.await.unwrap();
join_all(worker_cancel_handles).await;
let message = ReconfigureNotification::NewEpoch(committee.clone());
client
.post(format!(
"http://127.0.0.1:{}/reconfigure",
parameters_clone
.network_admin_server
.primary_network_admin_server_port
))
.json(&message)
.send()
.await
.unwrap();
}
});

let p = parameters.get(&name).unwrap().clone();
let _primary_handles = Node::spawn_primary(
a.keypair().copy(),
a.network_keypair().copy(),
Arc::new(ArcSwap::new(Arc::new(committee.clone()))),
worker_cache.clone(),
&store,
parameters.clone(),
p.clone(),
/* consensus */ true,
execution_state,
&Registry::new(),
Expand All @@ -353,7 +332,7 @@ async fn epoch_change() {
Arc::new(ArcSwap::new(Arc::new(committee.clone()))),
worker_cache.clone(),
&store,
parameters.clone(),
p,
&Registry::new(),
);

Expand Down
Loading

0 comments on commit 3d7ad1e

Please sign in to comment.