diff --git a/Cargo.lock b/Cargo.lock index d5d339a3e0a8b..74f606c71ad04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8664,6 +8664,7 @@ dependencies = [ "multiaddr", "mysten-network", "narwhal-network", + "narwhal-types", "parking_lot 0.12.1", "prometheus", "sui-config", diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index 6ce42d0524d1d..54013317b760c 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -1665,6 +1665,7 @@ impl AuthorityState { &path.join("checkpoint2"), Box::new(store.clone()), LogCheckpointOutput::boxed(), + 0, ); // add the object_basics module @@ -2321,6 +2322,14 @@ impl AuthorityState { ); })?; } + ConsensusTransactionKind::CheckpointSignature(data) => { + data.verify(&self.committee.load()).map_err(|err|{ + warn!( + "Ignoring malformed checkpoint signature (failed to verify) from {}, sequence {}: {:?}", + transaction.consensus_output.certificate.header.author, data.summary.summary.sequence_number, err + ); + })?; + } } Ok(VerifiedSequencedConsensusTransaction(transaction)) } @@ -2426,6 +2435,9 @@ impl AuthorityState { Ok(()) } + ConsensusTransactionKind::CheckpointSignature(info) => { + self.checkpoint_service.notify_checkpoint_signature(info) + } } } diff --git a/crates/sui-core/src/authority_server.rs b/crates/sui-core/src/authority_server.rs index 1ecc25dc48f19..1ee5cce868fe4 100644 --- a/crates/sui-core/src/authority_server.rs +++ b/crates/sui-core/src/authority_server.rs @@ -39,6 +39,7 @@ use sui_metrics::spawn_monitored_task; use sui_types::messages_checkpoint::CheckpointRequest; use sui_types::messages_checkpoint::CheckpointResponse; +use crate::consensus_adapter::SubmitToConsensus; use crate::consensus_handler::ConsensusHandler; use tracing::{debug, info, Instrument}; @@ -99,8 +100,13 @@ impl AuthorityServer { consensus_address: Multiaddr, tx_consensus_listener: Sender, ) -> Self { + use narwhal_types::TransactionsClient; + let consensus_client = Box::new(TransactionsClient::new( + mysten_network::client::connect_lazy(&consensus_address) + .expect("Failed to connect to consensus"), + )); let consensus_adapter = ConsensusAdapter::new( - consensus_address, + consensus_client, state.clone_committee(), tx_consensus_listener, Duration::from_secs(20), @@ -263,6 +269,7 @@ impl ValidatorService { /// Spawn all the subsystems run by a Sui authority: a consensus node, a sui authority server, /// and a consensus listener bridging the consensus node and the sui authority. pub async fn new( + consensus_client: Box, config: &NodeConfig, state: Arc, prometheus_registry: Registry, @@ -308,7 +315,7 @@ impl ValidatorService { // The consensus adapter allows the authority to send user certificates through consensus. let consensus_adapter = ConsensusAdapter::new( - consensus_config.address().to_owned(), + consensus_client, state.clone_committee(), tx_consensus_listener.clone(), timeout, diff --git a/crates/sui-core/src/checkpoints2/checkpoint_output.rs b/crates/sui-core/src/checkpoints2/checkpoint_output.rs index d8e350c1b14a8..72d9d892023d7 100644 --- a/crates/sui-core/src/checkpoints2/checkpoint_output.rs +++ b/crates/sui-core/src/checkpoints2/checkpoint_output.rs @@ -1,18 +1,32 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use crate::authority::StableSyncAuthoritySigner; +use crate::consensus_adapter::SubmitToConsensus; +use async_trait::async_trait; +use sui_types::base_types::AuthorityName; use sui_types::error::SuiResult; -use sui_types::messages_checkpoint::{CheckpointContents, CheckpointSummary}; +use sui_types::messages::ConsensusTransaction; +use sui_types::messages_checkpoint::{ + CheckpointContents, CheckpointSummary, SignedCheckpointData, SignedCheckpointSummary, +}; use tracing::{debug, info}; +#[async_trait] pub trait CheckpointOutput: Sync + Send + 'static { - fn checkpoint_created( + async fn checkpoint_created( &self, summary: &CheckpointSummary, contents: &CheckpointContents, ) -> SuiResult; } +pub struct SubmitCheckpointToConsensus { + pub sender: T, + pub signer: StableSyncAuthoritySigner, + pub authority: AuthorityName, +} + pub struct LogCheckpointOutput; impl LogCheckpointOutput { @@ -21,8 +35,30 @@ impl LogCheckpointOutput { } } +#[async_trait] +impl CheckpointOutput for SubmitCheckpointToConsensus { + async fn checkpoint_created( + &self, + summary: &CheckpointSummary, + contents: &CheckpointContents, + ) -> SuiResult { + LogCheckpointOutput + .checkpoint_created(summary, contents) + .await?; + let summary = SignedCheckpointSummary::new_from_summary( + summary.clone(), + self.authority, + &*self.signer, + ); + let message = SignedCheckpointData { summary }; + let transaction = ConsensusTransaction::new_checkpoint_signature_message(message); + self.sender.submit_to_consensus(&transaction).await + } +} + +#[async_trait] impl CheckpointOutput for LogCheckpointOutput { - fn checkpoint_created( + async fn checkpoint_created( &self, summary: &CheckpointSummary, contents: &CheckpointContents, diff --git a/crates/sui-core/src/checkpoints2/mod.rs b/crates/sui-core/src/checkpoints2/mod.rs index 383eeacf0d337..ccbf29a96fbc2 100644 --- a/crates/sui-core/src/checkpoints2/mod.rs +++ b/crates/sui-core/src/checkpoints2/mod.rs @@ -7,7 +7,9 @@ mod checkpoint_output; use crate::authority::EffectsNotifyRead; use crate::checkpoints2::casual_order::CasualOrder; use crate::checkpoints2::checkpoint_output::CheckpointOutput; -pub use crate::checkpoints2::checkpoint_output::LogCheckpointOutput; +pub use crate::checkpoints2::checkpoint_output::{ + LogCheckpointOutput, SubmitCheckpointToConsensus, +}; use futures::future::{select, Either}; use futures::FutureExt; use std::collections::HashSet; @@ -15,15 +17,15 @@ use std::path::Path; use std::sync::Arc; use std::time::Duration; use sui_metrics::spawn_monitored_task; -use sui_types::base_types::TransactionDigest; +use sui_types::base_types::{EpochId, TransactionDigest}; use sui_types::error::SuiResult; use sui_types::gas::GasCostSummary; use sui_types::messages::TransactionEffects; use sui_types::messages_checkpoint::{ - CheckpointContents, CheckpointSequenceNumber, CheckpointSummary, + CheckpointContents, CheckpointSequenceNumber, CheckpointSummary, SignedCheckpointData, }; use tokio::sync::{oneshot, Notify}; -use tracing::{debug, error}; +use tracing::{debug, error, info}; use typed_store::rocks::{DBBatch, DBMap}; use typed_store::traits::TypedStoreDebug; use typed_store::Map; @@ -60,6 +62,7 @@ pub struct CheckpointBuilder { effects_store: Box, output: Box, exit: oneshot::Receiver<()>, + epoch: EpochId, } impl CheckpointBuilder { @@ -69,6 +72,7 @@ impl CheckpointBuilder { effects_store: Box, output: Box, exit: oneshot::Receiver<()>, + epoch: EpochId, ) -> Self { Self { tables, @@ -76,6 +80,7 @@ impl CheckpointBuilder { effects_store, output, exit, + epoch, } } @@ -106,11 +111,11 @@ impl CheckpointBuilder { let roots = self.effects_store.notify_read(roots).await?; let unsorted = self.complete_checkpoint(roots)?; let sorted = CasualOrder::casual_sort(unsorted); - self.write_checkpoint(height, sorted)?; + self.write_checkpoint(height, sorted).await?; Ok(()) } - fn write_checkpoint( + async fn write_checkpoint( &self, height: CheckpointCommitHeight, l: Vec, @@ -118,14 +123,14 @@ impl CheckpointBuilder { let mut batch = self.tables.pending_checkpoints.batch(); if !l.is_empty() { // Only create checkpoint if content is not empty - batch = self.create_checkpoint(batch, l)?; + batch = self.create_checkpoint(batch, l).await?; } batch = batch.delete_batch(&self.tables.pending_checkpoints, [height])?; batch.write()?; Ok(()) } - fn create_checkpoint( + async fn create_checkpoint( &self, mut batch: DBBatch, l: Vec, @@ -140,7 +145,7 @@ impl CheckpointBuilder { ); let gas_cost_summary = GasCostSummary::new_from_txn_effects(l.iter()); let summary = CheckpointSummary::new( - 0, //todo + self.epoch, // todo - need to figure out how this is updated sequence_number, &contents, previous_digest, @@ -148,7 +153,7 @@ impl CheckpointBuilder { None, //todo ); - self.output.checkpoint_created(&summary, &contents)?; + self.output.checkpoint_created(&summary, &contents).await?; batch = batch.insert_batch( &self.tables.checkpoint_content, @@ -219,11 +224,11 @@ pub struct CheckpointService { } impl CheckpointService { - #[allow(dead_code)] pub fn spawn( path: &Path, effects_store: Box, output: Box, + epoch: EpochId, ) -> Arc { let notify = Arc::new(Notify::new()); @@ -238,6 +243,7 @@ impl CheckpointService { effects_store, output, exit_rcv, + epoch, ); spawn_monitored_task!(builder.run()); @@ -272,6 +278,16 @@ impl CheckpointService { self.notify.notify_one(); Ok(()) } + + pub fn notify_checkpoint_signature(&self, info: Box) -> SuiResult { + info!( + "Received signature for checkpoint sequence {}, digest {} from {}", + info.summary.summary.sequence_number, + hex::encode(info.summary.summary.digest()), + info.summary.auth_signature.authority + ); + Ok(()) // todo + } } #[cfg(test)] @@ -293,7 +309,8 @@ mod tests { let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10); let store = Box::new(store); - let checkpoint_service = CheckpointService::spawn(tempdir.path(), store, Box::new(output)); + let checkpoint_service = + CheckpointService::spawn(tempdir.path(), store, Box::new(output), 0); checkpoint_service.notify_checkpoint(0, vec![d(4)]).unwrap(); // Verify that sending same digests at same height is noop checkpoint_service.notify_checkpoint(0, vec![d(4)]).unwrap(); @@ -338,8 +355,9 @@ mod tests { } } + #[async_trait::async_trait] impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> { - fn checkpoint_created( + async fn checkpoint_created( &self, summary: &CheckpointSummary, contents: &CheckpointContents, diff --git a/crates/sui-core/src/consensus_adapter.rs b/crates/sui-core/src/consensus_adapter.rs index 5e570f58cb344..542013c675570 100644 --- a/crates/sui-core/src/consensus_adapter.rs +++ b/crates/sui-core/src/consensus_adapter.rs @@ -251,29 +251,6 @@ impl SubmitToConsensus for TransactionsClient, - timeout: Duration, - opt_metrics: OptArcConsensusAdapterMetrics, - ) -> Self { - let consensus_client = Box::new(TransactionsClient::new( - mysten_network::client::connect_lazy(&consensus_address) - .expect("Failed to connect to consensus"), - )); - let num_inflight_transactions = Default::default(); - Self { - consensus_client, - committee, - _tx_consensus_listener: tx_consensus_listener, - timeout, - num_inflight_transactions, - opt_metrics, - } - } - - #[cfg(test)] - pub fn new_test( consensus_client: Box, committee: Committee, tx_consensus_listener: Sender, diff --git a/crates/sui-core/src/unit_tests/batch_tests.rs b/crates/sui-core/src/unit_tests/batch_tests.rs index baf35b497c751..58708fdb439e1 100644 --- a/crates/sui-core/src/unit_tests/batch_tests.rs +++ b/crates/sui-core/src/unit_tests/batch_tests.rs @@ -93,6 +93,7 @@ pub(crate) async fn init_state( &checkpoint2_path, Box::new(store.clone()), LogCheckpointOutput::boxed(), + 0, ); AuthorityState::new( diff --git a/crates/sui-core/src/unit_tests/consensus_tests.rs b/crates/sui-core/src/unit_tests/consensus_tests.rs index 7c11890d27db1..fd88287f97735 100644 --- a/crates/sui-core/src/unit_tests/consensus_tests.rs +++ b/crates/sui-core/src/unit_tests/consensus_tests.rs @@ -151,7 +151,9 @@ async fn submit_transaction_to_consensus_adapter() { let state = Arc::new(state); let metrics = ConsensusAdapterMetrics::new_test(); + #[derive(Clone)] struct SubmitDirectly(Arc); + #[async_trait::async_trait] impl SubmitToConsensus for SubmitDirectly { async fn submit_to_consensus(&self, transaction: &ConsensusTransaction) -> SuiResult { @@ -163,7 +165,7 @@ async fn submit_transaction_to_consensus_adapter() { } } // Make a new consensus adapter instance. - let adapter = ConsensusAdapter::new_test( + let adapter = ConsensusAdapter::new( Box::new(SubmitDirectly(state.clone())), committee.clone(), tx_consensus_listener, diff --git a/crates/sui-node/Cargo.toml b/crates/sui-node/Cargo.toml index cb89fe3cea1b8..1df855ae96b14 100644 --- a/crates/sui-node/Cargo.toml +++ b/crates/sui-node/Cargo.toml @@ -30,6 +30,7 @@ sui-telemetry = { path = "../sui-telemetry" } sui-types = { path = "../sui-types" } sui-metrics = { path = "../sui-metrics" } narwhal-network = { path = "../../narwhal/network" } +narwhal-types = { path = "../../narwhal/types" } typed-store.workspace = true mysten-network.workspace = true telemetry-subscribers.workspace = true diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs index 7b9877bdf7819..4f3c89a0f737a 100644 --- a/crates/sui-node/src/lib.rs +++ b/crates/sui-node/src/lib.rs @@ -67,7 +67,8 @@ pub mod metrics; mod handle; pub use handle::SuiNodeHandle; -use sui_core::checkpoints2::{CheckpointService, LogCheckpointOutput}; +use narwhal_types::TransactionsClient; +use sui_core::checkpoints2::{CheckpointService, LogCheckpointOutput, SubmitCheckpointToConsensus}; pub struct SuiNode { grpc_server: tokio::task::JoinHandle>, @@ -154,10 +155,31 @@ impl SuiNode { None, None, )); + + let consensus_client = config.consensus_config().map(|config| { + let consensus_address = config.address().to_owned(); + TransactionsClient::new( + mysten_network::client::connect_lazy(&consensus_address) + .expect("Failed to connect to consensus"), + ) + }); + + let checkpoint_output = if let Some(ref consensus_client) = consensus_client { + Box::new(SubmitCheckpointToConsensus { + sender: consensus_client.clone(), + signer: secret.clone(), + authority: config.protocol_public_key(), + }) + } else { + // todo - we should refactor code a bit and simply don't start checkpoint builder on full node + LogCheckpointOutput::boxed() + }; + let checkpoint_service = CheckpointService::spawn( &config.db_path().join("checkpoints2"), Box::new(store.clone()), - LogCheckpointOutput::boxed(), + checkpoint_output, + committee.epoch, ); let state = Arc::new( @@ -281,10 +303,16 @@ impl SuiNode { }; let registry = prometheus_registry.clone(); - let validator_service = if config.consensus_config().is_some() { + let validator_service = if let Some(consensus_client) = consensus_client { Some( - ValidatorService::new(config, state.clone(), registry, rx_reconfigure_consensus) - .await?, + ValidatorService::new( + Box::new(consensus_client), + config, + state.clone(), + registry, + rx_reconfigure_consensus, + ) + .await?, ) } else { None diff --git a/crates/sui-types/src/gas.rs b/crates/sui-types/src/gas.rs index 7d56a59798b03..fe6b781d48ea6 100644 --- a/crates/sui-types/src/gas.rs +++ b/crates/sui-types/src/gas.rs @@ -49,7 +49,7 @@ macro_rules! ok_or_gas_error { }; } -#[derive(Eq, PartialEq, Clone, Debug, Default, Serialize, Deserialize, JsonSchema)] +#[derive(Eq, PartialEq, Clone, Debug, Default, Serialize, Deserialize, JsonSchema, Hash)] pub struct GasCostSummary { pub computation_cost: u64, pub storage_cost: u64, diff --git a/crates/sui-types/src/messages.rs b/crates/sui-types/src/messages.rs index d1436c1d992c8..24dc873e76548 100644 --- a/crates/sui-types/src/messages.rs +++ b/crates/sui-types/src/messages.rs @@ -11,7 +11,8 @@ use crate::crypto::{ use crate::gas::GasCostSummary; use crate::message_envelope::{Envelope, Message, TrustedEnvelope, VerifiedEnvelope}; use crate::messages_checkpoint::{ - AuthenticatedCheckpoint, CheckpointSequenceNumber, SignedCheckpointFragmentMessage, + AuthenticatedCheckpoint, CheckpointSequenceNumber, SignedCheckpointData, + SignedCheckpointFragmentMessage, }; use crate::object::{Object, ObjectFormatOptions, Owner, OBJECT_START_VERSION}; use crate::storage::{DeleteKind, WriteKind}; @@ -1998,6 +1999,7 @@ pub struct ConsensusTransaction { pub enum ConsensusTransactionKind { UserTransaction(Box), Checkpoint(Box), + CheckpointSignature(Box), } impl ConsensusTransaction { @@ -2026,6 +2028,16 @@ impl ConsensusTransaction { } } + pub fn new_checkpoint_signature_message(data: SignedCheckpointData) -> Self { + let mut hasher = DefaultHasher::new(); + data.summary.auth_signature.signature.hash(&mut hasher); + let tracking_id = hasher.finish().to_be_bytes(); + Self { + tracking_id, + kind: ConsensusTransactionKind::CheckpointSignature(Box::new(data)), + } + } + pub fn get_tracking_id(&self) -> u64 { (&self.tracking_id[..]) .read_u64::() @@ -2038,6 +2050,7 @@ impl ConsensusTransaction { certificate.verify_signature(committee) } ConsensusTransactionKind::Checkpoint(fragment) => fragment.verify(), + ConsensusTransactionKind::CheckpointSignature(data) => data.verify(committee), } } } diff --git a/crates/sui-types/src/messages_checkpoint.rs b/crates/sui-types/src/messages_checkpoint.rs index 101815f3f33eb..545b6ee5410bd 100644 --- a/crates/sui-types/src/messages_checkpoint.rs +++ b/crates/sui-types/src/messages_checkpoint.rs @@ -182,7 +182,7 @@ pub type CheckpointContentsDigest = [u8; 32]; // The constituent parts of checkpoints, signed and certified -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct CheckpointSummary { pub epoch: EpochId, pub sequence_number: CheckpointSequenceNumber, @@ -249,7 +249,7 @@ impl Display for CheckpointSummary { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, Hash)] pub struct CheckpointSummaryEnvelope { pub summary: CheckpointSummary, pub auth_signature: S, @@ -436,6 +436,12 @@ impl CheckpointProposalContents { } } +/// This is a message validators publish to consensus in order to sign checkpoint +#[derive(Clone, Debug, Serialize, Deserialize, Hash)] +pub struct SignedCheckpointData { + pub summary: SignedCheckpointSummary, +} + /// CheckpointContents are the transactions included in an upcoming checkpoint. /// They must have already been causally ordered. Since the causal order algorithm /// is the same among validators, we expect all honest validators to come up with @@ -445,6 +451,12 @@ pub struct CheckpointContents { transactions: Vec, } +impl SignedCheckpointData { + pub fn verify(&self, committee: &Committee) -> SuiResult { + self.summary.verify(committee, None) + } +} + impl CheckpointContents { pub fn new_with_causally_ordered_transactions(contents: T) -> Self where