Skip to content

Commit

Permalink
Support DB Checkpoint without upload
Browse files Browse the repository at this point in the history
  • Loading branch information
williampsmith committed Nov 7, 2023
1 parent 9ee0508 commit 2fdb180
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 64 deletions.
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ pub struct AuthorityState {
_pruner: AuthorityStorePruner,
_authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

/// Take db checkpoints af different dbs
/// Take db checkpoints of different dbs
db_checkpoint_config: DBCheckpointConfig,

/// Config controlling what kind of expensive safety checks to perform.
Expand Down
177 changes: 124 additions & 53 deletions crates/sui-core/src/db_checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub struct DBCheckpointHandler {
/// DB checkpoint directory on local filesystem
input_root_path: PathBuf,
/// Bucket on cloud object store where db checkpoints will be copied
output_object_store: Arc<DynObjectStore>,
output_object_store: Option<Arc<DynObjectStore>>,
/// Time interval to check for presence of new db checkpoint
interval: Duration,
/// File markers which signal that local db checkpoint can be garbage collected
Expand All @@ -73,7 +73,7 @@ pub struct DBCheckpointHandler {
impl DBCheckpointHandler {
pub fn new(
input_path: &std::path::Path,
output_object_store_config: &ObjectStoreConfig,
output_object_store_config: Option<&ObjectStoreConfig>,
interval_s: u64,
prune_and_compact_before_upload: bool,
indirect_objects_threshold: usize,
Expand All @@ -86,14 +86,16 @@ impl DBCheckpointHandler {
directory: Some(input_path.to_path_buf()),
..Default::default()
};
let mut gc_markers = vec![UPLOAD_COMPLETED_MARKER.to_string()];
let mut gc_markers = vec![];
gc_markers.push(UPLOAD_COMPLETED_MARKER.to_string());
if state_snapshot_enabled {
gc_markers.push(STATE_SNAPSHOT_COMPLETED_MARKER.to_string());
}
Ok(Arc::new(DBCheckpointHandler {
input_object_store: input_store_config.make()?,
input_root_path: input_path.to_path_buf(),
output_object_store: output_object_store_config.make()?,
output_object_store: output_object_store_config
.map(|config| config.make().expect("Failed to make object store")),
interval: Duration::from_secs(interval_s),
gc_markers,
prune_and_compact_before_upload,
Expand All @@ -104,7 +106,7 @@ impl DBCheckpointHandler {
}
pub fn new_for_test(
input_object_store_config: &ObjectStoreConfig,
output_object_store_config: &ObjectStoreConfig,
output_object_store_config: Option<&ObjectStoreConfig>,
interval_s: u64,
prune_and_compact_before_upload: bool,
) -> Result<Arc<Self>> {
Expand All @@ -115,7 +117,8 @@ impl DBCheckpointHandler {
.as_ref()
.unwrap()
.clone(),
output_object_store: output_object_store_config.make()?,
output_object_store: output_object_store_config
.map(|config| config.make().expect("Failed to make object store")),
interval: Duration::from_secs(interval_s),
gc_markers: vec![UPLOAD_COMPLETED_MARKER.to_string(), TEST_MARKER.to_string()],
prune_and_compact_before_upload,
Expand All @@ -126,10 +129,20 @@ impl DBCheckpointHandler {
}
pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
self.clone(),
kill_sender.subscribe(),
));
if self.output_object_store.is_some() {
tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
self.clone(),
kill_sender.subscribe(),
));
} else {
// if db checkpoint remote store is not specified, cleanup loop
// is run to immediately mark db checkpoint upload as successful
// so that they can be snapshotted and garbage collected
tokio::task::spawn(Self::run_db_checkpoint_cleanup_loop(
self.clone(),
kill_sender.subscribe(),
));
}
tokio::task::spawn(Self::run_db_checkpoint_gc_loop(
self,
kill_sender.subscribe(),
Expand All @@ -145,7 +158,7 @@ impl DBCheckpointHandler {
loop {
tokio::select! {
_now = interval.tick() => {
if let Ok(epochs) = find_missing_epochs_dirs(&self.output_object_store, SUCCESS_MARKER).await {
if let Ok(epochs) = find_missing_epochs_dirs(self.output_object_store.as_ref().unwrap(), SUCCESS_MARKER).await {
self.metrics.first_missing_db_checkpoint_epoch.set(epochs.first().cloned().unwrap_or(0) as i64);
if let Err(err) = self.upload_db_checkpoints_to_object_store(epochs).await {
error!("Failed to upload db checkpoint to remote store with err: {:?}", err);
Expand All @@ -160,6 +173,45 @@ impl DBCheckpointHandler {
}
Ok(())
}
async fn run_db_checkpoint_cleanup_loop(
self: Arc<Self>,
mut recv: tokio::sync::broadcast::Receiver<()>,
) -> Result<()> {
let mut interval = tokio::time::interval(self.interval);
info!("DB checkpoint upload disabled. DB checkpoint cleanup loop started");
loop {
tokio::select! {
_now = interval.tick() => {
let local_checkpoints_by_epoch =
find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
for (epoch, db_path) in dirs {
// If db checkpoint marked as completed, skip
let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
let upload_completed_path = local_db_path.join(UPLOAD_COMPLETED_MARKER);
if upload_completed_path.exists() {
continue;
}
if self.prune_and_compact_before_upload {
// Invoke pruning and compaction on the db checkpoint
self.prune_and_compact(local_db_path, *epoch).await?;
}
let bytes = Bytes::from_static(b"success");
let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
put(
&upload_completed_marker,
bytes.clone(),
self.input_object_store.clone(),
)
.await?;
}
},
_ = recv.recv() => break,
}
}
Ok(())
}
async fn run_db_checkpoint_gc_loop(
self: Arc<Self>,
mut recv: tokio::sync::broadcast::Receiver<()>,
Expand Down Expand Up @@ -210,12 +262,20 @@ impl DBCheckpointHandler {
AuthorityStorePruner::compact(&perpetual_db)?;
Ok(())
}
async fn upload_db_checkpoints_to_object_store(&self, missing_epochs: Vec<u64>) -> Result<()> {
async fn upload_db_checkpoints_to_object_store(
&self,
missing_epochs: Vec<u64>,
) -> Result<(), anyhow::Error> {
let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
let local_checkpoints_by_epoch =
find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
let object_store = self
.output_object_store
.as_ref()
.expect("Expected objec store to exist")
.clone();
for (epoch, db_path) in dirs {
if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
if self.prune_and_compact_before_upload {
Expand All @@ -228,19 +288,14 @@ impl DBCheckpointHandler {
copy_recursively(
db_path,
self.input_object_store.clone(),
self.output_object_store.clone(),
object_store.clone(),
NonZeroUsize::new(20).unwrap(),
)
.await?;
// Drop marker in the output directory that upload completed successfully
let bytes = Bytes::from_static(b"success");
let success_marker = db_path.child(SUCCESS_MARKER);
put(
&success_marker,
bytes.clone(),
self.output_object_store.clone(),
)
.await?;
put(&success_marker, bytes.clone(), object_store.clone()).await?;
}
let bytes = Bytes::from_static(b"success");
let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
Expand Down Expand Up @@ -331,7 +386,7 @@ mod tests {
};
let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
&input_store_config,
&output_store_config,
Some(&output_store_config),
10,
false,
)?;
Expand All @@ -348,9 +403,11 @@ mod tests {
.unwrap(),
std::fs::canonicalize(local_epoch0_checkpoint.clone()).unwrap()
);
let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
db_checkpoint_handler
.upload_db_checkpoints_to_object_store(missing_epochs)
.await?;
Expand Down Expand Up @@ -400,7 +457,7 @@ mod tests {
};
let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
&input_store_config,
&output_store_config,
Some(&output_store_config),
10,
false,
)?;
Expand All @@ -415,9 +472,11 @@ mod tests {
let file3 = nested_dir.join("file3");
fs::write(file3, b"Lorem ipsum")?;

let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
db_checkpoint_handler
.upload_db_checkpoints_to_object_store(missing_epochs)
.await?;
Expand Down Expand Up @@ -447,9 +506,11 @@ mod tests {

// Checkpoint handler should copy checkpoint for epoch_0 first before copying
// epoch_1
let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
db_checkpoint_handler
.upload_db_checkpoints_to_object_store(missing_epochs)
.await?;
Expand Down Expand Up @@ -517,35 +578,41 @@ mod tests {
};
let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
&input_store_config,
&output_store_config,
Some(&output_store_config),
10,
false,
)?;

let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
db_checkpoint_handler
.upload_db_checkpoints_to_object_store(missing_epochs)
.await?;

let first_missing_epoch =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?
.first()
.cloned()
.unwrap();
let first_missing_epoch = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?
.first()
.cloned()
.unwrap();
assert_eq!(first_missing_epoch, 2);

let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

let first_missing_epoch =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?
.first()
.cloned()
.unwrap();
let first_missing_epoch = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?
.first()
.cloned()
.unwrap();
assert_eq!(first_missing_epoch, 0);

Ok(())
Expand Down Expand Up @@ -575,22 +642,26 @@ mod tests {
};
let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
&input_store_config,
&output_store_config,
Some(&output_store_config),
10,
false,
)?;

let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
assert_eq!(missing_epochs, vec![0]);
db_checkpoint_handler
.upload_db_checkpoints_to_object_store(missing_epochs)
.await?;

let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
let mut expected_missing_epochs: Vec<u64> = (0..100).collect();
expected_missing_epochs.extend((101..200).collect_vec().iter());
expected_missing_epochs.push(201);
Expand Down
Loading

0 comments on commit 2fdb180

Please sign in to comment.