
Commit 876bb18

emmaling27 (Convex, Inc.) authored and Convex, Inc. committed
Text compaction race test (#37262)
Copies over the structure of the vector search compaction tests to text search.

GitOrigin-RevId: 684672faacf1dcd8f4fd1f9845fb0ff2fe215ced
1 parent b98f50f commit 876bb18

File tree: 4 files changed, +99 -6 lines changed

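For orientation before the diffs: the new test forces a deterministic race by parking the flusher at a pause label, running the compactor to completion while the flusher is blocked, and only then letting the flush finish. The sketch below illustrates that orchestration pattern with plain tokio oneshot channels and futures::try_join; it is a standalone stand-in, not the Convex PauseController / hold-guard API used by the actual run_compaction_during_flush helper added further down.

// Standalone sketch of the race orchestration (not the Convex PauseController API):
// block one task at a labeled point, let the other task finish first, then resume.
use futures::try_join;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // blocked: flusher -> orchestrator, "I reached the pause point".
    // resume:  orchestrator -> flusher, "compaction finished, keep going".
    let (blocked_tx, blocked_rx) = oneshot::channel::<()>();
    let (resume_tx, resume_rx) = oneshot::channel::<()>();

    let flush = async move {
        // ... build the new segment up to the pause label ...
        blocked_tx.send(()).ok(); // equivalent of hitting the hold point
        resume_rx.await?;         // stay parked until unpaused
        // ... finish the flush against the now-compacted segments ...
        Ok::<(), anyhow::Error>(())
    };

    let compact_during_flush = async move {
        blocked_rx.await?;        // analogous to wait_for_blocked()
        // ... run the compactor while the flusher is parked ...
        resume_tx.send(()).ok();  // analogous to unpause()
        Ok::<(), anyhow::Error>(())
    };

    try_join!(flush, compact_during_flush)?;
    Ok(())
}

In the real test the label is FLUSH_RUNNING_LABEL, and the assertion that at least one segment was compacted runs before the unpause, which is what guarantees the flusher observes the compacted state when it resumes.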

crates/database/src/index_workers/search_compactor.rs

Lines changed: 15 additions & 6 deletions

@@ -108,11 +108,12 @@ impl<RT: Runtime, T: SearchIndex> SearchIndexCompactor<RT, T> {
         let mut to_build = vec![];
         let mut tx = self.database.begin(Identity::system()).await?;
 
-        // Skip compaction on empty tables.
-        for index_doc in IndexModel::new(&mut tx)
+        let non_empty_search_indexes = IndexModel::new(&mut tx)
             .get_all_non_empty_search_indexes()
-            .await?
-        {
+            .await?;
+
+        // Skip compaction on empty tables.
+        for index_doc in non_empty_search_indexes {
             let (index_id, index_metadata) = index_doc.into_id_and_value();
             let Some(config) = T::get_config(index_metadata.config) else {
                 continue;
@@ -147,11 +148,19 @@ impl<RT: Runtime, T: SearchIndex> SearchIndexCompactor<RT, T> {
                     &config.developer_config,
                     &self.config,
                 )?,
-                _ => continue,
+                _ => {
+                    tracing::info!(
+                        "Skipping {:?} index for compaction: {name:?} because it is not \
+                         backfilling",
+                        Self::search_type()
+                    );
+                    continue;
+                },
             };
             if let Some((mut segments_to_compact, compaction_reason)) = maybe_segments_to_compact {
                 tracing::info!(
-                    "Queueing {:?} index for compaction: {name:?}",
+                    "Queueing {:?} index for compaction: {name:?} for reason: \
+                     {compaction_reason:?}",
                     Self::search_type()
                 );
                 // Choose segments to compact at random.
crates/database/src/tests/text_test_utils.rs

Lines changed: 26 additions & 0 deletions

@@ -17,6 +17,7 @@ use common::{
         ParsedDocument,
         ResolvedDocument,
     },
+    pause::PauseController,
     persistence::PersistenceReader,
     query::{
        Query,
@@ -35,6 +36,7 @@ use common::{
     },
     version::MIN_NPM_VERSION_FOR_FUZZY_SEARCH,
 };
+use futures::try_join;
 use maplit::btreeset;
 use must_let::must_let;
 use search::{
@@ -79,6 +81,7 @@ use crate::{
     Transaction,
 };
 
+#[derive(Clone)]
 pub struct TextFixtures {
     pub rt: TestRuntime,
     pub storage: Arc<dyn Storage>,
@@ -345,6 +348,29 @@ impl TextFixtures {
         }
         Ok(values)
     }
+
+    pub async fn run_compaction_during_flush(
+        &self,
+        pause: PauseController,
+        label: &'static str,
+    ) -> anyhow::Result<()> {
+        let mut flusher = self.new_search_flusher();
+        let hold_guard = pause.hold(label);
+        let flush = flusher.step();
+        let compactor = self.new_compactor();
+        let compact_during_flush = async move {
+            if let Some(pause_guard) = hold_guard.wait_for_blocked().await {
+                let (metrics, _) = compactor.step().await?;
+                for (_index_name, num_segments_compacted) in metrics {
+                    assert!(num_segments_compacted > 0);
+                }
+                pause_guard.unpause();
+            };
+            Ok::<(), anyhow::Error>(())
+        };
+        try_join!(flush, compact_during_flush)?;
+        Ok(())
+    }
 }
 
 const TABLE_NAME: &str = "table";

crates/database/src/text_index_worker/flusher.rs

Lines changed: 57 additions & 0 deletions

@@ -192,6 +192,7 @@ mod tests {
             IndexConfig,
             IndexMetadata,
         },
+        pause::PauseController,
         runtime::testing::TestRuntime,
         types::{
             IndexName,
@@ -207,6 +208,10 @@ mod tests {
     };
 
     use crate::{
+        index_workers::{
+            search_compactor::CompactionConfig,
+            search_flusher::FLUSH_RUNNING_LABEL,
+        },
         tests::text_test_utils::{
             add_document,
             IndexData,
@@ -991,4 +996,56 @@ mod tests {
 
         Ok(())
     }
+
+    #[convex_macro::test_runtime]
+    async fn concurrent_compaction_and_flush_new_segment_propagates_deletes(
+        rt: TestRuntime,
+        pause: PauseController,
+    ) -> anyhow::Result<()> {
+        let config = CompactionConfig::default();
+        let min_compaction_segments = config.min_compaction_segments;
+        let config = CompactionConfig {
+            // Treat everything as a large segment
+            small_segment_threshold_bytes: 0,
+            ..config
+        };
+        let fixtures = TextFixtures::new_with_config(rt.clone(), config).await?;
+        let index_data = fixtures.enabled_text_index().await?;
+
+        let IndexData { index_name, .. } = index_data;
+
+        // Create enough segments to trigger compaction.
+        let mut deleted_doc_ids = vec![];
+        for _ in 0..min_compaction_segments {
+            deleted_doc_ids.push(fixtures.add_document("test").await?);
+            fixtures.backfill().await?;
+        }
+
+        // Queue up deletes for all existing segments, and one new document that will
+        // cause the flusher to write a new segment.
+        for doc_id in &deleted_doc_ids {
+            fixtures.replace_document(*doc_id, "updated").await?;
+        }
+        let _non_deleted_id = fixtures.add_document("test").await?;
+
+        // Run the compactor / flusher concurrently in a way where the compactor
+        // wins the race.
+        fixtures
+            .run_compaction_during_flush(pause, FLUSH_RUNNING_LABEL)
+            .await?;
+
+        // Verify we propagate the new deletes to the compacted segment and retain our
+        // new segment.
+        let segments = fixtures.get_segments_metadata(index_name).await?;
+        assert_eq!(2, segments.len());
+
+        let (compacted_segment, new_segment): (Vec<_>, Vec<_>) = segments
+            .into_iter()
+            .partition(|segment| segment.num_deleted_documents > 0);
+        assert_eq!(compacted_segment.len(), 1);
+        assert_eq!(new_segment.len(), 1);
+        // TODO Verify segment contents
+
+        Ok(())
+    }
 }
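
The `// TODO Verify segment contents` at the end of the new test defers checking what the surviving segments actually contain. A possible follow-up, sketched under the assumption of a hypothetical `TextFixtures::search(index_name, query)` helper that returns matching documents (the real fixture API may differ, and `index_name` would need to be cloned before the `get_segments_metadata` call above):

        // Hypothetical follow-up for the TODO; `fixtures.search` is an assumed
        // helper, not a confirmed TextFixtures method.
        // All replaced documents should now match "updated"...
        let updated_hits = fixtures.search(index_name.clone(), "updated").await?;
        assert_eq!(updated_hits.len(), deleted_doc_ids.len());
        // ...and only the document added after the replacements still matches "test".
        let remaining_hits = fixtures.search(index_name, "test").await?;
        assert_eq!(remaining_hits.len(), 1);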

crates/database/src/text_index_worker/text_meta.rs

Lines changed: 1 addition & 0 deletions

@@ -106,6 +106,7 @@ impl SegmentType<TextSearchIndex> for FragmentedTextSegment {
         Ok(self.size_bytes_total)
     }
 }
+
 #[derive(Clone)]
 pub struct BuildTextIndexArgs {
     pub search_storage: Arc<dyn Storage>,
