Skip to content

Commit

Permalink
[shared-object] Synchroniser for consensus client (MystenLabs#1025)
Browse files Browse the repository at this point in the history
consensus synchronizer
  • Loading branch information
asonnino authored Mar 23, 2022
1 parent 83469be commit 1234d8b
Show file tree
Hide file tree
Showing 8 changed files with 353 additions and 120 deletions.
6 changes: 6 additions & 0 deletions sui_core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,12 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
.map(|x| x.unwrap_or_default())
.map_err(SuiError::from)
}

#[cfg(test)]
/// Test-only accessor for the `schedule` table.
///
/// Looks up `object_id` in `self.schedule` and returns the stored entry, if any.
/// A failure reported by the lookup is converted into a `SuiError`.
pub fn get_schedule(&self, object_id: &ObjectID) -> SuiResult<Option<SequenceNumber>> {
    match self.schedule.get(object_id) {
        Ok(entry) => Ok(entry),
        // Translate the store-level error into the crate-wide error type.
        Err(e) => Err(SuiError::from(e)),
    }
}
}

impl<const ALL_OBJ_VER: bool> BackingPackageStore for SuiDataStore<ALL_OBJ_VER> {
Expand Down
60 changes: 42 additions & 18 deletions sui_core/src/consensus_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,32 @@

use crate::authority::AuthorityState;
use bytes::Bytes;
use futures::{SinkExt, StreamExt};
use std::cmp::Ordering;
use std::net::SocketAddr;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::Arc;
use sui_network::transport;
use sui_network::transport::{RwChannel, TcpDataStream};
use sui_types::base_types::SequenceNumber;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages::{ConfirmationTransaction, ConsensusOutput};
use sui_types::serialize::{deserialize_message, SerializedMessage};
use sui_types::messages::{ConfirmationTransaction, ConsensusOutput, ConsensusSync};
use sui_types::serialize::{deserialize_message, serialize_consensus_sync, SerializedMessage};
use sui_types::{fp_bail, fp_ensure};
use tokio::task::JoinHandle;

#[cfg(test)]
#[path = "unit_tests/consensus_tests.rs"]
pub mod consensus_tests;

/// The possible successful outcomes when processing a consensus message.
///
/// The standard traits are derived so that an outcome can be logged with `{:?}`,
/// copied freely (both variants are field-less) and compared in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProcessingOutcome {
    /// All went well (or at least there is nothing to do on our side).
    Ok,
    /// We missed some outputs and need to sync with the consensus node.
    MissingOutputs,
}

/// The `ConsensusClient` receives certificates sequenced by the consensus and updates
/// the authority's database. The client assumes that the messages it receives have
/// already been authenticated (ie. they really come from a trusted consensus node) and
Expand Down Expand Up @@ -64,22 +74,28 @@ impl ConsensusClient {
buffer_size: usize,
) -> JoinHandle<SuiResult<()>> {
log::info!("Consensus client connecting to {}", address);
tokio::spawn(async move {
handler.synchronize().await?;
handler.run(address, buffer_size).await?;
Ok(())
})
tokio::spawn(async move { handler.run(address, buffer_size).await })
}

/// Synchronize with the consensus in case we missed part of its output sequence.
/// It is safety-critical that we process the consensus outputs in the right order.
async fn synchronize(&mut self) -> SuiResult<()> {
// TODO [issue #932]: [liveness-critical] Implement the synchronizer.
Ok(())
/// It is safety-critical that we process the consensus' outputs in the complete
/// and right order.
async fn synchronize(&mut self, connection: &mut TcpDataStream) -> SuiResult<()> {
    // Build a sync request carrying our `last_consensus_index`.
    let request = ConsensusSync {
        sequencer_number: self.last_consensus_index,
    };
    let payload = Bytes::from(serialize_consensus_sync(&request));

    // Push the request through the connection's sink; a send failure is
    // reported as a `ClientIoError` carrying the error's text.
    match connection.sink().send(payload).await {
        Ok(()) => Ok(()),
        Err(e) => Err(SuiError::ClientIoError {
            error: e.to_string(),
        }),
    }
}

/// Process a single sequenced certificate.
async fn handle_consensus_message(&mut self, bytes: Bytes) -> SuiResult<()> {
async fn handle_consensus_message(&mut self, bytes: Bytes) -> SuiResult<ProcessingOutcome> {
// We first deserialize the consensus output message. If deserialization fails
// we may have a liveness issue. We stop processing of this certificate to
// ensure safety, and the synchronizer will try again to ask for that certificate.
Expand All @@ -106,12 +122,11 @@ impl ConsensusClient {
Ordering::Greater => {
// Something is very wrong. Liveness may be lost (but not safety).
log::error!("Consensus index of authority bigger than expected");
return Ok(());
return Ok(ProcessingOutcome::Ok);
}
Ordering::Less => {
log::debug!("Authority is synchronizing missed sequenced certificates");
self.synchronize().await?;
return Ok(());
return Ok(ProcessingOutcome::MissingOutputs);
}
Ordering::Equal => (),
}
Expand Down Expand Up @@ -149,7 +164,8 @@ impl ConsensusClient {
let certificate = confirmation.certificate;
self.state
.handle_consensus_certificate(certificate, self.last_consensus_index)
.await
.await?;
Ok(ProcessingOutcome::Ok)
}

/// Main loop connecting to the consensus. This mainly acts as a light client.
Expand All @@ -169,7 +185,7 @@ impl ConsensusClient {

// Listen to sequenced certificates and process them.
loop {
let bytes = match connection.read_data().await {
let bytes = match connection.stream().next().await {
Some(Ok(data)) => Bytes::from(data),
Some(Err(e)) => {
log::warn!("Failed to receive data from consensus: {}", e);
Expand All @@ -196,7 +212,15 @@ impl ConsensusClient {
// Log the errors that are the client's fault (not ours). This is
// only for debug purposes: all correct authorities will do the same.
Err(e) => log::debug!("{}", e),
Ok(()) => (),
// The authority missed some consensus outputs and needs to sync.
Ok(ProcessingOutcome::MissingOutputs) => {
if let Err(e) = self.synchronize(&mut connection).await {
log::warn!("Failed to send sync request to consensus: {}", e);
continue 'main;
}
}
// Nothing to do.
Ok(ProcessingOutcome::Ok) => (),
}
}
}
Expand Down
Loading

0 comments on commit 1234d8b

Please sign in to comment.