Skip to content

Commit 6472227

Browse files
committed
[ENH]: use FSTs for garbage collection
1 parent 9a652f8 commit 6472227

File tree

6 files changed, with 271 additions and 124 deletions.

Cargo.lock

Lines changed: 8 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/garbage_collector/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ chroma-memberlist = { workspace = true }
 chroma-tracing = { workspace = true }
 chroma-jemalloc-pprof-server = { workspace = true }
 wal3 = { workspace = true }
+fst = "0.4.7"
+itertools.workspace = true
 
 [target.'cfg(not(target_env = "msvc"))'.dependencies]
 tikv-jemallocator = { workspace = true }

rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ use crate::operators::mark_versions_at_sysdb::{
     MarkVersionsAtSysDbOutput,
 };
 use crate::types::{
-    version_graph_to_collection_dependency_graph, CleanupMode, GarbageCollectorResponse,
-    VersionGraph, VersionGraphNode,
+    version_graph_to_collection_dependency_graph, CleanupMode, FilePathRefCountMap,
+    GarbageCollectorResponse, VersionGraph, VersionGraphNode,
 };
 use async_trait::async_trait;
 use chroma_blockstore::RootManager;
@@ -73,7 +73,7 @@ pub struct GarbageCollectorOrchestrator {
     pending_list_files_at_version_tasks: HashSet<(CollectionUuid, i64)>,
     delete_unused_file_output: Option<DeleteUnusedFilesOutput>,
     delete_unused_log_output: Option<DeleteUnusedLogsOutput>,
-    file_ref_counts: HashMap<String, u32>,
+    file_ref_counts: FilePathRefCountMap,
     num_pending_tasks: usize,
     min_versions_to_keep: u32,
     enable_log_gc: bool,
@@ -122,7 +122,7 @@ impl GarbageCollectorOrchestrator {
122122
cleanup_mode,
123123
result_channel: None,
124124
version_files: HashMap::new(),
125-
file_ref_counts: HashMap::new(),
125+
file_ref_counts: FilePathRefCountMap::empty(),
126126
versions_to_delete_output: None,
127127
pending_mark_versions_at_sysdb_tasks: HashSet::new(),
128128
pending_list_files_at_version_tasks: HashSet::new(),
@@ -184,6 +184,8 @@ pub enum GarbageCollectorError {
     CollectionDeletionFailed(#[from] DeleteCollectionError),
     #[error("SysDb method failed: {0}")]
     SysDbMethodFailed(String),
+    #[error("Error manipulating FST: {0}")]
+    FST(#[from] fst::Error),
 }
 
 impl ChromaError for GarbageCollectorError {
@@ -705,10 +707,7 @@ impl GarbageCollectorOrchestrator {
705707
output.version
706708
);
707709

708-
for file_path in output.file_paths {
709-
let count = self.file_ref_counts.entry(file_path).or_insert(0);
710-
*count += 1;
711-
}
710+
self.file_ref_counts.add_set(output.file_paths, 1)?;
712711
}
713712
CollectionVersionAction::Delete => {
714713
tracing::debug!(
@@ -718,9 +717,7 @@ impl GarbageCollectorOrchestrator {
718717
output.version
719718
);
720719

721-
for file_path in output.file_paths {
722-
self.file_ref_counts.entry(file_path).or_insert(0);
723-
}
720+
self.file_ref_counts.add_set(output.file_paths, 0)?;
724721
}
725722
}
726723

@@ -745,17 +742,8 @@ impl GarbageCollectorOrchestrator {
745742

746743
// We now have results for all ListFilesAtVersionsOperator tasks that we spawned
747744
tracing::trace!("File ref counts: {:#?}", self.file_ref_counts);
748-
let file_paths_to_delete = self
749-
.file_ref_counts
750-
.iter()
751-
.filter_map(|(path, count)| {
752-
if *count == 0 {
753-
Some(path.clone())
754-
} else {
755-
None
756-
}
757-
})
758-
.collect::<Vec<_>>();
745+
746+
let file_paths_to_delete = self.file_ref_counts.filter_by_count(0)?;
759747

760748
let delete_percentage =
761749
file_paths_to_delete.len() as f32 / self.file_ref_counts.len() as f32 * 100.0;
@@ -797,7 +785,6 @@ impl GarbageCollectorOrchestrator {
797785
)),
798786
DeleteUnusedFilesInput {
799787
unused_s3_files: file_paths_to_delete,
800-
hnsw_prefixes_for_deletion: vec![],
801788
},
802789
ctx.receiver(),
803790
self.context.task_cancellation_token.clone(),
@@ -834,7 +821,7 @@ impl GarbageCollectorOrchestrator {
834821
return Ok(());
835822
}
836823

837-
self.num_files_deleted += output.deleted_files.len() as u32;
824+
self.num_files_deleted += output.num_deleted_files as u32;
838825

839826
let versions_to_delete = self.versions_to_delete_output.as_ref().ok_or(
840827
GarbageCollectorError::InvariantViolation(

0 commit comments

Comments
 (0)