Commit 9df01e3
mwtian committed Apr 4, 2024
1 parent 4aff583 commit 9df01e3
Showing 6 changed files with 119 additions and 39 deletions.
7 changes: 4 additions & 3 deletions consensus/core/src/authority_node.rs
@@ -173,7 +173,8 @@ where
         let mut network_manager = N::new(context.clone());
         let network_client = network_manager.client();
 
-        // REQUIRED: Broadcaster must be created before Core, to start listen on block broadcasts.
+        // REQUIRED: Broadcaster must be created before Core, to start listening on the
+        // broadcast channel in order to not miss blocks.
         let broadcaster = if N::Client::SUPPORT_STREAMING {
             None
         } else {
@@ -318,7 +319,7 @@ mod tests {
         context::Context,
         core_thread::{CoreError, CoreThreadDispatcher},
         error::ConsensusResult,
-        network::{BlockStream, NetworkClient},
+        network::{BlockStream, NetworkClient, NetworkService as _},
         storage::mem_store::MemStore,
         transaction::NoopTransactionVerifier,
     };
@@ -475,7 +476,7 @@ mod tests {
         let serialized = input_block.serialized().clone();
         tokio::spawn(async move {
             service
-                .handle_received_block(context.committee.to_authority_index(0).unwrap(), serialized)
+                .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                 .await
                 .unwrap();
         });
35 changes: 20 additions & 15 deletions consensus/core/src/authority_service.rs
@@ -55,9 +55,11 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> {
             dag_state,
         }
     }
+}
 
-    /// Handling the block sent from the peer via either unicast RPC or subscription stream.
-    pub(crate) async fn handle_received_block(
+#[async_trait]
+impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
+    async fn handle_send_block(
         &self,
         peer: AuthorityIndex,
         serialized_block: Bytes,
@@ -78,14 +80,16 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> {
             info!("Block with wrong authority from {}: {}", peer, e);
             return Err(e);
         }
+        // Specified peer can be trusted to be a valid authority index.
+        let peer_hostname = self.context.committee.authority(peer).hostname.clone();
 
         // Reject blocks failing validations.
         if let Err(e) = self.block_verifier.verify(&signed_block) {
             self.context
                 .metrics
                 .node_metrics
                 .invalid_blocks
-                .with_label_values(&[&peer.to_string(), "send_block"])
+                .with_label_values(&[&peer_hostname, "send_block"])
                 .inc();
             info!("Invalid block from {}: {}", peer, e);
             return Err(e);
@@ -99,6 +103,12 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> {
                 .saturating_sub(timestamp_utc_ms()),
         );
         if forward_time_drift > self.context.parameters.max_forward_time_drift {
+            self.context
+                .metrics
+                .node_metrics
+                .rejected_future_blocks
+                .with_label_values(&[&peer_hostname])
+                .inc();
             return Err(ConsensusError::BlockTooFarInFuture {
                 block_timestamp: verified_block.timestamp_ms(),
                 forward_time_drift,
@@ -116,12 +126,18 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> {
             sleep(forward_time_drift).await;
         }
 
+        self.context
+            .metrics
+            .node_metrics
+            .verified_blocks
+            .with_label_values(&[&peer_hostname])
+            .inc();
+
         let missing_ancestors = self
             .core_dispatcher
             .add_blocks(vec![verified_block])
             .await
             .map_err(|_| ConsensusError::Shutdown)?;
-
         if !missing_ancestors.is_empty() {
             // schedule the fetching of them from this peer
             if let Err(err) = self
@@ -135,17 +151,6 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> {
 
         Ok(())
     }
-}
-
-#[async_trait]
-impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
-    async fn handle_send_block(
-        &self,
-        peer: AuthorityIndex,
-        serialized_block: Bytes,
-    ) -> ConsensusResult<()> {
-        self.handle_received_block(peer, serialized_block).await
-    }
 
     async fn handle_subscribe_blocks(
         &self,
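A note on the rejection path added above: the forward drift is how far the block's timestamp sits ahead of the local clock; drift beyond `max_forward_time_drift` is now counted in `rejected_future_blocks` and rejected, while a small drift is slept away before the block is accepted. A standalone sketch of that check (not the repo's code; `timestamp_utc_ms` here is a stand-in modeled on the crate's helper):

use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Wall-clock milliseconds; a stand-in for the crate's `timestamp_utc_ms()`.
fn timestamp_utc_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

// Err(drift) => reject the block as too far in the future;
// Ok(drift)  => accept it after sleeping out the (small) drift.
fn check_forward_drift(
    block_timestamp_ms: u64,
    max_forward_time_drift: Duration,
) -> Result<Duration, Duration> {
    // saturating_sub: blocks timestamped in the past yield zero drift.
    let drift =
        Duration::from_millis(block_timestamp_ms.saturating_sub(timestamp_utc_ms()));
    if drift > max_forward_time_drift {
        Err(drift)
    } else {
        Ok(drift)
    }
}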
1 change: 1 addition & 0 deletions consensus/core/src/dag_state.rs
@@ -328,6 +328,7 @@ impl DagState {
 
     /// Returns cached recent blocks from the specified authority.
     /// Blocks returned is limited by both the `start` round, and if the blocks are cached.
+    /// NOTE: caller should not assume returned blocks are always chained.
    pub(crate) fn get_cached_blocks(
        &self,
        authority: AuthorityIndex,
28 changes: 28 additions & 0 deletions consensus/core/src/metrics.rs
@@ -88,6 +88,8 @@ pub(crate) struct NodeMetrics {
     pub fetch_blocks_scheduler_inflight: IntGauge,
     pub fetched_blocks: IntCounterVec,
     pub invalid_blocks: IntCounterVec,
+    pub rejected_future_blocks: IntCounterVec,
+    pub verified_blocks: IntCounterVec,
     pub committed_leaders_total: IntCounterVec,
     pub last_committed_leader_round: IntGauge,
     pub commit_round_advancement_interval: Histogram,
@@ -100,6 +102,8 @@ pub(crate) struct NodeMetrics {
     pub suspended_blocks: IntCounterVec,
     pub threshold_clock_round: IntGauge,
     pub unsuspended_blocks: IntCounterVec,
+    pub subscriber_connection_attempts: IntCounterVec,
+    pub subscriber_connections: IntGaugeVec,
     pub uptime: Histogram,
 }

@@ -189,6 +193,18 @@ impl NodeMetrics {
                 &["authority", "source"],
                 registry,
             ).unwrap(),
+            rejected_future_blocks: register_int_counter_vec_with_registry!(
+                "rejected_future_blocks",
+                "Number of blocks rejected because their timestamp is too far in the future",
+                &["authority"],
+                registry,
+            ).unwrap(),
+            verified_blocks: register_int_counter_vec_with_registry!(
+                "verified_blocks",
+                "Number of blocks received from each peer that are verified",
+                &["authority"],
+                registry,
+            ).unwrap(),
             committed_leaders_total: register_int_counter_vec_with_registry!(
                 "committed_leaders_total",
                 "Total number of (direct or indirect) committed leaders per authority",
@@ -255,6 +271,18 @@ impl NodeMetrics {
                 &["authority"],
                 registry,
             ).unwrap(),
+            subscriber_connection_attempts: register_int_counter_vec_with_registry!(
+                "subscriber_connection_attempts",
+                "The number of connection attempts per peer",
+                &["authority", "status"],
+                registry,
+            ).unwrap(),
+            subscriber_connections: register_int_gauge_vec_with_registry!(
+                "subscriber_connections",
+                "The number of block stream connections breaking down by peer",
+                &["authority"],
+                registry,
+            ).unwrap(),
             uptime: register_histogram_with_registry!(
                 "uptime",
                 "Total node uptime",
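The four new metrics follow prometheus' labeled-vector pattern: the counters only ever increase per label set, while the `subscriber_connections` gauge moves up on subscribe and back down on unsubscribe. A minimal, self-contained sketch of the same pattern with the prometheus crate (label values here are hypothetical; the `register_*_with_registry!` macros used above bundle this create-and-register sequence):

use prometheus::{IntCounterVec, IntGaugeVec, Opts, Registry};

fn main() {
    let registry = Registry::new();

    // Counter labeled by peer and outcome, like `subscriber_connection_attempts`.
    let attempts = IntCounterVec::new(
        Opts::new("subscriber_connection_attempts", "Connection attempts per peer"),
        &["authority", "status"],
    )
    .unwrap();
    registry.register(Box::new(attempts.clone())).unwrap();

    // Gauge labeled by peer, like `subscriber_connections`.
    let connections = IntGaugeVec::new(
        Opts::new("subscriber_connections", "Open block streams per peer"),
        &["authority"],
    )
    .unwrap();
    registry.register(Box::new(connections.clone())).unwrap();

    // One successful subscription, later torn down: the counter keeps the
    // attempt forever, the gauge returns to zero.
    attempts.with_label_values(&["validator-1", "success"]).inc();
    connections.with_label_values(&["validator-1"]).inc();
    connections.with_label_values(&["validator-1"]).dec();
}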
10 changes: 10 additions & 0 deletions consensus/core/src/network/mod.rs
@@ -91,12 +91,22 @@ pub(crate) trait NetworkClient: Send + Sync + 'static {
 /// of `anemo_gen::ConsensusRpc`, which itself is annotated with `async_trait`.
 #[async_trait]
 pub(crate) trait NetworkService: Send + Sync + 'static {
+    /// Handles the block sent from the peer via either unicast RPC or subscription stream.
+    /// Peer value can be trusted to be a valid authority index.
+    /// But serialized_block must be verified before its contents are trusted.
     async fn handle_send_block(&self, peer: AuthorityIndex, block: Bytes) -> ConsensusResult<()>;
 
+    /// Handles the subscription request from the peer.
+    /// A stream of newly proposed blocks is returned to the peer.
+    /// The stream continues until the end of epoch, peer unsubscribes, or a network error / crash
+    /// occurs.
     async fn handle_subscribe_blocks(
         &self,
         peer: AuthorityIndex,
         last_received: Round,
     ) -> ConsensusResult<BlockStream>;
 
+    /// Handles the request to fetch blocks by references from the peer.
     async fn handle_fetch_blocks(
         &self,
         peer: AuthorityIndex,
77 changes: 56 additions & 21 deletions consensus/core/src/subscriber.rs
@@ -8,7 +8,7 @@ use futures::StreamExt;
 use mysten_metrics::spawn_monitored_task;
 use parking_lot::{Mutex, RwLock};
 use tokio::{task::JoinHandle, time::sleep};
-use tracing::{debug, info};
+use tracing::{debug, error, info};
 
 use crate::{
     block::BlockAPI as _,
@@ -19,9 +19,9 @@ use crate::{
 };
 
 /// Subscriber manages the block stream subscriptions to other peers, taking care of retrying
-/// if a subscription stream fails. Blocks returned from the peer are sent to authority service
-/// for processing.
-/// Currently individual subscription management is not exposed, but it could become
+/// when subscription streams break. Blocks returned from the peer are sent to the authority
+/// service for processing.
+/// Currently subscription management for individual peer is not exposed, but it could become
 /// useful in future.
 pub(crate) struct Subscriber<C: NetworkClient, S: NetworkService> {
     context: Arc<Context>,
@@ -51,6 +51,10 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
     }
 
     pub(crate) fn subscribe(&self, peer: AuthorityIndex) {
+        if peer == self.context.own_index {
+            error!("Attempt to subscribe to own validator {peer} is ignored!");
+            return;
+        }
         let context = self.context.clone();
         let network_client = self.network_client.clone();
         let authority_service = self.authority_service.clone();
@@ -61,51 +65,70 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
             .round();
 
         let mut subscriptions = self.subscriptions.lock();
-        self.unsubscribe_locked(&mut subscriptions[peer.value()]);
+        self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
         subscriptions[peer.value()] = Some(spawn_monitored_task!(Self::subscription_loop(
             context,
             network_client,
             authority_service,
             peer,
             last_received,
         )));
+        let peer_hostname = self.context.committee.authority(peer).hostname.clone();
+        self.context
+            .metrics
+            .node_metrics
+            .subscriber_connections
+            .with_label_values(&[&peer_hostname])
+            .inc();
     }
 
     pub(crate) fn stop(&self) {
         let mut subscriptions = self.subscriptions.lock();
-        subscriptions.iter_mut().for_each(|subscription| {
-            self.unsubscribe_locked(subscription);
-        });
+        for (peer, _) in self.context.committee.authorities() {
+            self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
+        }
     }
 
-    fn unsubscribe_locked(&self, subscription: &mut Option<JoinHandle<()>>) {
+    fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option<JoinHandle<()>>) {
+        let peer_hostname = self.context.committee.authority(peer).hostname.clone();
+        self.context
+            .metrics
+            .node_metrics
+            .subscriber_connections
+            .with_label_values(&[&peer_hostname])
+            .dec();
         if let Some(subscription) = subscription.take() {
             subscription.abort();
         }
     }
 
     async fn subscription_loop(
-        _context: Arc<Context>,
+        context: Arc<Context>,
         network_client: Arc<C>,
         authority_service: Arc<S>,
         peer: AuthorityIndex,
         last_received: Round,
     ) {
+        const IMMEDIATE_RETRIES: i64 = 3;
         const MAX_RETRY_INTERNAL: Duration = Duration::from_secs(10);
-        let mut retries: u64 = 0;
-        loop {
-            if retries >= 4 {
-                let delay = Duration::from_secs_f64(1.2f64.powf((retries - 4) as f64))
-                    .min(MAX_RETRY_INTERNAL);
+        let peer_hostname = context.committee.authority(peer).hostname.clone();
+        let mut retries: i64 = 0;
+        'subscription: loop {
+            if retries > IMMEDIATE_RETRIES {
+                // When not immediately retrying, add a delay starting from 100ms and increases until 10s.
+                let delay = Duration::from_secs_f64(
+                    0.1 * 1.2f64.powf((retries - IMMEDIATE_RETRIES - 1) as f64),
+                )
+                .min(MAX_RETRY_INTERNAL);
                 debug!(
-                    "Waiting to subscribe to blocks from peer {} in {} seconds, retry {}",
+                    "Delaying retry {} to subscribe to blocks from peer {} in {} seconds",
+                    retries,
                     peer,
                     delay.as_secs_f64(),
-                    retries
                 );
                 sleep(delay).await;
             } else {
-                // No need for delay but yield to avoid monopolizing the thread.
+                // Retry immediately, but still yield to avoid monopolizing the thread.
                 tokio::task::yield_now().await;
             }
             let mut blocks = match network_client
@@ -114,15 +137,27 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
             {
                 Ok(blocks) => {
                     retries = 0;
+                    context
+                        .metrics
+                        .node_metrics
+                        .subscriber_connection_attempts
+                        .with_label_values(&[&peer_hostname, "success"])
+                        .inc();
                     blocks
                 }
                 Err(e) => {
                     retries += 1;
+                    context
+                        .metrics
+                        .node_metrics
+                        .subscriber_connection_attempts
+                        .with_label_values(&[&peer_hostname, "failure"])
+                        .inc();
                     debug!("Failed to subscribe to blocks from peer {}: {}", peer, e);
-                    continue;
+                    continue 'subscription;
                 }
             };
-            loop {
+            'stream: loop {
                 match blocks.next().await {
                     Some(block) => {
                         let result = authority_service
@@ -138,7 +173,7 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
                     None => {
                         retries += 1;
                         debug!("Subscription to blocks from peer {} ended", peer);
-                        break;
+                        break 'stream;
                     }
                 }
             }
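The retry policy reworked above: the initial attempt and the first IMMEDIATE_RETRIES (3) retries reconnect immediately after a task yield; from then on the delay starts at 100ms and grows by 1.2x per failed attempt until it saturates at the 10-second MAX_RETRY_INTERNAL cap (around the 30th retry). A standalone sketch mirroring the loop's arithmetic:

use std::time::Duration;

const IMMEDIATE_RETRIES: i64 = 3;
const MAX_RETRY_INTERNAL: Duration = Duration::from_secs(10); // named as in the source

// Delay applied before the next attempt, given the number of failures so far.
fn retry_delay(retries: i64) -> Duration {
    if retries <= IMMEDIATE_RETRIES {
        return Duration::ZERO; // retry immediately, after tokio::task::yield_now()
    }
    Duration::from_secs_f64(0.1 * 1.2f64.powf((retries - IMMEDIATE_RETRIES - 1) as f64))
        .min(MAX_RETRY_INTERNAL)
}

fn main() {
    // Prints 0ns for retries 0-3, then 100ms, 120ms, 144ms, 172.8ms, ...
    for retries in 0..=8 {
        println!("after {} failures: {:?}", retries, retry_delay(retries));
    }
}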
