
Commit bca1e26

[ENH]: Dead letter queuing for compaction jobs (#5023)
## Description of changes

This change adds a dead letter queueing system to the compaction scheduler. If a compaction job on a collection fails `max_failure_count` times, the collection is moved to a dead set and is not compacted again while it remains in that set (see the sketch below). As of this change, the only way to clear the set is to restart the compaction process.

- Improvements & Bug fixes
  - Added a `failing_jobs` map in the CompactionManager to keep track of jobs that have failed on consecutive attempts.
  - Added a `dead_jobs` set in the CompactionManager to record "dead" jobs.
- New functionality
  - Described above.
  - Added a metric `compactor_dead_jobs_count` to track the size of the dead jobs set.

## Test plan

Added a test in scheduler.rs. Also manually tested by injecting failures into certain compaction jobs and tracking the dead set size metric locally.

- [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Documentation Changes

_Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
1 parent e747267 commit bca1e26
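
The scheduler-side bookkeeping lives in scheduler.rs, which is not shown in this excerpt. Below is a minimal sketch of the dead-letter logic the description outlines, reusing the `failing_jobs`, `dead_jobs`, and `max_failure_count` names from the description; the struct, method signatures, and details are illustrative assumptions, not the actual implementation.

```rust
use std::collections::{HashMap, HashSet};
use uuid::Uuid;

type CollectionUuid = Uuid; // stand-in for chroma's CollectionUuid newtype

struct DeadLetterBookkeeping {
    max_failure_count: u8,
    failing_jobs: HashMap<CollectionUuid, u8>, // consecutive failures per collection
    dead_jobs: HashSet<CollectionUuid>,        // collections excluded from scheduling
}

impl DeadLetterBookkeeping {
    fn fail_collection(&mut self, id: CollectionUuid) {
        let failures = self.failing_jobs.entry(id).or_insert(0);
        *failures += 1;
        if *failures >= self.max_failure_count {
            // Too many consecutive failures: stop scheduling this collection
            // until the compaction process is restarted.
            self.failing_jobs.remove(&id);
            self.dead_jobs.insert(id);
        }
    }

    fn succeed_collection(&mut self, id: CollectionUuid) {
        // Any success resets the consecutive-failure streak.
        self.failing_jobs.remove(&id);
    }

    fn is_schedulable(&self, id: &CollectionUuid) -> bool {
        !self.dead_jobs.contains(id)
    }
}

fn main() {
    let mut books = DeadLetterBookkeeping {
        max_failure_count: 3,
        failing_jobs: HashMap::new(),
        dead_jobs: HashSet::new(),
    };
    let id = Uuid::new_v4();
    for _ in 0..3 {
        books.fail_collection(id);
    }
    assert!(!books.is_schedulable(&id));
}
```

Only uninterrupted failures promote a collection into `dead_jobs`; a single success in between clears the streak.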

File tree

6 files changed: +199 -34 lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

rust/worker/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -31,6 +31,7 @@ async-trait = { workspace = true }
 roaring = { workspace = true }
 figment = { workspace = true }
 futures = { workspace = true }
+opentelemetry = { workspace = true }
 parking_lot = { workspace = true }
 tracing = { workspace = true }
 tokio = { workspace = true }
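
The single Cargo.toml addition pulls in `opentelemetry` for the new `compactor_dead_jobs_count` metric mentioned in the description. A rough sketch of how such a gauge can be reported through the crate's global meter (the scope name and attribute are assumptions, and instrument builder method names differ slightly across opentelemetry versions):

```rust
use opentelemetry::global;
use opentelemetry::KeyValue;

fn report_dead_jobs(dead_jobs_len: usize) {
    // Obtain a meter from the globally installed provider. In recent
    // opentelemetry releases the builder is finished with `.build()`;
    // older releases used `.init()`.
    let meter = global::meter("chroma.compactor");
    let gauge = meter.u64_gauge("compactor_dead_jobs_count").build();
    gauge.record(
        dead_jobs_len as u64,
        &[KeyValue::new("component", "compaction_scheduler")],
    );
}
```

In practice the instrument would be created once and stored, rather than rebuilt on every report.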

rust/worker/src/compactor/compaction_manager.rs

Lines changed: 38 additions & 15 deletions
@@ -49,14 +49,19 @@ use tracing::Instrument;
 use tracing::Span;
 use uuid::Uuid;
 
-type BoxedFuture =
-    Pin<Box<dyn Future<Output = Result<CompactionResponse, Box<dyn ChromaError>>> + Send>>;
+type CompactionOutput = Result<CompactionResponse, Box<dyn ChromaError>>;
+type BoxedFuture = Pin<Box<dyn Future<Output = CompactionOutput> + Send>>;
 
 struct CompactionTask {
     collection_id: CollectionUuid,
     future: BoxedFuture,
 }
 
+struct CompactionTaskCompletion {
+    collection_id: CollectionUuid,
+    result: CompactionOutput,
+}
+
 #[derive(Clone)]
 pub(crate) struct CompactionManagerContext {
     system: System,
@@ -85,7 +90,7 @@ pub(crate) struct CompactionManager {
     scheduler: Scheduler,
     context: CompactionManagerContext,
     compact_awaiter_channel: mpsc::Sender<CompactionTask>,
-    compact_awaiter_completion_channel: mpsc::UnboundedReceiver<CompactionResponse>,
+    compact_awaiter_completion_channel: mpsc::UnboundedReceiver<CompactionTaskCompletion>,
     compact_awaiter: tokio::task::JoinHandle<()>,
     on_next_memberlist_signal: Option<oneshot::Sender<()>>,
 }
@@ -129,7 +134,7 @@ impl CompactionManager {
         // Using unbounded channel for the completion channel as its size
         // is bounded by max_concurrent_jobs. It's far more important for the
         // completion channel to not block or drop messages.
-        let (completion_tx, completion_rx) = mpsc::unbounded_channel::<CompactionResponse>();
+        let (completion_tx, completion_rx) = mpsc::unbounded_channel::<CompactionTaskCompletion>();
         let compact_awaiter = tokio::spawn(async {
             compact_awaiter_loop(compact_awaiter_rx, completion_tx).await;
         });
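
The comment in this hunk explains the asymmetry: job submission goes through a bounded channel (backpressure on the scheduler), while completions go through an unbounded one so results are never dropped. A standalone illustration of the two tokio channel flavors:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded: `send` is async and waits when the buffer is full,
    // applying backpressure to the producer.
    let (job_tx, mut job_rx) = mpsc::channel::<u32>(2);

    // Unbounded: `send` is synchronous and never waits; it only fails
    // if the receiver has been dropped.
    let (done_tx, mut done_rx) = mpsc::unbounded_channel::<u32>();

    job_tx.send(1).await.unwrap();
    done_tx.send(2).unwrap();

    assert_eq!(job_rx.recv().await, Some(1));
    assert_eq!(done_rx.recv().await, Some(2));
}
```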
@@ -237,11 +242,18 @@ impl CompactionManager {
         self.context.dispatcher = Some(dispatcher);
     }
 
-    fn process_completions(&mut self) -> Vec<CompactionResponse> {
+    fn process_completions(&mut self) -> Vec<CompactionTaskCompletion> {
         let compact_awaiter_completion_channel = &mut self.compact_awaiter_completion_channel;
         let mut completed_collections = Vec::new();
         while let Ok(resp) = compact_awaiter_completion_channel.try_recv() {
-            self.scheduler.complete_collection(resp.collection_id);
+            match resp.result {
+                Ok(_) => {
+                    self.scheduler.succeed_collection(resp.collection_id);
+                }
+                Err(_) => {
+                    self.scheduler.fail_collection(resp.collection_id);
+                }
+            }
             completed_collections.push(resp);
         }
         completed_collections
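
`process_completions` now routes each result to the scheduler (`succeed_collection` or `fail_collection`) before reporting it, and the drain itself stays non-blocking: `try_recv` returns as soon as the channel is empty. The same drain pattern in isolation (a generic sketch, not the repo's code):

```rust
use tokio::sync::mpsc;

// Collect everything already sitting in the channel without ever waiting.
fn drain_ready<T>(rx: &mut mpsc::UnboundedReceiver<T>) -> Vec<T> {
    let mut ready = Vec::new();
    while let Ok(item) = rx.try_recv() {
        ready.push(item);
    }
    ready
}

fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    tx.send("job-1").unwrap();
    tx.send("job-2").unwrap();
    assert_eq!(drain_ready(&mut rx), vec!["job-1", "job-2"]);
}
```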
@@ -351,6 +363,7 @@ impl Configurable<(CompactionServiceConfig, System)> for CompactionManager {
             Box::<dyn AssignmentPolicy>::try_from_config(assignment_policy_config, registry)
                 .await?;
         let job_expiry_seconds = config.compactor.job_expiry_seconds;
+        let max_failure_count = config.compactor.max_failure_count;
         let scheduler = Scheduler::new(
             my_ip,
             log.clone(),
@@ -361,6 +374,7 @@ impl Configurable<(CompactionServiceConfig, System)> for CompactionManager {
             assignment_policy,
             disabled_collections,
             job_expiry_seconds,
+            max_failure_count,
         );
 
         let blockfile_provider = BlockfileProvider::try_from_config(
@@ -407,25 +421,31 @@ impl Configurable<(CompactionServiceConfig, System)> for CompactionManager {
 
 async fn compact_awaiter_loop(
     mut job_rx: mpsc::Receiver<CompactionTask>,
-    completion_tx: mpsc::UnboundedSender<CompactionResponse>,
+    completion_tx: mpsc::UnboundedSender<CompactionTaskCompletion>,
 ) {
     let mut futures = FuturesUnordered::new();
     loop {
         select! {
             Some(job) = job_rx.recv() => {
                 futures.push(async move {
-                    let _ = AssertUnwindSafe(job.future).catch_unwind().await;
-                    CompactionResponse {
-                        collection_id: job.collection_id,
+                    let result = AssertUnwindSafe(job.future).catch_unwind().await;
+                    match result {
+                        Ok(response) => CompactionTaskCompletion {
+                            collection_id: job.collection_id,
+                            result: response,
+                        },
+                        Err(_) => CompactionTaskCompletion {
+                            collection_id: job.collection_id,
+                            result: Err(Box::new(CompactionError::FailedToCompact)),
+                        },
                     }
                 });
             }
-            Some(compaction_response) = futures.next() => {
-                match completion_tx.send(compaction_response) {
+            Some(completed_job) = futures.next() => {
+                let collection_id = completed_job.collection_id;
+                match completion_tx.send(completed_job) {
                     Ok(_) => {},
-                    Err(_) => {
-                        tracing::error!("Failed to send compaction response");
-                    }
+                    Err(_) => tracing::error!("Failed to record compaction result for collection {}", collection_id),
                 }
             }
             else => {
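
Each job future is wrapped in `AssertUnwindSafe(..).catch_unwind()`, so a panic inside a compaction job is converted into an `Err` completion (mapped here to `CompactionError::FailedToCompact`) instead of unwinding through the awaiter loop. The pattern in isolation, using the `futures` crate's `FutureExt::catch_unwind`:

```rust
use futures::FutureExt;
use std::panic::AssertUnwindSafe;

#[tokio::main]
async fn main() {
    // Simulate a compaction job that panics partway through.
    let job = async { panic!("compaction blew up") };

    // catch_unwind resolves to Err(payload) instead of propagating
    // the panic, so a surrounding loop keeps running.
    let result = AssertUnwindSafe(job).catch_unwind().await;
    assert!(result.is_err());
}
```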
@@ -773,6 +793,7 @@ mod tests {
         let fetch_log_batch_size = 100;
         let purge_dirty_log_timeout_seconds = 60;
         let job_expiry_seconds = 3600;
+        let max_failure_count = 3;
 
         // Set assignment policy
         let mut assignment_policy = Box::new(RendezvousHashingAssignmentPolicy::default());
@@ -788,6 +809,7 @@ mod tests {
             assignment_policy,
             HashSet::new(),
             job_expiry_seconds,
+            max_failure_count,
         );
         // Set memberlist
         scheduler.set_memberlist(vec![my_member.clone()]);
@@ -865,6 +887,7 @@ mod tests {
         completed_compactions.extend(
             completed
                 .iter()
+                .filter(|c| c.result.is_ok())
                 .map(|c| c.collection_id)
                 .collect::<Vec<CollectionUuid>>(),
         );

rust/worker/src/compactor/config.rs

Lines changed: 7 additions & 0 deletions
@@ -22,6 +22,8 @@ pub struct CompactorConfig {
     pub fetch_log_batch_size: u32,
     #[serde(default = "CompactorConfig::default_purge_dirty_log_timeout_seconds")]
     pub purge_dirty_log_timeout_seconds: u64,
+    #[serde(default = "CompactorConfig::default_max_failure_count")]
+    pub max_failure_count: u8,
 }
 
 impl CompactorConfig {
@@ -64,6 +66,10 @@ impl CompactorConfig {
     fn default_purge_dirty_log_timeout_seconds() -> u64 {
         60
     }
+
+    fn default_max_failure_count() -> u8 {
+        5
+    }
 }
 
 impl Default for CompactorConfig {
@@ -80,6 +86,7 @@ impl Default for CompactorConfig {
             fetch_log_batch_size: CompactorConfig::default_fetch_log_batch_size(),
             purge_dirty_log_timeout_seconds:
                 CompactorConfig::default_purge_dirty_log_timeout_seconds(),
+            max_failure_count: CompactorConfig::default_max_failure_count(),
         }
     }
 }
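
Because the new field has `#[serde(default = "CompactorConfig::default_max_failure_count")]`, existing config files that omit `max_failure_count` keep deserializing and pick up the default of 5. A self-contained sketch of the same serde pattern (struct and helper names here are illustrative, using serde_json for brevity):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct CompactorLikeConfig {
    #[serde(default = "default_max_failure_count")]
    max_failure_count: u8,
}

fn default_max_failure_count() -> u8 {
    5
}

fn main() {
    // A config written before the field existed still parses; serde
    // fills in the default instead of erroring on the missing key.
    let cfg: CompactorLikeConfig = serde_json::from_str("{}").unwrap();
    assert_eq!(cfg.max_failure_count, 5);
}
```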
