-
Notifications
You must be signed in to change notification settings - Fork 206
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support persist manifest efficientlly #1596
base: main
Are you sure you want to change the base?
feat: support persist manifest efficientlly #1596
Conversation
96179be
to
3ea66e8
Compare
3ea66e8
to
fe5f793
Compare
@@ -29,13 +39,16 @@ use crate::{ | |||
|
|||
pub const PREFIX_PATH: &str = "manifest"; | |||
pub const SNAPSHOT_FILENAME: &str = "snapshot"; | |||
pub const SST_PREFIX: &str = "sst"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those files should not be named sst, sst is only used in LSM, and represent sorted string table, here those file are delta files based on snapshot.
} | ||
|
||
let mut sst_files = Vec::with_capacity(paths.len()); | ||
for path in &paths { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could fetch those files concurrently, one after one is too slow.
.await | ||
.context("failed to read sst file")?; | ||
let pb_sst = pb_types::SstFile::decode(bytes).context("failed to decode sst file")?; | ||
sst_files.push(SstFile::try_from(pb_sst)?); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to dedup files before merge, since there may exists old delta files, which are failed to delete in last merge.
.context("Failed to update manifest")?; | ||
|
||
// 2. Delete the old sst files | ||
for path in &paths { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete concurrently
self.store | ||
.delete(path) | ||
.await | ||
.context("failed to delete sst file")?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add path name to log
tokio::spawn(async move { | ||
ssts_merge.run().await; | ||
}); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When start up, we also need to read delta files, if we only read snapshot, then part of sst won't be queried later?
Self { | ||
channel_size: 100, | ||
max_interval_seconds: 5, | ||
merge_threshold: 50, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
10-20 is a reasonable choice.
pub async fn add_file(&self, id: FileId, meta: FileMeta) -> Result<()> { | ||
let mut payload = self.payload.write().await; | ||
let mut tmp_ssts = payload.files.clone(); | ||
let new_sst_path = Path::from(format!("{}/{id}", self.sst_path)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there are too many pending delta files, we should block the write here.
Rationale
close #1592
Detailed Changes
Test Plan
CI