Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Checkpoint integration test #2343

Merged
merged 6 commits into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ impl AuthorityState {
state
}

pub(crate) fn checkpoints(&self) -> Option<Arc<Mutex<CheckpointStore>>> {
pub fn checkpoints(&self) -> Option<Arc<Mutex<CheckpointStore>>> {
asonnino marked this conversation as resolved.
Show resolved Hide resolved
self.checkpoints.clone()
}

Expand Down
38 changes: 35 additions & 3 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

use crate::{
authority::AuthorityState,
consensus_adapter::{ConsensusAdapter, ConsensusListener, ConsensusListenerMessage},
consensus_adapter::{
CheckpointConsensusAdapter, CheckpointSender, ConsensusAdapter, ConsensusListener,
ConsensusListenerMessage,
},
};
use anyhow::anyhow;
use anyhow::Result;
Expand All @@ -19,7 +22,10 @@ use sui_network::{
};

use sui_types::{crypto::VerificationObligation, error::*, messages::*};
use tokio::sync::mpsc::{channel, Sender};
use tokio::{
sync::mpsc::{channel, Sender},
task::JoinHandle,
};

use sui_types::messages_checkpoint::CheckpointRequest;
use sui_types::messages_checkpoint::CheckpointResponse;
Expand Down Expand Up @@ -129,6 +135,7 @@ impl AuthorityServer {
.add_service(ValidatorServer::new(ValidatorService {
state: self.state,
consensus_adapter: self.consensus_adapter,
_checkpoint_consensus_handle: None,
}))
.bind(&address)
.await
Expand All @@ -147,6 +154,7 @@ impl AuthorityServer {
pub struct ValidatorService {
state: Arc<AuthorityState>,
consensus_adapter: ConsensusAdapter,
_checkpoint_consensus_handle: Option<JoinHandle<()>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, this is not going to be so simple, since we will need to restart it upon reconfiguration? We will work out the right structure down the line.

}

impl ValidatorService {
Expand Down Expand Up @@ -189,17 +197,41 @@ impl ValidatorService {
/* max_pending_transactions */ 1_000_000,
);

// The consensus adapter allows the authority to send user certificates through consensus.
let consensus_adapter = ConsensusAdapter::new(
state.clone(),
consensus_config.address().to_owned(),
state.committee.clone(),
tx_sui_to_consensus,
tx_sui_to_consensus.clone(),
/* max_delay */ Duration::from_millis(5_000),
);

// Update the checkpoint store with a consensus client.
let checkpoint_consensus_handle = if let Some(checkpoint_store) = state.checkpoints() {
let (tx_checkpoint_consensus_adapter, rx_checkpoint_consensus_adapter) = channel(1_000);
let consensus_sender = CheckpointSender::new(tx_checkpoint_consensus_adapter);
checkpoint_store
.lock()
.set_consensus(Box::new(consensus_sender))?;

let handle = CheckpointConsensusAdapter::new(
/* consensus_address */ consensus_config.address().to_owned(),
/* tx_consensus_listener */ tx_sui_to_consensus,
rx_checkpoint_consensus_adapter,
/* checkpoint_locals */ checkpoint_store.lock().get_locals(),
/* retry_delay */ Duration::from_millis(5_000),
/* max_pending_transactions */ 10_000,
)
.spawn();
Some(handle)
} else {
None
};

Ok(Self {
state,
consensus_adapter,
_checkpoint_consensus_handle: checkpoint_consensus_handle,
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ impl CheckpointStore {
// Helper write functions

/// Set the next checkpoint proposal.
fn set_proposal(&mut self) -> Result<CheckpointProposal, SuiError> {
pub fn set_proposal(&mut self) -> Result<CheckpointProposal, SuiError> {
asonnino marked this conversation as resolved.
Show resolved Hide resolved
// Check that:
// - there is no current proposal.
// - there are no unprocessed transactions.
Expand Down
20 changes: 13 additions & 7 deletions crates/sui-core/src/consensus_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use crate::authority::AuthorityState;
use crate::checkpoints::CheckpointLocals;
use crate::checkpoints::ConsensusSender;
use arc_swap::ArcSwapOption;
use bytes::Bytes;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
Expand Down Expand Up @@ -278,6 +277,14 @@ pub struct CheckpointSender {
tx_checkpoint_consensus_adapter: Sender<CheckpointFragment>,
}

impl CheckpointSender {
pub fn new(tx_checkpoint_consensus_adapter: Sender<CheckpointFragment>) -> Self {
Self {
tx_checkpoint_consensus_adapter,
}
}
}

impl ConsensusSender for CheckpointSender {
fn send_to_consensus(&self, fragment: CheckpointFragment) -> SuiResult {
self.tx_checkpoint_consensus_adapter
Expand All @@ -295,7 +302,7 @@ pub struct CheckpointConsensusAdapter {
/// Receive new checkpoint fragments to sequence.
rx_checkpoint_consensus_adapter: Receiver<CheckpointFragment>,
/// A pointer to the checkpoints local store.
checkpoint_locals: ArcSwapOption<CheckpointLocals>,
checkpoint_locals: Arc<CheckpointLocals>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this! Indeed we do not need a swap here.

/// The initial delay to wait before re-attempting a connection with consensus (in ms).
retry_delay: Duration,
/// The maximum number of checkpoint fragment pending sequencing.
Expand All @@ -310,7 +317,7 @@ impl CheckpointConsensusAdapter {
consensus_address: Multiaddr,
tx_consensus_listener: Sender<ConsensusListenerMessage>,
rx_checkpoint_consensus_adapter: Receiver<CheckpointFragment>,
checkpoint_locals: ArcSwapOption<CheckpointLocals>,
checkpoint_locals: Arc<CheckpointLocals>,
retry_delay: Duration,
max_pending_transactions: usize,
) -> Self {
Expand All @@ -332,8 +339,8 @@ impl CheckpointConsensusAdapter {
}

/// Spawn a `CheckpointConsensusAdapter` in a dedicated tokio task.
pub fn spawn(mut instance: Self) -> JoinHandle<()> {
tokio::spawn(async move { instance.run().await })
pub fn spawn(mut self) -> JoinHandle<()> {
tokio::spawn(async move { self.run().await })
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will have to track its lifetime and restart/halt propertly.

}

/// Submit a transaction to consensus.
Expand Down Expand Up @@ -403,8 +410,7 @@ impl CheckpointConsensusAdapter {
// Cleanup the buffer.
if self.buffer.len() >= self.max_pending_transactions {
// Drop the earliest fragments. They are not needed for liveness.
let locals = self.checkpoint_locals.load_full();
if let Some(proposal) = &locals.as_ref().unwrap().current_proposal {
if let Some(proposal) = &self.checkpoint_locals.current_proposal {
let current_sequence_number = proposal.sequence_number();
self.buffer.retain(|(_, s)| s >= current_sequence_number);
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ workspace-hack = { path = "../workspace-hack"}
tracing-test = "0.2.1"
pretty_assertions = "1.2.0"
tokio-util = { version = "0.7.2", features = ["codec"] }

typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "7c247967e5a5abd59ecaa75bc62b05bcdf4503fe"}
test-utils = { path = "../test-utils" }

[features]
Expand Down
82 changes: 82 additions & 0 deletions crates/sui/tests/checkpoints_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use sui_types::{base_types::TransactionDigest, messages_checkpoint::CheckpointRequest};
use test_utils::authority::{spawn_test_authorities, test_authority_configs};
use tokio::time::sleep;
use tokio::time::Duration;
use typed_store::traits::Map;

#[tokio::test]
async fn sequence_fragments() {
// Spawn a quorum of authorities.
let configs = test_authority_configs();
let mut handles = spawn_test_authorities(vec![], &configs).await;

// Get checkpoint proposals.
let t1 = TransactionDigest::random();
let t2 = TransactionDigest::random();
let t3 = TransactionDigest::random();
let transactions = [(1, t1), (2, t2), (3, t3)];
let next_sequence_number = (transactions.len() + 1) as u64;

let mut proposals: Vec<_> = handles
.iter_mut()
.map(|handle| {
let checkpoints_store = handle.state().checkpoints().unwrap();
checkpoints_store
.lock()
.handle_internal_batch(next_sequence_number, &transactions)
.unwrap();
let proposal = checkpoints_store.lock().set_proposal().unwrap();
proposal
})
.collect();

// Ensure the are no fragments in the checkpoint store at this time.
for handle in &handles {
let status = handle
.state()
.checkpoints()
.unwrap()
.lock()
.fragments
.iter()
.skip_to_last()
.next();
assert!(status.is_none());
}

// Make a checkpoint fragment and sequence it.
let p1 = proposals.pop().unwrap();
let p2 = proposals.pop().unwrap();
let fragment = p1.fragment_with(&p2);

let request = CheckpointRequest::set_fragment(fragment);
for handle in handles.iter_mut() {
let _response = handle
.state()
.checkpoints()
.unwrap()
.lock()
.handle_checkpoint_request(&request);
}

// Wait until all validators sequence and process the fragment.
loop {
let ok = handles.iter().all(|handle| {
handle
.state()
.checkpoints()
.unwrap()
.lock()
.fragments
.iter()
.next()
.is_some()
});
if ok {
break;
}
sleep(Duration::from_millis(10)).await;
}
}