Skip to content

Commit

Permalink
[checkpoint v2] Submit checkpoint signatures to consensus (MystenLabs…
Browse files Browse the repository at this point in the history
…#6021)

This PR signs and submit checkpoint to consensus so it can be later aggregated into a certified checkpoint.

MystenLabs#5763
  • Loading branch information
andll authored Nov 14, 2022
1 parent 4935cdb commit 0ae74f3
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 48 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1665,6 +1665,7 @@ impl AuthorityState {
&path.join("checkpoint2"),
Box::new(store.clone()),
LogCheckpointOutput::boxed(),
0,
);

// add the object_basics module
Expand Down Expand Up @@ -2323,6 +2324,14 @@ impl AuthorityState {
);
})?;
}
ConsensusTransactionKind::CheckpointSignature(data) => {
data.verify(&self.committee.load()).map_err(|err|{
warn!(
"Ignoring malformed checkpoint signature (failed to verify) from {}, sequence {}: {:?}",
transaction.consensus_output.certificate.header.author, data.summary.summary.sequence_number, err
);
})?;
}
}
Ok(VerifiedSequencedConsensusTransaction(transaction))
}
Expand Down Expand Up @@ -2430,6 +2439,9 @@ impl AuthorityState {

Ok(())
}
ConsensusTransactionKind::CheckpointSignature(info) => {
self.checkpoint_service.notify_checkpoint_signature(info)
}
}
}

Expand Down
11 changes: 9 additions & 2 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use sui_metrics::spawn_monitored_task;
use sui_types::messages_checkpoint::CheckpointRequest;
use sui_types::messages_checkpoint::CheckpointResponse;

use crate::consensus_adapter::SubmitToConsensus;
use crate::consensus_handler::ConsensusHandler;
use tracing::{debug, info, Instrument};

Expand Down Expand Up @@ -99,8 +100,13 @@ impl AuthorityServer {
consensus_address: Multiaddr,
tx_consensus_listener: Sender<ConsensusListenerMessage>,
) -> Self {
use narwhal_types::TransactionsClient;
let consensus_client = Box::new(TransactionsClient::new(
mysten_network::client::connect_lazy(&consensus_address)
.expect("Failed to connect to consensus"),
));
let consensus_adapter = ConsensusAdapter::new(
consensus_address,
consensus_client,
state.clone_committee(),
tx_consensus_listener,
Duration::from_secs(20),
Expand Down Expand Up @@ -263,6 +269,7 @@ impl ValidatorService {
/// Spawn all the subsystems run by a Sui authority: a consensus node, a sui authority server,
/// and a consensus listener bridging the consensus node and the sui authority.
pub async fn new(
consensus_client: Box<dyn SubmitToConsensus>,
config: &NodeConfig,
state: Arc<AuthorityState>,
prometheus_registry: Registry,
Expand Down Expand Up @@ -308,7 +315,7 @@ impl ValidatorService {

// The consensus adapter allows the authority to send user certificates through consensus.
let consensus_adapter = ConsensusAdapter::new(
consensus_config.address().to_owned(),
consensus_client,
state.clone_committee(),
tx_consensus_listener.clone(),
timeout,
Expand Down
42 changes: 39 additions & 3 deletions crates/sui-core/src/checkpoints2/checkpoint_output.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::authority::StableSyncAuthoritySigner;
use crate::consensus_adapter::SubmitToConsensus;
use async_trait::async_trait;
use sui_types::base_types::AuthorityName;
use sui_types::error::SuiResult;
use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSummary};
use sui_types::messages::ConsensusTransaction;
use sui_types::messages_checkpoint::{
CheckpointContents, CheckpointSignatureMessage, CheckpointSummary, SignedCheckpointSummary,
};
use tracing::{debug, info};

#[async_trait]
pub trait CheckpointOutput: Sync + Send + 'static {
fn checkpoint_created(
async fn checkpoint_created(
&self,
summary: &CheckpointSummary,
contents: &CheckpointContents,
) -> SuiResult;
}

pub struct SubmitCheckpointToConsensus<T> {
pub sender: T,
pub signer: StableSyncAuthoritySigner,
pub authority: AuthorityName,
}

pub struct LogCheckpointOutput;

impl LogCheckpointOutput {
Expand All @@ -21,8 +35,30 @@ impl LogCheckpointOutput {
}
}

#[async_trait]
impl<T: SubmitToConsensus> CheckpointOutput for SubmitCheckpointToConsensus<T> {
async fn checkpoint_created(
&self,
summary: &CheckpointSummary,
contents: &CheckpointContents,
) -> SuiResult {
LogCheckpointOutput
.checkpoint_created(summary, contents)
.await?;
let summary = SignedCheckpointSummary::new_from_summary(
summary.clone(),
self.authority,
&*self.signer,
);
let message = CheckpointSignatureMessage { summary };
let transaction = ConsensusTransaction::new_checkpoint_signature_message(message);
self.sender.submit_to_consensus(&transaction).await
}
}

#[async_trait]
impl CheckpointOutput for LogCheckpointOutput {
fn checkpoint_created(
async fn checkpoint_created(
&self,
summary: &CheckpointSummary,
contents: &CheckpointContents,
Expand Down
44 changes: 31 additions & 13 deletions crates/sui-core/src/checkpoints2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,25 @@ mod checkpoint_output;
use crate::authority::EffectsNotifyRead;
use crate::checkpoints2::casual_order::CasualOrder;
use crate::checkpoints2::checkpoint_output::CheckpointOutput;
pub use crate::checkpoints2::checkpoint_output::LogCheckpointOutput;
pub use crate::checkpoints2::checkpoint_output::{
LogCheckpointOutput, SubmitCheckpointToConsensus,
};
use futures::future::{select, Either};
use futures::FutureExt;
use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use sui_metrics::spawn_monitored_task;
use sui_types::base_types::TransactionDigest;
use sui_types::base_types::{EpochId, TransactionDigest};
use sui_types::error::SuiResult;
use sui_types::gas::GasCostSummary;
use sui_types::messages::TransactionEffects;
use sui_types::messages_checkpoint::{
CheckpointContents, CheckpointSequenceNumber, CheckpointSummary,
CheckpointContents, CheckpointSequenceNumber, CheckpointSignatureMessage, CheckpointSummary,
};
use tokio::sync::{oneshot, Notify};
use tracing::{debug, error};
use tracing::{debug, error, info};
use typed_store::rocks::{DBBatch, DBMap};
use typed_store::traits::TypedStoreDebug;
use typed_store::Map;
Expand Down Expand Up @@ -60,6 +62,7 @@ pub struct CheckpointBuilder {
effects_store: Box<dyn EffectsNotifyRead>,
output: Box<dyn CheckpointOutput>,
exit: oneshot::Receiver<()>,
epoch: EpochId,
}

impl CheckpointBuilder {
Expand All @@ -69,13 +72,15 @@ impl CheckpointBuilder {
effects_store: Box<dyn EffectsNotifyRead>,
output: Box<dyn CheckpointOutput>,
exit: oneshot::Receiver<()>,
epoch: EpochId,
) -> Self {
Self {
tables,
notify,
effects_store,
output,
exit,
epoch,
}
}

Expand Down Expand Up @@ -106,26 +111,26 @@ impl CheckpointBuilder {
let roots = self.effects_store.notify_read(roots).await?;
let unsorted = self.complete_checkpoint(roots)?;
let sorted = CasualOrder::casual_sort(unsorted);
self.write_checkpoint(height, sorted)?;
self.write_checkpoint(height, sorted).await?;
Ok(())
}

fn write_checkpoint(
async fn write_checkpoint(
&self,
height: CheckpointCommitHeight,
l: Vec<TransactionEffects>,
) -> SuiResult {
let mut batch = self.tables.pending_checkpoints.batch();
if !l.is_empty() {
// Only create checkpoint if content is not empty
batch = self.create_checkpoint(batch, l)?;
batch = self.create_checkpoint(batch, l).await?;
}
batch = batch.delete_batch(&self.tables.pending_checkpoints, [height])?;
batch.write()?;
Ok(())
}

fn create_checkpoint(
async fn create_checkpoint(
&self,
mut batch: DBBatch,
l: Vec<TransactionEffects>,
Expand All @@ -140,15 +145,15 @@ impl CheckpointBuilder {
);
let gas_cost_summary = GasCostSummary::new_from_txn_effects(l.iter());
let summary = CheckpointSummary::new(
0, //todo
self.epoch, // todo - need to figure out how this is updated
sequence_number,
&contents,
previous_digest,
gas_cost_summary,
None, //todo
);

self.output.checkpoint_created(&summary, &contents)?;
self.output.checkpoint_created(&summary, &contents).await?;

batch = batch.insert_batch(
&self.tables.checkpoint_content,
Expand Down Expand Up @@ -219,11 +224,11 @@ pub struct CheckpointService {
}

impl CheckpointService {
#[allow(dead_code)]
pub fn spawn(
path: &Path,
effects_store: Box<dyn EffectsNotifyRead>,
output: Box<dyn CheckpointOutput>,
epoch: EpochId,
) -> Arc<Self> {
let notify = Arc::new(Notify::new());

Expand All @@ -238,6 +243,7 @@ impl CheckpointService {
effects_store,
output,
exit_rcv,
epoch,
);

spawn_monitored_task!(builder.run());
Expand Down Expand Up @@ -272,6 +278,16 @@ impl CheckpointService {
self.notify.notify_one();
Ok(())
}

pub fn notify_checkpoint_signature(&self, info: Box<CheckpointSignatureMessage>) -> SuiResult {
info!(
"Received signature for checkpoint sequence {}, digest {} from {}",
info.summary.summary.sequence_number,
hex::encode(info.summary.summary.digest()),
info.summary.auth_signature.authority
);
Ok(()) // todo
}
}

#[cfg(test)]
Expand All @@ -293,7 +309,8 @@ mod tests {
let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
let store = Box::new(store);

let checkpoint_service = CheckpointService::spawn(tempdir.path(), store, Box::new(output));
let checkpoint_service =
CheckpointService::spawn(tempdir.path(), store, Box::new(output), 0);
checkpoint_service.notify_checkpoint(0, vec![d(4)]).unwrap();
// Verify that sending same digests at same height is noop
checkpoint_service.notify_checkpoint(0, vec![d(4)]).unwrap();
Expand Down Expand Up @@ -338,8 +355,9 @@ mod tests {
}
}

#[async_trait::async_trait]
impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
fn checkpoint_created(
async fn checkpoint_created(
&self,
summary: &CheckpointSummary,
contents: &CheckpointContents,
Expand Down
23 changes: 0 additions & 23 deletions crates/sui-core/src/consensus_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,29 +251,6 @@ impl SubmitToConsensus for TransactionsClient<sui_network::tonic::transport::Cha
impl ConsensusAdapter {
/// Make a new Consensus adapter instance.
pub fn new(
consensus_address: Multiaddr,
committee: Committee,
tx_consensus_listener: Sender<ConsensusListenerMessage>,
timeout: Duration,
opt_metrics: OptArcConsensusAdapterMetrics,
) -> Self {
let consensus_client = Box::new(TransactionsClient::new(
mysten_network::client::connect_lazy(&consensus_address)
.expect("Failed to connect to consensus"),
));
let num_inflight_transactions = Default::default();
Self {
consensus_client,
committee,
_tx_consensus_listener: tx_consensus_listener,
timeout,
num_inflight_transactions,
opt_metrics,
}
}

#[cfg(test)]
pub fn new_test(
consensus_client: Box<dyn SubmitToConsensus>,
committee: Committee,
tx_consensus_listener: Sender<ConsensusListenerMessage>,
Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/src/unit_tests/batch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub(crate) async fn init_state(
&checkpoint2_path,
Box::new(store.clone()),
LogCheckpointOutput::boxed(),
0,
);

AuthorityState::new(
Expand Down
4 changes: 3 additions & 1 deletion crates/sui-core/src/unit_tests/consensus_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ async fn submit_transaction_to_consensus_adapter() {
let state = Arc::new(state);
let metrics = ConsensusAdapterMetrics::new_test();

#[derive(Clone)]
struct SubmitDirectly(Arc<AuthorityState>);

#[async_trait::async_trait]
impl SubmitToConsensus for SubmitDirectly {
async fn submit_to_consensus(&self, transaction: &ConsensusTransaction) -> SuiResult {
Expand All @@ -163,7 +165,7 @@ async fn submit_transaction_to_consensus_adapter() {
}
}
// Make a new consensus adapter instance.
let adapter = ConsensusAdapter::new_test(
let adapter = ConsensusAdapter::new(
Box::new(SubmitDirectly(state.clone())),
committee.clone(),
tx_consensus_listener,
Expand Down
1 change: 1 addition & 0 deletions crates/sui-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ sui-telemetry = { path = "../sui-telemetry" }
sui-types = { path = "../sui-types" }
sui-metrics = { path = "../sui-metrics" }
narwhal-network = { path = "../../narwhal/network" }
narwhal-types = { path = "../../narwhal/types" }
typed-store.workspace = true
mysten-network.workspace = true
telemetry-subscribers.workspace = true
Expand Down
Loading

0 comments on commit 0ae74f3

Please sign in to comment.