Skip to content

Commit

Permalink
[formal snapshots] Support without DB Checkpoint upload (#14645)
Browse files Browse the repository at this point in the history
## Description 

Today in order for formal snapshots to work, we also need to enable full
db checkpoint config, including providing an object store and awaiting
upload. Some node operators may want to only upload formal snapshots
without incurring the cost of also uploading and maintaining a db
checkpoint snapshot. Moreover, we will eventually move to only
supporting formal snapshots. This PR introduces the capability to
configure a node to generate and upload formal snapshots (state
snapshots) without the need to also upload db checkpoints. We do this by
generating a minimal db checkpoint config for use by
`DBCheckpointHandler` and introducing a separate loop path in the
handler, which simply marks new db checkpoints with `UPLOAD_COMPLETED`
marker so that they can be used for formal snapshots and subsequently
gc'ed.

## Test Plan 

Ran locally for 5 epochs with the following relevant config
```
state-snapshot-write-config:
  object-store-config:
    object-store: "File"
    directory: "/opt/sui/db/local_snapshots"
  concurrency: 5
```
 and with no db checkpoint config.

Observed snapshots being generated:

```
williamsmith in /opt/sui/db/authorities_db/full_node_db/db_checkpoints λ ls
epoch_4
williamsmith in /opt/sui/db/authorities_db/full_node_db/db_checkpoints λ cd ../snapshot
williamsmith in /opt/sui/db/authorities_db/full_node_db/snapshot λ ls
epoch_0 epoch_1 epoch_2 epoch_3
williamsmith in /opt/sui/db/authorities_db/full_node_db/snapshot λ cd ../..
williamsmith in /opt/sui/db/authorities_db λ cd ../local_snapshots
williamsmith in /opt/sui/db/local_snapshots λ ls
epoch_0 epoch_1 epoch_2 epoch_3
williamsmith in /opt/sui/db/local_snapshots λ ls epoch_3
1_1.obj  1_1.ref  MANIFEST _SUCCESS
```

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
williampsmith authored Nov 10, 2023
1 parent 12b10da commit 5842bf2
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 63 deletions.
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ pub struct AuthorityState {
_pruner: AuthorityStorePruner,
_authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

/// Take db checkpoints af different dbs
/// Take db checkpoints of different dbs
db_checkpoint_config: DBCheckpointConfig,

/// Config controlling what kind of expensive safety checks to perform.
Expand Down
170 changes: 118 additions & 52 deletions crates/sui-core/src/db_checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub struct DBCheckpointHandler {
/// DB checkpoint directory on local filesystem
input_root_path: PathBuf,
/// Bucket on cloud object store where db checkpoints will be copied
output_object_store: Arc<DynObjectStore>,
output_object_store: Option<Arc<DynObjectStore>>,
/// Time interval to check for presence of new db checkpoint
interval: Duration,
/// File markers which signal that local db checkpoint can be garbage collected
Expand All @@ -73,7 +73,7 @@ pub struct DBCheckpointHandler {
impl DBCheckpointHandler {
pub fn new(
input_path: &std::path::Path,
output_object_store_config: &ObjectStoreConfig,
output_object_store_config: Option<&ObjectStoreConfig>,
interval_s: u64,
prune_and_compact_before_upload: bool,
indirect_objects_threshold: usize,
Expand All @@ -93,7 +93,8 @@ impl DBCheckpointHandler {
Ok(Arc::new(DBCheckpointHandler {
input_object_store: input_store_config.make()?,
input_root_path: input_path.to_path_buf(),
output_object_store: output_object_store_config.make()?,
output_object_store: output_object_store_config
.map(|config| config.make().expect("Failed to make object store")),
interval: Duration::from_secs(interval_s),
gc_markers,
prune_and_compact_before_upload,
Expand All @@ -104,7 +105,7 @@ impl DBCheckpointHandler {
}
pub fn new_for_test(
input_object_store_config: &ObjectStoreConfig,
output_object_store_config: &ObjectStoreConfig,
output_object_store_config: Option<&ObjectStoreConfig>,
interval_s: u64,
prune_and_compact_before_upload: bool,
) -> Result<Arc<Self>> {
Expand All @@ -115,7 +116,8 @@ impl DBCheckpointHandler {
.as_ref()
.unwrap()
.clone(),
output_object_store: output_object_store_config.make()?,
output_object_store: output_object_store_config
.map(|config| config.make().expect("Failed to make object store")),
interval: Duration::from_secs(interval_s),
gc_markers: vec![UPLOAD_COMPLETED_MARKER.to_string(), TEST_MARKER.to_string()],
prune_and_compact_before_upload,
Expand All @@ -126,10 +128,20 @@ impl DBCheckpointHandler {
}
pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
self.clone(),
kill_sender.subscribe(),
));
if self.output_object_store.is_some() {
tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
self.clone(),
kill_sender.subscribe(),
));
} else {
// if db checkpoint remote store is not specified, cleanup loop
// is run to immediately mark db checkpoint upload as successful
// so that they can be snapshotted and garbage collected
tokio::task::spawn(Self::run_db_checkpoint_cleanup_loop(
self.clone(),
kill_sender.subscribe(),
));
}
tokio::task::spawn(Self::run_db_checkpoint_gc_loop(
self,
kill_sender.subscribe(),
Expand All @@ -145,7 +157,7 @@ impl DBCheckpointHandler {
loop {
tokio::select! {
_now = interval.tick() => {
if let Ok(epochs) = find_missing_epochs_dirs(&self.output_object_store, SUCCESS_MARKER).await {
if let Ok(epochs) = find_missing_epochs_dirs(self.output_object_store.as_ref().unwrap(), SUCCESS_MARKER).await {
self.metrics.first_missing_db_checkpoint_epoch.set(epochs.first().cloned().unwrap_or(0) as i64);
if let Err(err) = self.upload_db_checkpoints_to_object_store(epochs).await {
error!("Failed to upload db checkpoint to remote store with err: {:?}", err);
Expand All @@ -160,6 +172,41 @@ impl DBCheckpointHandler {
}
Ok(())
}
/// Periodic cleanup loop used when no db checkpoint object store is
/// configured (i.e. db checkpoint upload is disabled) but formal state
/// snapshots still need db checkpoints. Instead of uploading anything,
/// each tick scans the local db checkpoint directory and writes an
/// `UPLOAD_COMPLETED_MARKER` file into every checkpoint dir that does
/// not already have one, so that downstream consumers (state snapshot
/// generation, and the GC loop which treats the marker as a gc signal)
/// can proceed as if the upload had happened.
///
/// Runs until a message is received on `recv` (the kill channel from
/// `start`). Returns `Err` if a directory listing, path conversion, or
/// marker write fails, which terminates the loop.
async fn run_db_checkpoint_cleanup_loop(
self: Arc<Self>,
mut recv: tokio::sync::broadcast::Receiver<()>,
) -> Result<()> {
let mut interval = tokio::time::interval(self.interval);
info!("DB checkpoint upload disabled. DB checkpoint cleanup loop started");
loop {
tokio::select! {
_now = interval.tick() => {
// Enumerate local checkpoint dirs (keyed by epoch number).
let local_checkpoints_by_epoch =
find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
// Process epochs in ascending order for deterministic marking.
dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
for (_, db_path) in dirs {
// If db checkpoint marked as completed, skip
let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
let upload_completed_path = local_db_path.join(UPLOAD_COMPLETED_MARKER);
if upload_completed_path.exists() {
continue;
}
// Write the marker immediately — there is no remote store,
// so "upload" is considered complete by definition.
let bytes = Bytes::from_static(b"success");
let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
put(
&upload_completed_marker,
bytes.clone(),
self.input_object_store.clone(),
)
.await?;
}
},
// Kill signal from the broadcast channel returned by `start`.
_ = recv.recv() => break,
}
}
Ok(())
}
async fn run_db_checkpoint_gc_loop(
self: Arc<Self>,
mut recv: tokio::sync::broadcast::Receiver<()>,
Expand Down Expand Up @@ -210,12 +257,20 @@ impl DBCheckpointHandler {
AuthorityStorePruner::compact(&perpetual_db)?;
Ok(())
}
async fn upload_db_checkpoints_to_object_store(&self, missing_epochs: Vec<u64>) -> Result<()> {
async fn upload_db_checkpoints_to_object_store(
&self,
missing_epochs: Vec<u64>,
) -> Result<(), anyhow::Error> {
let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
let local_checkpoints_by_epoch =
find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
let object_store = self
.output_object_store
.as_ref()
.expect("Expected object store to exist")
.clone();
for (epoch, db_path) in dirs {
if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
if self.prune_and_compact_before_upload {
Expand All @@ -228,19 +283,14 @@ impl DBCheckpointHandler {
copy_recursively(
db_path,
self.input_object_store.clone(),
self.output_object_store.clone(),
object_store.clone(),
NonZeroUsize::new(20).unwrap(),
)
.await?;
// Drop marker in the output directory that upload completed successfully
let bytes = Bytes::from_static(b"success");
let success_marker = db_path.child(SUCCESS_MARKER);
put(
&success_marker,
bytes.clone(),
self.output_object_store.clone(),
)
.await?;
put(&success_marker, bytes.clone(), object_store.clone()).await?;
}
let bytes = Bytes::from_static(b"success");
let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
Expand Down Expand Up @@ -331,7 +381,7 @@ mod tests {
};
let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
&input_store_config,
&output_store_config,
Some(&output_store_config),
10,
false,
)?;
Expand All @@ -348,9 +398,11 @@ mod tests {
.unwrap(),
std::fs::canonicalize(local_epoch0_checkpoint.clone()).unwrap()
);
let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
db_checkpoint_handler
.upload_db_checkpoints_to_object_store(missing_epochs)
.await?;
Expand Down Expand Up @@ -400,7 +452,7 @@ mod tests {
};
let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
&input_store_config,
&output_store_config,
Some(&output_store_config),
10,
false,
)?;
Expand All @@ -415,9 +467,11 @@ mod tests {
let file3 = nested_dir.join("file3");
fs::write(file3, b"Lorem ipsum")?;

let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
db_checkpoint_handler
.upload_db_checkpoints_to_object_store(missing_epochs)
.await?;
Expand Down Expand Up @@ -447,9 +501,11 @@ mod tests {

// Checkpoint handler should copy checkpoint for epoch_0 first before copying
// epoch_1
let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
db_checkpoint_handler
.upload_db_checkpoints_to_object_store(missing_epochs)
.await?;
Expand Down Expand Up @@ -517,35 +573,41 @@ mod tests {
};
let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
&input_store_config,
&output_store_config,
Some(&output_store_config),
10,
false,
)?;

let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
db_checkpoint_handler
.upload_db_checkpoints_to_object_store(missing_epochs)
.await?;

let first_missing_epoch =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?
.first()
.cloned()
.unwrap();
let first_missing_epoch = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?
.first()
.cloned()
.unwrap();
assert_eq!(first_missing_epoch, 2);

let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

let first_missing_epoch =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?
.first()
.cloned()
.unwrap();
let first_missing_epoch = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?
.first()
.cloned()
.unwrap();
assert_eq!(first_missing_epoch, 0);

Ok(())
Expand Down Expand Up @@ -575,22 +637,26 @@ mod tests {
};
let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
&input_store_config,
&output_store_config,
Some(&output_store_config),
10,
false,
)?;

let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
assert_eq!(missing_epochs, vec![0]);
db_checkpoint_handler
.upload_db_checkpoints_to_object_store(missing_epochs)
.await?;

let missing_epochs =
find_missing_epochs_dirs(&db_checkpoint_handler.output_object_store, SUCCESS_MARKER)
.await?;
let missing_epochs = find_missing_epochs_dirs(
db_checkpoint_handler.output_object_store.as_ref().unwrap(),
SUCCESS_MARKER,
)
.await?;
let mut expected_missing_epochs: Vec<u64> = (0..100).collect();
expected_missing_epochs.extend((101..200).collect_vec().iter());
expected_missing_epochs.push(201);
Expand Down
37 changes: 27 additions & 10 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -828,24 +828,42 @@ impl SuiNode {
DBCheckpointConfig,
Option<tokio::sync::broadcast::Sender<()>>,
)> {
let checkpoint_path = Some(
config
.db_checkpoint_config
.checkpoint_path
.clone()
.unwrap_or_else(|| config.db_checkpoint_path()),
);
let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
DBCheckpointConfig {
checkpoint_path: Some(config.db_checkpoint_path()),
checkpoint_path,
perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
true
} else {
config
.db_checkpoint_config
.perform_db_checkpoints_at_epoch_end
},
..config.db_checkpoint_config.clone()
}
} else {
config.db_checkpoint_config.clone()
};

match db_checkpoint_config
.checkpoint_path
.as_ref()
.zip(db_checkpoint_config.object_store_config.as_ref())
{
Some((path, object_store_config)) => {
match (
db_checkpoint_config.object_store_config.as_ref(),
state_snapshot_enabled,
) {
// If db checkpoint config object store not specified but
// state snapshot object store is specified, create handler
// anyway for marking db checkpoints as completed so that they
// can be uploaded as state snapshots.
(None, false) => Ok((db_checkpoint_config, None)),
(_, _) => {
let handler = DBCheckpointHandler::new(
path,
object_store_config,
&db_checkpoint_config.checkpoint_path.clone().unwrap(),
db_checkpoint_config.object_store_config.as_ref(),
60,
db_checkpoint_config
.prune_and_compact_before_upload
Expand All @@ -860,7 +878,6 @@ impl SuiNode {
Some(DBCheckpointHandler::start(handler)),
))
}
None => Ok((db_checkpoint_config, None)),
}
}

Expand Down

2 comments on commit 5842bf2

@vercel
Copy link

@vercel vercel bot commented on 5842bf2 Nov 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vercel
Copy link

@vercel vercel bot commented on 5842bf2 Nov 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.