Skip to content

Commit

Permalink
restore certificates in restarted executor (#824)
Browse files Browse the repository at this point in the history
* restore certificates in restarted executor

order the restored messages by sequence before execution

pass restored to subscriber

refactor get restored consensus

* rebased on main

* emit log and metric on restored consensus output
  • Loading branch information
lanvidr authored Sep 1, 2022
1 parent 7f85511 commit 060a3dc
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 11 deletions.
8 changes: 8 additions & 0 deletions narwhal/consensus/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub struct ConsensusMetrics {
/// The number of times the consensus state was restored from the consensus store
/// following a node restart
pub recovered_consensus_state: IntCounter,
/// The number of certificates from consensus that were restored and sent to the executor
/// following a node restart
pub recovered_consensus_output: IntCounter,
}

impl ConsensusMetrics {
Expand Down Expand Up @@ -52,6 +55,11 @@ impl ConsensusMetrics {
"The number of times the consensus state was restored from the consensus store following a node restart",
registry
).unwrap(),
recovered_consensus_output: register_int_counter_with_registry!(
"recovered_consensus_output",
"The number of certificates from consensus that were restored and sent to the executor following a node restart",
registry
).unwrap(),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions narwhal/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ tonic = "0.7.2"
tracing = "0.1.36"
prometheus = "0.13.1"
backoff = { version = "0.4.0", features = ["tokio"] }
storage = { path = "../storage" }
itertools = "0.10.3"

types = { path = "../types" }
mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "9deac015fbf66e24b6da9699630e06750eaa094a" }
Expand Down
48 changes: 47 additions & 1 deletion narwhal/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ use primary::BlockCommand;
use prometheus::Registry;
use serde::de::DeserializeOwned;
use std::{fmt::Debug, sync::Arc};
use storage::CertificateStore;
use store::Store;
use tokio::{
sync::{mpsc::Sender, watch},
task::JoinHandle,
};
use tracing::info;
use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification};
use types::{
metered_channel, Batch, BatchDigest, CertificateDigest, ConsensusStore,
ReconfigureNotification, SequenceNumber,
};

/// Convenience type representing a serialized transaction.
pub type SerializedTransaction = Vec<u8>;
Expand Down Expand Up @@ -97,6 +101,7 @@ impl Executor {
tx_output: Sender<ExecutorOutput<State>>,
tx_get_block_commands: metered_channel::Sender<BlockCommand>,
registry: &Registry,
restored_consensus_output: Vec<ConsensusOutput>,
) -> SubscriberResult<Vec<JoinHandle<()>>>
where
State: ExecutionState + Send + Sync + 'static,
Expand Down Expand Up @@ -125,6 +130,7 @@ impl Executor {
rx_consensus,
tx_executor,
arc_metrics,
restored_consensus_output,
);

// Spawn the executor's core.
Expand All @@ -142,3 +148,43 @@ impl Executor {
Ok(vec![subscriber_handle, executor_handle])
}
}

pub async fn get_restored_consensus_output<State>(
consensus_store: Arc<ConsensusStore>,
certificate_store: CertificateStore,
execution_state: Arc<State>,
) -> Result<Vec<ConsensusOutput>, SubscriberError>
where
State: ExecutionState + Send + Sync + 'static,
State::Error: Debug,
{
let mut restored_consensus_output = Vec::new();
let consensus_next_index = consensus_store
.read_last_consensus_index()
.map_err(SubscriberError::StoreError)?;

let next_cert_index = execution_state
.load_execution_indices()
.await?
.next_certificate_index;

if next_cert_index < consensus_next_index {
let missing = consensus_store
.read_sequenced_certificates(&(next_cert_index..=consensus_next_index - 1))?
.iter()
.zip(next_cert_index..consensus_next_index)
.filter_map(|(c, seq)| c.map(|digest| (digest, seq)))
.collect::<Vec<(CertificateDigest, SequenceNumber)>>();

for (cert_digest, seq) in missing {
if let Some(cert) = certificate_store.read(cert_digest).unwrap() {
// Save the missing sequence / cert pair as ConsensusOutput to re-send to the executor.
restored_consensus_output.push(ConsensusOutput {
certificate: cert,
consensus_index: seq,
})
}
}
}
Ok(restored_consensus_output)
}
20 changes: 18 additions & 2 deletions narwhal/executor/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl Subscriber {
rx_consensus: metered_channel::Receiver<ConsensusOutput>,
tx_executor: metered_channel::Sender<ConsensusOutput>,
metrics: Arc<ExecutorMetrics>,
restored_consensus_output: Vec<ConsensusOutput>,
) -> JoinHandle<()> {
let get_block_retry_policy = ExponentialBackoff {
initial_interval: Duration::from_millis(500),
Expand All @@ -80,14 +81,17 @@ impl Subscriber {
get_block_retry_policy,
metrics,
}
.run()
.run(restored_consensus_output)
.await
.expect("Failed to run subscriber")
})
}

/// Main loop connecting to the consensus to listen to sequence messages.
async fn run(&mut self) -> SubscriberResult<()> {
async fn run(
&mut self,
restored_consensus_output: Vec<ConsensusOutput>,
) -> SubscriberResult<()> {
// It's important to have the futures in ordered fashion as we want
// to guarantee that will deliver to the executor the certificates
// in the same order we received from rx_consensus. So it doesn't
Expand All @@ -97,6 +101,18 @@ impl Subscriber {
let mut waiting =
BoundedFuturesOrdered::with_capacity(Self::MAX_PENDING_CONSENSUS_MESSAGES);

// First handle any consensus output messages that were restored due to a restart.
// This needs to happen before we start listening on rx_consensus and receive messages sequenced after these.
for message in restored_consensus_output {
let future = Self::wait_on_payload(
self.get_block_retry_policy.clone(),
self.store.clone(),
self.tx_get_block_commands.clone(),
message,
);
waiting.push(future).await;
}

// Listen to sequenced consensus message and process them.
loop {
tokio::select! {
Expand Down
1 change: 0 additions & 1 deletion narwhal/executor/src/tests/executor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{
execution_state::{TestState, KILLER_TRANSACTION, MALFORMED_TRANSACTION},
fixtures::{test_batch, test_certificate, test_store, test_u64_certificates},
};

use std::sync::Arc;
use test_utils::committee;
use tokio::sync::mpsc::channel;
Expand Down
63 changes: 60 additions & 3 deletions narwhal/executor/src/tests/subscriber_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ async fn spawn_subscriber(
rx_sequence: metered_channel::Receiver<ConsensusOutput>,
tx_executor: metered_channel::Sender<ConsensusOutput>,
tx_get_block_commands: metered_channel::Sender<BlockCommand>,
restored_consensus_output: Vec<ConsensusOutput>,
) -> (
Store<BatchDigest, Batch>,
watch::Sender<ReconfigureNotification>,
Expand All @@ -33,6 +34,7 @@ async fn spawn_subscriber(
rx_sequence,
tx_executor,
Arc::new(executor_metrics),
restored_consensus_output,
);

(store, tx_reconfigure, subscriber_handle)
Expand All @@ -46,7 +48,7 @@ async fn handle_certificate_with_downloaded_batch() {

// Spawn a subscriber.
let (store, _tx_reconfigure, _) =
spawn_subscriber(rx_sequence, tx_executor, tx_get_block_command).await;
spawn_subscriber(rx_sequence, tx_executor, tx_get_block_command, vec![]).await;

let total_certificates = 2;
let certificates = test_u64_certificates(
Expand Down Expand Up @@ -108,7 +110,7 @@ async fn should_retry_when_failed_to_get_payload() {

// Spawn a subscriber.
let (store, _tx_reconfigure, _) =
spawn_subscriber(rx_sequence, tx_executor, tx_get_block_command).await;
spawn_subscriber(rx_sequence, tx_executor, tx_get_block_command, vec![]).await;

// Create a certificate
let total_certificates = 1;
Expand Down Expand Up @@ -181,7 +183,7 @@ async fn subscriber_should_crash_when_irrecoverable_error() {

// Spawn a subscriber.
let (_store, _tx_reconfigure, handle) =
spawn_subscriber(rx_sequence, tx_executor, tx_get_block_command).await;
spawn_subscriber(rx_sequence, tx_executor, tx_get_block_command, vec![]).await;

// Create a certificate
let total_certificates = 1;
Expand Down Expand Up @@ -212,6 +214,61 @@ async fn subscriber_should_crash_when_irrecoverable_error() {
assert!(err.is_panic());
}

#[tokio::test]
async fn test_subscriber_with_restored_consensus_output() {
let (_tx_sequence, rx_sequence) = test_channel!(10);
let (tx_executor, mut rx_executor) = test_channel!(10);
let (tx_get_block_command, mut rx_get_block_command) = test_utils::test_get_block_commands!(1);

// Create restored consensus output
let total_certificates = 2;
let certificates = test_u64_certificates(
total_certificates,
/* batches_per_certificate */ 2,
/* transactions_per_batch */ 2,
);
let restored_consensus = certificates
.clone()
.into_iter()
.enumerate()
.map(|(i, (certificate, _))| ConsensusOutput {
certificate,
consensus_index: i as SequenceNumber,
})
.collect();

// Spawn a subscriber.
let (_store, _tx_reconfigure, _handle) = spawn_subscriber(
rx_sequence,
tx_executor,
tx_get_block_command,
restored_consensus,
)
.await;

for i in 0..total_certificates {
let request = rx_get_block_command.recv().await.unwrap();

let _batches = match request {
BlockCommand::GetBlock { id, sender } => {
let (_certificate, batches) = certificates.get(i).unwrap().to_owned();

// Mimic the block_waiter here and respond with the payload back
let ok = successful_block_response(id, batches.clone());

sender.send(ok).unwrap();

batches
}
_ => panic!("Unexpected command received"),
};

// Ensure restored messages are delivered.
let output = rx_executor.recv().await.unwrap();
assert_eq!(output.consensus_index, i as SequenceNumber);
}
}

// Helper method to create a successful (OK) get_block response.
fn successful_block_response(
id: CertificateDigest,
Expand Down
1 change: 1 addition & 0 deletions narwhal/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tracing-log = "0.1.3"
tracing-subscriber = { version = "0.3.15", features = ["time", "env-filter"] }
url = "2.2.2"
axum = "0.5.15"
itertools = "0.10.3"

config = { path = "../config" }
consensus = { path = "../consensus" }
Expand Down
34 changes: 31 additions & 3 deletions narwhal/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ use consensus::{
bullshark::Bullshark,
dag::Dag,
metrics::{ChannelMetrics, ConsensusMetrics},
Consensus,
Consensus, ConsensusOutput,
};

use crypto::{KeyPair, PublicKey};
use executor::{ExecutionState, Executor, ExecutorOutput, SerializedTransaction, SubscriberResult};
use executor::{
get_restored_consensus_output, ExecutionState, Executor, ExecutorOutput, SerializedTransaction,
SubscriberResult,
};
use fastcrypto::traits::{KeyPair as _, VerifyingKey};
use itertools::Itertools;
use primary::{BlockCommand, NetworkModel, PayloadToken, Primary, PrimaryChannelMetrics};
use prometheus::{IntGauge, Registry};
use std::{fmt::Debug, sync::Arc};
Expand All @@ -23,7 +28,7 @@ use tokio::{
sync::{mpsc::Sender, watch},
task::JoinHandle,
};
use tracing::debug;
use tracing::{debug, info};
use types::{
metered_channel, Batch, BatchDigest, Certificate, CertificateDigest, ConsensusStore, Header,
HeaderDigest, ReconfigureNotification, Round, SequenceNumber, SerializedBatchMessage,
Expand Down Expand Up @@ -292,6 +297,28 @@ impl Node {
let (tx_sequence, rx_sequence) =
metered_channel::channel(Self::CHANNEL_CAPACITY, &channel_metrics.tx_sequence);

// Check for any certs that have been sent by consensus but were not processed by the executor.
let restored_consensus_output = get_restored_consensus_output(
store.consensus_store.clone(),
store.certificate_store.clone(),
execution_state.clone(),
)
.await?
.into_iter()
.sorted_by(|a, b| a.consensus_index.cmp(&b.consensus_index))
.collect::<Vec<ConsensusOutput>>();

let len_restored = restored_consensus_output.len() as u64;
if len_restored > 0 {
info!(
"Consensus output on its way to the executor was restored for {} certificates",
len_restored
);
}
consensus_metrics
.recovered_consensus_output
.inc_by(len_restored);

// Spawn the consensus core who only sequences transactions.
let ordering_engine = Bullshark::new(
(**committee.load()).clone(),
Expand Down Expand Up @@ -321,6 +348,7 @@ impl Node {
/* tx_output */ tx_confirmation,
tx_get_block_commands,
registry,
restored_consensus_output,
)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion narwhal/test_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ use futures::Stream;
use indexmap::IndexMap;
use multiaddr::Multiaddr;
use rand::{rngs::StdRng, Rng, SeedableRng as _};
use std::sync::Arc;
use std::{
collections::{BTreeSet, VecDeque},
ops::RangeInclusive,
pin::Pin,
sync::Arc,
};
use store::{reopen, rocks, rocks::DBMap, Store};
use tokio::sync::mpsc::{channel, Receiver, Sender};
Expand Down

0 comments on commit 060a3dc

Please sign in to comment.