[formal snapshots] Support without DB Checkpoint upload #14645

Merged 1 commit on Nov 10, 2023
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority.rs
@@ -639,7 +639,7 @@ pub struct AuthorityState {
_pruner: AuthorityStorePruner,
_authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

/// Take db checkpoints af different dbs
/// Take db checkpoints of different dbs
db_checkpoint_config: DBCheckpointConfig,

/// Config controlling what kind of expensive safety checks to perform.
170 changes: 118 additions & 52 deletions crates/sui-core/src/db_checkpoint_handler.rs
@@ -56,7 +56,7 @@ pub struct DBCheckpointHandler {
/// DB checkpoint directory on local filesystem
input_root_path: PathBuf,
/// Bucket on cloud object store where db checkpoints will be copied
output_object_store: Arc<DynObjectStore>,
output_object_store: Option<Arc<DynObjectStore>>,
/// Time interval to check for presence of new db checkpoint
interval: Duration,
/// File markers which signal that local db checkpoint can be garbage collected
@@ -73,7 +73,7 @@ pub struct DBCheckpointHandler {
impl DBCheckpointHandler {
pub fn new(
input_path: &std::path::Path,
output_object_store_config: &ObjectStoreConfig,
output_object_store_config: Option<&ObjectStoreConfig>,
interval_s: u64,
prune_and_compact_before_upload: bool,
indirect_objects_threshold: usize,
@@ -93,7 +93,8 @@ impl DBCheckpointHandler {
Ok(Arc::new(DBCheckpointHandler {
input_object_store: input_store_config.make()?,
input_root_path: input_path.to_path_buf(),
output_object_store: output_object_store_config.make()?,
output_object_store: output_object_store_config
.map(|config| config.make().expect("Failed to make object store")),
interval: Duration::from_secs(interval_s),
gc_markers,
prune_and_compact_before_upload,
@@ -104,7 +105,7 @@ }
}
pub fn new_for_test(
input_object_store_config: &ObjectStoreConfig,
output_object_store_config: &ObjectStoreConfig,
output_object_store_config: Option<&ObjectStoreConfig>,
interval_s: u64,
prune_and_compact_before_upload: bool,
) -> Result<Arc<Self>> {
Expand All @@ -115,7 +116,8 @@ impl DBCheckpointHandler {
.as_ref()
.unwrap()
.clone(),
output_object_store: output_object_store_config.make()?,
output_object_store: output_object_store_config
.map(|config| config.make().expect("Failed to make object store")),
interval: Duration::from_secs(interval_s),
gc_markers: vec![UPLOAD_COMPLETED_MARKER.to_string(), TEST_MARKER.to_string()],
prune_and_compact_before_upload,
@@ -126,10 +128,20 @@ }
}
pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
self.clone(),
kill_sender.subscribe(),
));
if self.output_object_store.is_some() {
tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
self.clone(),
kill_sender.subscribe(),
));
} else {
// if db checkpoint remote store is not specified, cleanup loop
// is run to immediately mark db checkpoint upload as successful
// so that they can be snapshotted and garbage collected
tokio::task::spawn(Self::run_db_checkpoint_cleanup_loop(
self.clone(),
kill_sender.subscribe(),
));
}
tokio::task::spawn(Self::run_db_checkpoint_gc_loop(
self,
kill_sender.subscribe(),
@@ -145,7 +157,7 @@ impl DBCheckpointHandler {
loop {
tokio::select! {
_now = interval.tick() => {
if let Ok(epochs) = find_missing_epochs_dirs(&self.output_object_store, SUCCESS_MARKER).await {
if let Ok(epochs) = find_missing_epochs_dirs(self.output_object_store.as_ref().unwrap(), SUCCESS_MARKER).await {
self.metrics.first_missing_db_checkpoint_epoch.set(epochs.first().cloned().unwrap_or(0) as i64);
if let Err(err) = self.upload_db_checkpoints_to_object_store(epochs).await {
error!("Failed to upload db checkpoint to remote store with err: {:?}", err);
@@ -160,6 +172,41 @@ }
}
Ok(())
}
async fn run_db_checkpoint_cleanup_loop(
self: Arc<Self>,
mut recv: tokio::sync::broadcast::Receiver<()>,
) -> Result<()> {
let mut interval = tokio::time::interval(self.interval);
info!("DB checkpoint upload disabled. DB checkpoint cleanup loop started");
loop {
tokio::select! {
_now = interval.tick() => {
let local_checkpoints_by_epoch =
find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
for (_, db_path) in dirs {
// If db checkpoint marked as completed, skip
let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
let upload_completed_path = local_db_path.join(UPLOAD_COMPLETED_MARKER);
if upload_completed_path.exists() {
continue;
}
let bytes = Bytes::from_static(b"success");
let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
put(
&upload_completed_marker,
bytes.clone(),
self.input_object_store.clone(),
)
.await?;
}
},
_ = recv.recv() => break,
}
}
Ok(())
}
async fn run_db_checkpoint_gc_loop(
self: Arc<Self>,
mut recv: tokio::sync::broadcast::Receiver<()>,
@@ -210,12 +257,20 @@ impl DBCheckpointHandler {
AuthorityStorePruner::compact(&perpetual_db)?;
Ok(())
}
async fn upload_db_checkpoints_to_object_store(&self, missing_epochs: Vec<u64>) -> Result<()> {
async fn upload_db_checkpoints_to_object_store(
&self,
missing_epochs: Vec<u64>,
) -> Result<(), anyhow::Error> {
let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
let local_checkpoints_by_epoch =
find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
let object_store = self
.output_object_store
.as_ref()
.expect("Expected object store to exist")
.clone();
for (epoch, db_path) in dirs {
if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
if self.prune_and_compact_before_upload {
@@ -228,19 +283,14 @@ copy_recursively(
copy_recursively(
db_path,
self.input_object_store.clone(),
self.output_object_store.clone(),
object_store.clone(),
NonZeroUsize::new(20).unwrap(),
)
.await?;
// Drop marker in the output directory that upload completed successfully
let bytes = Bytes::from_static(b"success");
let success_marker = db_path.child(SUCCESS_MARKER);
put(
&success_marker,
bytes.clone(),
self.output_object_store.clone(),
)
.await?;
put(&success_marker, bytes.clone(), object_store.clone()).await?;
}
let bytes = Bytes::from_static(b"success");
let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
@@ -331,7 +381,7 @@ mod tests {
};
let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
&input_store_config,
&output_store_config,
Some(&output_store_config),
10,
false,
)?;
@@ -348,9 +398,11 @@ .unwrap(),
.unwrap(),
std::fs::canonicalize(local_epoch0_checkpoint.clone()).unwrap()
);
let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
db_checkpoint_handler
.upload_db_checkpoints_to_object_store(missing_epochs)
.await?;
@@ -400,7 +452,7 @@ mod tests {
};
let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
&input_store_config,
&output_store_config,
Some(&output_store_config),
10,
false,
)?;
@@ -415,9 +467,11 @@ let file3 = nested_dir.join("file3");
let file3 = nested_dir.join("file3");
fs::write(file3, b"Lorem ipsum")?;

let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
db_checkpoint_handler
.upload_db_checkpoints_to_object_store(missing_epochs)
.await?;
@@ -447,9 +501,11 @@ mod tests {

// Checkpoint handler should copy checkpoint for epoch_0 first before copying
// epoch_1
let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
db_checkpoint_handler
.upload_db_checkpoints_to_object_store(missing_epochs)
.await?;
@@ -517,35 +573,41 @@ mod tests {
};
let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
&input_store_config,
&output_store_config,
Some(&output_store_config),
10,
false,
)?;

let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
db_checkpoint_handler
.upload_db_checkpoints_to_object_store(missing_epochs)
.await?;

let first_missing_epoch =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?
.first()
.cloned()
.unwrap();
let first_missing_epoch = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?
.first()
.cloned()
.unwrap();
assert_eq!(first_missing_epoch, 2);

let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

let first_missing_epoch =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?
.first()
.cloned()
.unwrap();
let first_missing_epoch = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?
.first()
.cloned()
.unwrap();
assert_eq!(first_missing_epoch, 0);

Ok(())
@@ -575,22 +637,26 @@ mod tests {
};
let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
&input_store_config,
&output_store_config,
Some(&output_store_config),
10,
false,
)?;

let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
assert_eq!(missing_epochs, vec![0]);
db_checkpoint_handler
.upload_db_checkpoints_to_object_store(missing_epochs)
.await?;

let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
let mut expected_missing_epochs: Vec<u64> = (0..100).collect();
expected_missing_epochs.extend((101..200).collect_vec().iter());
expected_missing_epochs.push(201);
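Taken together, the db_checkpoint_handler.rs changes make the remote bucket optional: when `output_object_store` is `None`, the upload loop is replaced by a cleanup loop that writes `UPLOAD_COMPLETED_MARKER` locally so db checkpoints can still be snapshotted and garbage collected. Below is a minimal, self-contained sketch of that dispatch; `ObjectStore` and the three empty loop bodies are placeholder stand-ins for the real `DynObjectStore` and handler methods, not the actual Sui API.

```rust
use std::sync::Arc;
use tokio::sync::broadcast;

// Stand-in for Arc<DynObjectStore>; the real handler stores a trait object.
struct ObjectStore;

struct CheckpointHandler {
    // None means no remote bucket is configured, i.e. upload is disabled.
    output_object_store: Option<Arc<ObjectStore>>,
}

impl CheckpointHandler {
    fn start(self: Arc<Self>) -> broadcast::Sender<()> {
        let (kill_sender, _kill_receiver) = broadcast::channel::<()>(1);
        if self.output_object_store.is_some() {
            // Remote store configured: keep the original behavior and upload
            // each local db checkpoint before marking it as completed.
            tokio::spawn(Self::upload_loop(self.clone(), kill_sender.subscribe()));
        } else {
            // No remote store: immediately write UPLOAD_COMPLETED_MARKER into
            // each local checkpoint directory so snapshotting and GC can proceed.
            tokio::spawn(Self::cleanup_loop(self.clone(), kill_sender.subscribe()));
        }
        // Garbage collection of completed local checkpoints runs in both modes.
        tokio::spawn(Self::gc_loop(self, kill_sender.subscribe()));
        kill_sender
    }

    async fn upload_loop(self: Arc<Self>, mut recv: broadcast::Receiver<()>) {
        let _ = recv.recv().await; // placeholder body
    }
    async fn cleanup_loop(self: Arc<Self>, mut recv: broadcast::Receiver<()>) {
        let _ = recv.recv().await; // placeholder body
    }
    async fn gc_loop(self: Arc<Self>, mut recv: broadcast::Receiver<()>) {
        let _ = recv.recv().await; // placeholder body
    }
}

#[tokio::main]
async fn main() {
    // Usage: with no remote store configured, start() spawns the cleanup
    // and GC loops instead of the upload loop.
    let handler = Arc::new(CheckpointHandler { output_object_store: None });
    let kill = handler.start();
    let _ = kill.send(()); // shut the loops down again immediately
}
```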
37 changes: 27 additions & 10 deletions crates/sui-node/src/lib.rs
@@ -828,24 +828,42 @@ impl SuiNode {
DBCheckpointConfig,
Option<tokio::sync::broadcast::Sender<()>>,
)> {
let checkpoint_path = Some(
config
.db_checkpoint_config
.checkpoint_path
.clone()
.unwrap_or_else(|| config.db_checkpoint_path()),
);
let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
DBCheckpointConfig {
checkpoint_path: Some(config.db_checkpoint_path()),
checkpoint_path,
perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
true
} else {
config
.db_checkpoint_config
.perform_db_checkpoints_at_epoch_end
},
..config.db_checkpoint_config.clone()
}
} else {
config.db_checkpoint_config.clone()
};

match db_checkpoint_config
.checkpoint_path
.as_ref()
.zip(db_checkpoint_config.object_store_config.as_ref())
{
Some((path, object_store_config)) => {
match (
db_checkpoint_config.object_store_config.as_ref(),
state_snapshot_enabled,
) {
// If the db checkpoint object store is not specified but the
// state snapshot object store is, create the handler anyway
// to mark db checkpoints as completed so that they can be
// uploaded as state snapshots.
(None, false) => Ok((db_checkpoint_config, None)),
(_, _) => {
let handler = DBCheckpointHandler::new(
path,
object_store_config,
&db_checkpoint_config.checkpoint_path.clone().unwrap(),
db_checkpoint_config.object_store_config.as_ref(),
60,
db_checkpoint_config
.prune_and_compact_before_upload
Expand All @@ -860,7 +878,6 @@ impl SuiNode {
Some(DBCheckpointHandler::start(handler)),
))
}
None => Ok((db_checkpoint_config, None)),
}
}
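
The node-side change above starts the handler whenever a db checkpoint object store is configured or state snapshots are enabled, and skips it only when neither holds. A hedged sketch of just that decision follows; `DbCheckpointConfigSketch` and `should_start_handler` are hypothetical simplified stand-ins, not types or functions from sui-node.

```rust
// Hypothetical, simplified stand-in for the real DBCheckpointConfig.
struct DbCheckpointConfigSketch {
    object_store_config: Option<String>, // placeholder for ObjectStoreConfig
}

/// Mirrors the match in the PR: start the handler either when a remote bucket
/// is configured (upload mode) or when state snapshots need db checkpoints
/// marked as completed (cleanup-only mode).
fn should_start_handler(cfg: &DbCheckpointConfigSketch, state_snapshot_enabled: bool) -> bool {
    match (cfg.object_store_config.as_ref(), state_snapshot_enabled) {
        // No remote bucket and no state snapshots: nothing consumes the
        // db checkpoints, so no handler is needed.
        (None, false) => false,
        // Otherwise start the handler in whichever mode applies.
        (_, _) => true,
    }
}

fn main() {
    let no_bucket = DbCheckpointConfigSketch { object_store_config: None };
    assert!(!should_start_handler(&no_bucket, false)); // nothing consumes checkpoints
    assert!(should_start_handler(&no_bucket, true)); // snapshots need the cleanup loop
}
```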
