diff --git a/narwhal/primary/src/tests/rpc_tests.rs b/narwhal/primary/src/tests/rpc_tests.rs
index 925725cc70331..3b28c793e44b2 100644
--- a/narwhal/primary/src/tests/rpc_tests.rs
+++ b/narwhal/primary/src/tests/rpc_tests.rs
@@ -17,7 +17,7 @@ async fn test_server_authorizations() {
     tokio::time::sleep(Duration::from_secs(3)).await;
 
     let test_authority = test_cluster.authority(0);
-    let test_client = test_authority.client.clone();
+    let test_client = test_authority.client().await;
     let test_committee = test_cluster.committee.clone();
     let test_worker_cache = test_cluster.worker_cache.clone();
 
diff --git a/narwhal/primary/tests/nodes_bootstrapping_tests.rs b/narwhal/primary/tests/nodes_bootstrapping_tests.rs
index 5e2467edd09be..991cfa1e08e7e 100644
--- a/narwhal/primary/tests/nodes_bootstrapping_tests.rs
+++ b/narwhal/primary/tests/nodes_bootstrapping_tests.rs
@@ -104,6 +104,8 @@ async fn test_node_staggered_starts() {
 #[ignore]
 #[tokio::test]
 async fn test_full_outage_and_recovery() {
+    let _guard = setup_tracing();
+
     let stop_and_start_delay = Duration::from_secs(12);
     let node_advance_delay = Duration::from_secs(60);
 
diff --git a/narwhal/test-utils/src/cluster.rs b/narwhal/test-utils/src/cluster.rs
index 81f9dd2cd9c42..5d729a6504bf0 100644
--- a/narwhal/test-utils/src/cluster.rs
+++ b/narwhal/test-utils/src/cluster.rs
@@ -16,6 +16,7 @@ use prometheus::{proto::Metric, Registry};
 use std::{cell::RefCell, collections::HashMap, path::PathBuf, rc::Rc, sync::Arc, time::Duration};
 use storage::NodeStorage;
 use telemetry_subscribers::TelemetryGuards;
+use tokio::sync::RwLockWriteGuard;
 use tokio::{
     sync::{broadcast::Sender, mpsc::channel, RwLock},
     task::JoinHandle,
@@ -502,11 +503,11 @@ pub struct AuthorityDetails {
     pub id: usize,
     pub name: AuthorityIdentifier,
     pub public_key: PublicKey,
-    pub client: NetworkClient,
     internal: Arc<RwLock<AuthorityDetailsInternal>>,
 }
 
 struct AuthorityDetailsInternal {
+    client: Option<NetworkClient>,
     primary: PrimaryNodeDetails,
     worker_keypairs: Vec<NetworkKeyPair>,
     workers: HashMap<WorkerId, WorkerNodeDetails>,
@@ -523,9 +524,6 @@ impl AuthorityDetails {
         committee: Committee,
         worker_cache: WorkerCache,
     ) -> Self {
-        // Create network client.
-        let client = NetworkClient::new_from_keypair(&network_key_pair);
-
         // Create all the nodes we have in the committee
         let public_key = key_pair.public().clone();
         let primary = PrimaryNodeDetails::new(
@@ -556,6 +554,7 @@
         }
 
         let internal = AuthorityDetailsInternal {
+            client: None,
             primary,
             worker_keypairs,
             workers,
@@ -565,11 +564,19 @@
             id,
             public_key,
             name,
-            client,
             internal: Arc::new(RwLock::new(internal)),
         }
     }
 
+    pub async fn client(&self) -> NetworkClient {
+        let internal = self.internal.read().await;
+        internal
+            .client
+            .as_ref()
+            .expect("Requested network client which has not been initialised yet")
+            .clone()
+    }
+
     /// Starts the node's primary and workers. If the num_of_workers is provided
     /// then only those ones will be started. Otherwise all the available workers
     /// will be started instead.
@@ -595,11 +602,9 @@
     /// start with a fresh (empty) storage.
     pub async fn start_primary(&self, preserve_store: bool) {
         let mut internal = self.internal.write().await;
+        let client = self.create_client(&mut internal).await;
 
-        internal
-            .primary
-            .start(self.client.clone(), preserve_store)
-            .await;
+        internal.primary.start(client, preserve_store).await;
     }
 
     pub async fn stop_primary(&self) {
@@ -610,6 +615,8 @@
 
     pub async fn start_all_workers(&self, preserve_store: bool) {
         let mut internal = self.internal.write().await;
+        let client = self.create_client(&mut internal).await;
+
         let worker_keypairs = internal
             .worker_keypairs
             .iter()
@@ -618,9 +625,7 @@
 
         for (id, worker) in internal.workers.iter_mut() {
             let keypair = worker_keypairs.get(*id as usize).unwrap().copy();
-            worker
-                .start(keypair, self.client.clone(), preserve_store)
-                .await;
+            worker.start(keypair, client.clone(), preserve_store).await;
         }
     }
 
@@ -630,15 +635,15 @@
     /// start with a fresh (empty) storage.
     pub async fn start_worker(&self, id: WorkerId, preserve_store: bool) {
         let mut internal = self.internal.write().await;
+        let client = self.create_client(&mut internal).await;
+
         let keypair = internal.worker_keypairs.get(id as usize).unwrap().copy();
         let worker = internal
             .workers
             .get_mut(&id)
             .unwrap_or_else(|| panic!("Worker with id {} not found ", id));
-        worker
-            .start(keypair, self.client.clone(), preserve_store)
-            .await;
+        worker.start(keypair, client, preserve_store).await;
     }
 
     pub async fn stop_worker(&self, id: WorkerId) {
@@ -654,9 +659,12 @@
     /// Stops all the nodes (primary & workers).
     pub async fn stop_all(&self) {
-        self.client.shutdown();
+        let mut internal = self.internal.write().await;
+        if let Some(client) = internal.client.as_ref() {
+            client.shutdown();
+        }
+        internal.client = None;
 
-        let internal = self.internal.read().await;
         internal.primary.stop().await;
         for (_, worker) in internal.workers.iter() {
             worker.stop().await;
         }
@@ -770,6 +778,18 @@
         }
         false
     }
+
+    // Creates a new network client if there isn't one yet initialised.
+    async fn create_client(
+        &self,
+        internal: &mut RwLockWriteGuard<'_, AuthorityDetailsInternal>,
+    ) -> NetworkClient {
+        if internal.client.is_none() {
+            let client = NetworkClient::new_from_keypair(&internal.primary.network_key_pair);
+            internal.client = Some(client);
+        }
+        internal.client.as_ref().unwrap().clone()
+    }
 }
 
 pub fn setup_tracing() -> TelemetryGuards {
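
Usage note: with this change the NetworkClient is created lazily by create_client on the first start_primary / start_worker / start_all_workers call, and is shut down and cleared by stop_all, so a restarted authority gets a fresh client. A minimal test-side sketch of the resulting pattern (the function and the way the AuthorityDetails handle is obtained are illustrative, not part of this diff):

    // Illustrative only: `authority` stands for an `AuthorityDetails`
    // taken from a running test cluster (e.g. `cluster.authority(0)`).
    async fn example_usage(authority: &AuthorityDetails) {
        // Starting the primary creates the shared NetworkClient on demand
        // through the new `create_client` helper.
        authority.start_primary(/* preserve_store */ false).await;

        // Only safe after something has been started: `client()` panics
        // with "Requested network client which has not been initialised
        // yet" when no client exists.
        let client = authority.client().await;
        let _ = client;

        // `stop_all` shuts the client down and drops it, so a subsequent
        // start_* call builds a fresh client rather than reusing a
        // shut-down one.
        authority.stop_all().await;
    }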