diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs
index fc145136305492..978ba04543fd24 100644
--- a/crates/sui-core/src/authority.rs
+++ b/crates/sui-core/src/authority.rs
@@ -639,7 +639,7 @@ pub struct AuthorityState {
     _pruner: AuthorityStorePruner,
     _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

-    /// Take db checkpoints af different dbs
+    /// Take db checkpoints of different dbs
     db_checkpoint_config: DBCheckpointConfig,

     /// Config controlling what kind of expensive safety checks to perform.
diff --git a/crates/sui-core/src/db_checkpoint_handler.rs b/crates/sui-core/src/db_checkpoint_handler.rs
index 94a94c248d5f5a..ad7cf3c3df19df 100644
--- a/crates/sui-core/src/db_checkpoint_handler.rs
+++ b/crates/sui-core/src/db_checkpoint_handler.rs
@@ -56,7 +56,7 @@ pub struct DBCheckpointHandler {
     /// DB checkpoint directory on local filesystem
     input_root_path: PathBuf,
     /// Bucket on cloud object store where db checkpoints will be copied
-    output_object_store: Arc<DynObjectStore>,
+    output_object_store: Option<Arc<DynObjectStore>>,
     /// Time interval to check for presence of new db checkpoint
     interval: Duration,
     /// File markers which signal that local db checkpoint can be garbage collected
@@ -73,7 +73,7 @@ pub struct DBCheckpointHandler {
 impl DBCheckpointHandler {
     pub fn new(
         input_path: &std::path::Path,
-        output_object_store_config: &ObjectStoreConfig,
+        output_object_store_config: Option<&ObjectStoreConfig>,
         interval_s: u64,
         prune_and_compact_before_upload: bool,
         indirect_objects_threshold: usize,
@@ -86,14 +86,16 @@ impl DBCheckpointHandler {
             directory: Some(input_path.to_path_buf()),
             ..Default::default()
         };
-        let mut gc_markers = vec![UPLOAD_COMPLETED_MARKER.to_string()];
+        let mut gc_markers = vec![];
+        gc_markers.push(UPLOAD_COMPLETED_MARKER.to_string());
         if state_snapshot_enabled {
             gc_markers.push(STATE_SNAPSHOT_COMPLETED_MARKER.to_string());
         }
         Ok(Arc::new(DBCheckpointHandler {
             input_object_store: input_store_config.make()?,
             input_root_path: input_path.to_path_buf(),
-            output_object_store: output_object_store_config.make()?,
+            output_object_store: output_object_store_config
+                .map(|config| config.make().expect("Failed to make object store")),
             interval: Duration::from_secs(interval_s),
             gc_markers,
             prune_and_compact_before_upload,
@@ -104,7 +106,7 @@ impl DBCheckpointHandler {
     }
     pub fn new_for_test(
         input_object_store_config: &ObjectStoreConfig,
-        output_object_store_config: &ObjectStoreConfig,
+        output_object_store_config: Option<&ObjectStoreConfig>,
         interval_s: u64,
         prune_and_compact_before_upload: bool,
     ) -> Result<Arc<Self>> {
@@ -115,7 +117,8 @@ impl DBCheckpointHandler {
                 .as_ref()
                 .unwrap()
                 .clone(),
-            output_object_store: output_object_store_config.make()?,
+            output_object_store: output_object_store_config
+                .map(|config| config.make().expect("Failed to make object store")),
             interval: Duration::from_secs(interval_s),
             gc_markers: vec![UPLOAD_COMPLETED_MARKER.to_string(), TEST_MARKER.to_string()],
             prune_and_compact_before_upload,
@@ -126,10 +129,20 @@ impl DBCheckpointHandler {
     }
     pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
         let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
-        tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
-            self.clone(),
-            kill_sender.subscribe(),
-        ));
+        if self.output_object_store.is_some() {
+            tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
+                self.clone(),
+                kill_sender.subscribe(),
+            ));
+        } else {
+            // if db checkpoint remote store is not specified, cleanup loop
+            // is run to immediately mark db checkpoint upload as successful
+            // so that they can be snapshotted and garbage collected
+            tokio::task::spawn(Self::run_db_checkpoint_cleanup_loop(
+                self.clone(),
+                kill_sender.subscribe(),
+            ));
+        }
         tokio::task::spawn(Self::run_db_checkpoint_gc_loop(
             self,
             kill_sender.subscribe(),
@@ -145,7 +158,7 @@ impl DBCheckpointHandler {
         loop {
             tokio::select! {
                 _now = interval.tick() => {
-                    if let Ok(epochs) = find_missing_epochs_dirs(&self.output_object_store, SUCCESS_MARKER).await {
+                    if let Ok(epochs) = find_missing_epochs_dirs(self.output_object_store.as_ref().unwrap(), SUCCESS_MARKER).await {
                         self.metrics.first_missing_db_checkpoint_epoch.set(epochs.first().cloned().unwrap_or(0) as i64);
                         if let Err(err) = self.upload_db_checkpoints_to_object_store(epochs).await {
                             error!("Failed to upload db checkpoint to remote store with err: {:?}", err);
@@ -160,6 +173,45 @@ impl DBCheckpointHandler {
         }
         Ok(())
     }
+    async fn run_db_checkpoint_cleanup_loop(
+        self: Arc<Self>,
+        mut recv: tokio::sync::broadcast::Receiver<()>,
+    ) -> Result<()> {
+        let mut interval = tokio::time::interval(self.interval);
+        info!("DB checkpoint upload disabled. DB checkpoint cleanup loop started");
+        loop {
+            tokio::select! {
+                _now = interval.tick() => {
+                    let local_checkpoints_by_epoch =
+                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
+                    let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
+                    dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
+                    for (epoch, db_path) in dirs {
+                        // If db checkpoint marked as completed, skip
+                        let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
+                        let upload_completed_path = local_db_path.join(UPLOAD_COMPLETED_MARKER);
+                        if upload_completed_path.exists() {
+                            continue;
+                        }
+                        if self.prune_and_compact_before_upload {
+                            // Invoke pruning and compaction on the db checkpoint
+                            self.prune_and_compact(local_db_path, *epoch).await?;
+                        }
+                        let bytes = Bytes::from_static(b"success");
+                        let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
+                        put(
+                            &upload_completed_marker,
+                            bytes.clone(),
+                            self.input_object_store.clone(),
+                        )
+                        .await?;
+                    }
+                },
+                _ = recv.recv() => break,
+            }
+        }
+        Ok(())
+    }
     async fn run_db_checkpoint_gc_loop(
         self: Arc<Self>,
         mut recv: tokio::sync::broadcast::Receiver<()>,
@@ -210,12 +262,20 @@ impl DBCheckpointHandler {
         AuthorityStorePruner::compact(&perpetual_db)?;
         Ok(())
     }
-    async fn upload_db_checkpoints_to_object_store(&self, missing_epochs: Vec<u64>) -> Result<()> {
+    async fn upload_db_checkpoints_to_object_store(
+        &self,
+        missing_epochs: Vec<u64>,
+    ) -> Result<(), anyhow::Error> {
         let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
         let local_checkpoints_by_epoch =
             find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
         let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
         dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
+        let object_store = self
+            .output_object_store
+            .as_ref()
+            .expect("Expected object store to exist")
+            .clone();
         for (epoch, db_path) in dirs {
             if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
                 if self.prune_and_compact_before_upload {
@@ -228,19 +288,14 @@ impl DBCheckpointHandler {
                 copy_recursively(
                     db_path,
                     self.input_object_store.clone(),
-                    self.output_object_store.clone(),
+                    object_store.clone(),
                     NonZeroUsize::new(20).unwrap(),
                 )
                 .await?;
                 // Drop marker in the output directory that upload completed successfully
                 let bytes = Bytes::from_static(b"success");
                 let success_marker = db_path.child(SUCCESS_MARKER);
-                put(
-                    &success_marker,
-                    bytes.clone(),
-                    self.output_object_store.clone(),
-                )
-                .await?;
+                put(&success_marker, bytes.clone(), object_store.clone()).await?;
             }
             let bytes = Bytes::from_static(b"success");
             let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
@@ -331,7 +386,7 @@ mod tests {
         };
         let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
             &input_store_config,
-            &output_store_config,
+            Some(&output_store_config),
             10,
             false,
         )?;
@@ -348,9 +403,11 @@ mod tests {
                 .unwrap(),
             std::fs::canonicalize(local_epoch0_checkpoint.clone()).unwrap()
         );
-        let missing_epochs =
-            find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
-                .await?;
+        let missing_epochs = find_missing_epochs_dirs(
+            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
+            SUCCESS_MARKER,
+        )
+        .await?;
         db_checkpoint_handler
             .upload_db_checkpoints_to_object_store(missing_epochs)
             .await?;
@@ -400,7 +457,7 @@ mod tests {
         };
         let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
             &input_store_config,
-            &output_store_config,
+            Some(&output_store_config),
             10,
             false,
         )?;
@@ -415,9 +472,11 @@ mod tests {
         let file3 = nested_dir.join("file3");
         fs::write(file3, b"Lorem ipsum")?;

-        let missing_epochs =
-            find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
-                .await?;
+        let missing_epochs = find_missing_epochs_dirs(
+            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
+            SUCCESS_MARKER,
+        )
+        .await?;
         db_checkpoint_handler
             .upload_db_checkpoints_to_object_store(missing_epochs)
             .await?;
@@ -447,9 +506,11 @@ mod tests {

         // Checkpoint handler should copy checkpoint for epoch_0 first before copying
         // epoch_1
-        let missing_epochs =
-            find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
-                .await?;
+        let missing_epochs = find_missing_epochs_dirs(
+            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
+            SUCCESS_MARKER,
+        )
+        .await?;
         db_checkpoint_handler
             .upload_db_checkpoints_to_object_store(missing_epochs)
             .await?;
@@ -517,35 +578,41 @@ mod tests {
         };
         let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
             &input_store_config,
-            &output_store_config,
+            Some(&output_store_config),
             10,
             false,
         )?;

-        let missing_epochs =
-            find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
-                .await?;
+        let missing_epochs = find_missing_epochs_dirs(
+            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
+            SUCCESS_MARKER,
+        )
+        .await?;
         db_checkpoint_handler
             .upload_db_checkpoints_to_object_store(missing_epochs)
             .await?;

-        let first_missing_epoch =
-            find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
-                .await?
-                .first()
-                .cloned()
-                .unwrap();
+        let first_missing_epoch = find_missing_epochs_dirs(
+            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
+            SUCCESS_MARKER,
+        )
+        .await?
+        .first()
+        .cloned()
+        .unwrap();
         assert_eq!(first_missing_epoch, 2);

         let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
         fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

-        let first_missing_epoch =
-            find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
-                .await?
-                .first()
-                .cloned()
-                .unwrap();
+        let first_missing_epoch = find_missing_epochs_dirs(
+            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
+            SUCCESS_MARKER,
+        )
+        .await?
+        .first()
+        .cloned()
+        .unwrap();
         assert_eq!(first_missing_epoch, 0);

         Ok(())
@@ -575,22 +642,26 @@ mod tests {
         };
         let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
             &input_store_config,
-            &output_store_config,
+            Some(&output_store_config),
             10,
             false,
         )?;

-        let missing_epochs =
-            find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
-                .await?;
+        let missing_epochs = find_missing_epochs_dirs(
+            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
+            SUCCESS_MARKER,
+        )
+        .await?;
         assert_eq!(missing_epochs, vec![0]);
         db_checkpoint_handler
             .upload_db_checkpoints_to_object_store(missing_epochs)
             .await?;

-        let missing_epochs =
-            find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
-                .await?;
+        let missing_epochs = find_missing_epochs_dirs(
+            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
+            SUCCESS_MARKER,
+        )
+        .await?;
         let mut expected_missing_epochs: Vec<u64> = (0..100).collect();
         expected_missing_epochs.extend((101..200).collect_vec().iter());
         expected_missing_epochs.push(201);
diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs
index 0c79acb1f79eb6..9c15d26be8edbd 100644
--- a/crates/sui-node/src/lib.rs
+++ b/crates/sui-node/src/lib.rs
@@ -819,24 +819,42 @@ impl SuiNode {
         DBCheckpointConfig,
         Option<tokio::sync::broadcast::Sender<()>>,
     )> {
+        let checkpoint_path = Some(
+            config
+                .db_checkpoint_config
+                .checkpoint_path
+                .clone()
+                .unwrap_or_else(|| config.db_checkpoint_path()),
+        );
         let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
             DBCheckpointConfig {
-                checkpoint_path: Some(config.db_checkpoint_path()),
+                checkpoint_path,
+                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
+                    true
+                } else {
+                    config
+                        .db_checkpoint_config
+                        .perform_db_checkpoints_at_epoch_end
+                },
                 ..config.db_checkpoint_config.clone()
             }
         } else {
             config.db_checkpoint_config.clone()
         };

-        match db_checkpoint_config
-            .checkpoint_path
-            .as_ref()
-            .zip(db_checkpoint_config.object_store_config.as_ref())
-        {
-            Some((path, object_store_config)) => {
+        match (
+            db_checkpoint_config.object_store_config.as_ref(),
+            state_snapshot_enabled,
+        ) {
+            // If the db checkpoint object store is not specified but the
+            // state snapshot object store is specified, create the handler
+            // anyway to mark db checkpoints as completed so that they
+            // can be uploaded as state snapshots.
+            (None, false) => Ok((db_checkpoint_config, None)),
+            (_, _) => {
                 let handler = DBCheckpointHandler::new(
-                    path,
-                    object_store_config,
+                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
+                    db_checkpoint_config.object_store_config.as_ref(),
                     60,
                     db_checkpoint_config
                         .prune_and_compact_before_upload
@@ -851,7 +869,6 @@ impl SuiNode {
                     Some(DBCheckpointHandler::start(handler)),
                 ))
             }
-            None => Ok((db_checkpoint_config, None)),
         }
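
The core behavioral change in this diff is that `DBCheckpointHandler::start` now chooses between two background loops depending on whether a remote object store is configured: the existing upload loop when it is, and a new cleanup loop that immediately writes UPLOAD_COMPLETED_MARKER locally when it is not. The sketch below is a minimal, self-contained illustration of that dispatch pattern, not the sui-core code: the `Handler` type, the single parameterized `run_loop`, and the `println!` placeholder bodies are illustrative stand-ins, and it assumes the tokio crate with the "full" feature set.

    // Illustrative sketch of the "optional remote store" dispatch.
    use std::{sync::Arc, time::Duration};

    struct Handler {
        // Stand-in for `Option<Arc<DynObjectStore>>` in the real handler.
        output_object_store: Option<String>,
        interval: Duration,
    }

    impl Handler {
        fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
            let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
            // Remote store configured => upload loop; otherwise => cleanup loop
            // that would mark checkpoints as uploaded so GC/snapshots can proceed.
            let mode = if self.output_object_store.is_some() {
                "upload"
            } else {
                "cleanup"
            };
            tokio::task::spawn(self.clone().run_loop(kill_sender.subscribe(), mode));
            kill_sender
        }

        async fn run_loop(
            self: Arc<Self>,
            mut recv: tokio::sync::broadcast::Receiver<()>,
            mode: &'static str,
        ) {
            let mut interval = tokio::time::interval(self.interval);
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // Upload mode would copy checkpoints to the remote bucket;
                        // cleanup mode would only drop a local completion marker.
                        println!("{mode} tick");
                    }
                    _ = recv.recv() => break,
                }
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let handler = Arc::new(Handler {
            output_object_store: None,
            interval: Duration::from_millis(5),
        });
        let kill = handler.start();
        tokio::time::sleep(Duration::from_millis(20)).await;
        let _ = kill.send(());
    }

The real handler keeps the two loops as separate methods (run_db_checkpoint_upload_loop and run_db_checkpoint_cleanup_loop) and additionally spawns the GC loop; the sketch collapses them into one parameterized loop purely to keep the dispatch visible.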