Skip to content

Commit

Permalink
[checkpoints] Active checkpoint logic (MystenLabs#2091)
Browse files Browse the repository at this point in the history
* Files, infra and basic test for active checkpoints
* Added active logic
* Added the sync active logic
* Add auto-execute on checkpoints
* Added poor man's checkpoint sync
* Add observability
* Review comments
* Temp fix to issue MystenLabs#2360

Co-authored-by: George Danezis <george@danez.is>
  • Loading branch information
gdanezis and George Danezis authored Jun 1, 2022
1 parent dcc9a0a commit 049eef2
Show file tree
Hide file tree
Showing 21 changed files with 1,526 additions and 183 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/sui-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ sui-storage = { path = "../sui-storage" }
sui-config = { path = "../sui-config" }
sui-json = { path = "../sui-json" }

telemetry-subscribers = { git = "https://github.com/MystenLabs/mysten-infra", rev = "2de1a391654a7ee09d867de2f16627b915ad21f0", features = ["jaeger", "opentelemetry", "opentelemetry-jaeger", "tracing-opentelemetry"] }

move-binary-format = { git = "https://github.com/move-language/move", rev = "1b2d3b4274345f5b4b6a1a1bde5aee452003ab5b" }
move-bytecode-utils = { git = "https://github.com/move-language/move", rev = "1b2d3b4274345f5b4b6a1a1bde5aee452003ab5b" }
move-core-types = { git = "https://github.com/move-language/move", rev = "1b2d3b4274345f5b4b6a1a1bde5aee452003ab5b", features = ["address20"] }
Expand Down
9 changes: 4 additions & 5 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,10 @@ impl<
.ok_or(SuiError::TransactionLockDoesNotExist)?;

match transaction_option {
Some(tx_digest) => Ok(Some(
self.transactions
.get(&tx_digest)?
.expect("Stored a lock without storing transaction?"),
)),
Some(tx_digest) => {
return Ok(self.transactions.get(&tx_digest)?);
// .expect("Stored a lock without storing transaction?"),
}
None => Ok(None),
}
}
Expand Down
38 changes: 33 additions & 5 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::{
};
use sui_types::{base_types::AuthorityName, error::SuiResult};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::error;

use crate::{
authority::AuthorityState, authority_aggregator::AuthorityAggregator,
Expand All @@ -47,6 +47,11 @@ use tokio::time::Instant;
pub mod gossip;
use gossip::gossip_process;

pub mod checkpoint_driver;
use checkpoint_driver::checkpoint_process;

use self::checkpoint_driver::CheckpointProcessControl;

// TODO: Make these into a proper config
const MAX_RETRIES_RECORDED: u32 = 10;
const DELAY_FOR_1_RETRY_MS: u64 = 2_000;
Expand Down Expand Up @@ -91,6 +96,7 @@ impl AuthorityHealth {
}
}

#[derive(Clone)]
pub struct ActiveAuthority<A> {
// The local authority state
pub state: Arc<AuthorityState>,
Expand Down Expand Up @@ -170,11 +176,33 @@ impl<A> ActiveAuthority<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
pub async fn spawn_all_active_processes(self) {
self.spawn_active_processes(true, true).await
}

/// Spawn all active tasks.
pub async fn spawn_all_active_processes(self) -> JoinHandle<()> {
pub async fn spawn_active_processes(self, gossip: bool, checkpoint: bool) {
// Spawn a task to take care of gossip
tokio::task::spawn(async move {
gossip_process(&self, 4).await;
})
let gossip_locals = self.clone();
let _gossip_join = tokio::task::spawn(async move {
if gossip {
gossip_process(&gossip_locals, 4).await;
}
});

// Spawn task to take care of checkpointing
let checkpoint_locals = self; // .clone();
let _checkpoint_join = tokio::task::spawn(async move {
if checkpoint {
checkpoint_process(&checkpoint_locals, &CheckpointProcessControl::default()).await;
}
});

if let Err(err) = _gossip_join.await {
error!("Join gossip task end error: {:?}", err);
}
if let Err(err) = _checkpoint_join.await {
error!("Join checkpoint task end error: {:?}", err);
}
}
}
Loading

0 comments on commit 049eef2

Please sign in to comment.