Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for remote peer connections in Core recovery header fetch #6100

Merged
merged 1 commit into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions narwhal/network/src/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::anemo_ext::NetworkExt;
use crate::traits::{PrimaryToPrimaryRpc, PrimaryToWorkerRpc, WorkerRpc};
use crate::{
traits::{Lucky, ReliableNetwork, UnreliableNetwork},
Expand Down Expand Up @@ -241,12 +242,12 @@ impl PrimaryToPrimaryRpc for anemo::Network {
async fn get_latest_header(
&self,
peer: &NetworkPublicKey,
request: impl anemo::types::request::IntoRequest<LatestHeaderRequest> + Send,
request: LatestHeaderRequest,
) -> Result<LatestHeaderResponse> {
const LATEST_HEADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
let request = anemo::Request::new(request).with_timeout(LATEST_HEADER_REQUEST_TIMEOUT);
let peer_id = PeerId(peer.0.to_bytes());
let peer = self
.peer(peer_id)
.ok_or_else(|| format_err!("Network has no connection with peer {peer_id}"))?;
let peer = self.waiting_peer(peer_id);
let response = PrimaryToPrimaryClient::new(peer)
.get_latest_header(request)
.await
Expand Down
2 changes: 1 addition & 1 deletion narwhal/network/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub trait PrimaryToPrimaryRpc {
async fn get_latest_header(
&self,
peer: &NetworkPublicKey,
request: impl anemo::types::request::IntoRequest<LatestHeaderRequest> + Send,
request: LatestHeaderRequest,
) -> Result<LatestHeaderResponse>;
}

Expand Down
12 changes: 5 additions & 7 deletions narwhal/primary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use fastcrypto::{hash::Hash as _, SignatureService};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use network::{CancelOnDropHandler, P2pNetwork, PrimaryToPrimaryRpc, ReliableNetwork};
use std::time::Duration;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
Expand All @@ -40,8 +39,6 @@ use types::{
#[path = "tests/core_tests.rs"]
pub mod core_tests;

const LATEST_HEADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);

pub struct Core {
/// The public key of this primary.
name: PublicKey,
Expand Down Expand Up @@ -197,10 +194,11 @@ impl Core {

for peer in peers.iter() {
let network = self.network.network();
let request = anemo::Request::new(LatestHeaderRequest {})
.with_timeout(LATEST_HEADER_REQUEST_TIMEOUT);

header_futures.push(async move { network.get_latest_header(peer, request).await });
header_futures.push(async move {
network
.get_latest_header(peer, LatestHeaderRequest {})
.await
});
}

let mut latest_headers = Vec::new();
Expand Down
20 changes: 11 additions & 9 deletions narwhal/primary/tests/integration_tests_validator_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use types::{
};
use worker::{metrics::initialise_metrics, Worker};

#[tokio::test]
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_get_collections() {
let parameters = Parameters {
batch_size: 200, // Two transactions.
Expand Down Expand Up @@ -148,7 +148,7 @@ async fn test_get_collections() {
);

// Wait for tasks to start
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(15)).await;

// Test gRPC server with client call
let mut client = connect_to_validator_client(parameters.clone());
Expand Down Expand Up @@ -220,7 +220,7 @@ async fn test_get_collections() {
);
}

#[tokio::test]
#[tokio::test(flavor = "current_thread", start_paused = true)]
// #[cfg_attr(windows, ignore)]
#[ignore]
async fn test_remove_collections() {
Expand Down Expand Up @@ -324,7 +324,7 @@ async fn test_remove_collections() {
);

// Wait for tasks to start
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(15)).await;

// Test gRPC server with client call
let mut client = connect_to_validator_client(parameters.clone());
Expand Down Expand Up @@ -427,7 +427,7 @@ async fn test_remove_collections() {
assert_eq!(Empty {}, actual_result);
}

#[tokio::test]
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_read_causal_signed_certificates() {
let fixture = CommitteeFixture::builder().randomize_ports(true).build();
let committee = fixture.committee();
Expand Down Expand Up @@ -577,7 +577,7 @@ async fn test_read_causal_signed_certificates() {
);

// Wait for tasks to start
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(15)).await;

// Test gRPC server with client call
let mut client = connect_to_validator_client(primary_1_parameters.clone());
Expand Down Expand Up @@ -624,7 +624,7 @@ async fn test_read_causal_signed_certificates() {
assert_eq!(1, response.into_inner().collection_ids.len());
}

#[tokio::test]
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_read_causal_unsigned_certificates() {
telemetry_subscribers::init_for_testing();

Expand Down Expand Up @@ -781,7 +781,7 @@ async fn test_read_causal_unsigned_certificates() {
);

// Wait for tasks to start
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(15)).await;

// Test gRPC server with client call
let mut client = connect_to_validator_client(primary_1_parameters.clone());
Expand Down Expand Up @@ -849,6 +849,8 @@ async fn test_read_causal_unsigned_certificates() {
/// * Primary 1 be able to fetch the payload for certificates 1 & 2
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_get_collections_with_missing_certificates() {
telemetry_subscribers::init_for_testing();

// GIVEN keys for two primary nodes
let fixture = CommitteeFixture::builder().randomize_ports(true).build();
let committee = fixture.committee();
Expand Down Expand Up @@ -1019,7 +1021,7 @@ async fn test_get_collections_with_missing_certificates() {
);

// Wait for tasks to start
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(15)).await;

// Test gRPC server with client call
let mut client = connect_to_validator_client(parameters_1.clone());
Expand Down