Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rust/garbage_collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ chroma-memberlist = { workspace = true }
chroma-tracing = { workspace = true }
chroma-jemalloc-pprof-server = { workspace = true }
wal3 = { workspace = true }
fst = "0.4.7"
itertools.workspace = true

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { workspace = true }
Expand Down
35 changes: 11 additions & 24 deletions rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use crate::operators::mark_versions_at_sysdb::{
MarkVersionsAtSysDbOutput,
};
use crate::types::{
version_graph_to_collection_dependency_graph, CleanupMode, GarbageCollectorResponse,
VersionGraph, VersionGraphNode,
version_graph_to_collection_dependency_graph, CleanupMode, FilePathRefCountMap,
GarbageCollectorResponse, VersionGraph, VersionGraphNode,
};
use async_trait::async_trait;
use chroma_blockstore::RootManager;
Expand Down Expand Up @@ -73,7 +73,7 @@ pub struct GarbageCollectorOrchestrator {
pending_list_files_at_version_tasks: HashSet<(CollectionUuid, i64)>,
delete_unused_file_output: Option<DeleteUnusedFilesOutput>,
delete_unused_log_output: Option<DeleteUnusedLogsOutput>,
file_ref_counts: HashMap<String, u32>,
file_ref_counts: FilePathRefCountMap,
num_pending_tasks: usize,
min_versions_to_keep: u32,
enable_log_gc: bool,
Expand Down Expand Up @@ -122,7 +122,7 @@ impl GarbageCollectorOrchestrator {
cleanup_mode,
result_channel: None,
version_files: HashMap::new(),
file_ref_counts: HashMap::new(),
file_ref_counts: FilePathRefCountMap::empty(),
versions_to_delete_output: None,
pending_mark_versions_at_sysdb_tasks: HashSet::new(),
pending_list_files_at_version_tasks: HashSet::new(),
Expand Down Expand Up @@ -184,6 +184,8 @@ pub enum GarbageCollectorError {
CollectionDeletionFailed(#[from] DeleteCollectionError),
#[error("SysDb method failed: {0}")]
SysDbMethodFailed(String),
#[error("Error manipulating FST: {0}")]
FST(#[from] fst::Error),
}

impl ChromaError for GarbageCollectorError {
Expand Down Expand Up @@ -705,10 +707,7 @@ impl GarbageCollectorOrchestrator {
output.version
);

for file_path in output.file_paths {
let count = self.file_ref_counts.entry(file_path).or_insert(0);
*count += 1;
}
self.file_ref_counts.add_set(output.file_paths, 1)?;
}
CollectionVersionAction::Delete => {
tracing::debug!(
Expand All @@ -718,9 +717,7 @@ impl GarbageCollectorOrchestrator {
output.version
);

for file_path in output.file_paths {
self.file_ref_counts.entry(file_path).or_insert(0);
}
self.file_ref_counts.add_set(output.file_paths, 0)?;
}
}

Expand All @@ -745,17 +742,8 @@ impl GarbageCollectorOrchestrator {

// We now have results for all ListFilesAtVersionsOperator tasks that we spawned
tracing::trace!("File ref counts: {:#?}", self.file_ref_counts);
let file_paths_to_delete = self
.file_ref_counts
.iter()
.filter_map(|(path, count)| {
if *count == 0 {
Some(path.clone())
} else {
None
}
})
.collect::<Vec<_>>();

let file_paths_to_delete = self.file_ref_counts.filter_by_count(0)?;

let delete_percentage =
file_paths_to_delete.len() as f32 / self.file_ref_counts.len() as f32 * 100.0;
Expand Down Expand Up @@ -797,7 +785,6 @@ impl GarbageCollectorOrchestrator {
)),
DeleteUnusedFilesInput {
unused_s3_files: file_paths_to_delete,
hnsw_prefixes_for_deletion: vec![],
},
ctx.receiver(),
self.context.task_cancellation_token.clone(),
Expand Down Expand Up @@ -834,7 +821,7 @@ impl GarbageCollectorOrchestrator {
return Ok(());
}

self.num_files_deleted += output.deleted_files.len() as u32;
self.num_files_deleted += output.num_deleted_files as u32;

let versions_to_delete = self.versions_to_delete_output.as_ref().ok_or(
GarbageCollectorError::InvariantViolation(
Expand Down
Loading
Loading