Skip to content

Commit a1ea81a

Browse files
authored
[ENH] recognize and flush new metadata keys to schema on local compaction (#5728)
## Description of changes _Summarize the changes made by this PR._ - Improvements & Bug fixes - This PR adds support for discovering new metadata keys during log flushing for local Chroma - New functionality - ... ## Test plan Added e2e test support that runs in distributed, local, and single-node modes to ensure metadata discovery and related tests are exercised. _How are these changes tested?_ - [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Migration plan _Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_ ## Observability plan _What is the plan to instrument and monitor this change?_ ## Documentation Changes _Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
1 parent c26b0a8 commit a1ea81a

File tree

5 files changed

+147
-23
lines changed

5 files changed

+147
-23
lines changed

chromadb/test/api/test_schema_e2e.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,8 @@ def test_schema_defaults_enable_indexed_operations(
363363
# Ensure underlying schema persisted across fetches
364364
reloaded = client.get_collection(collection.name)
365365
assert reloaded.schema is not None
366-
assert reloaded.schema.serialize_to_json() == schema.serialize_to_json()
366+
if not is_spann_disabled_mode:
367+
assert reloaded.schema.serialize_to_json() == schema.serialize_to_json()
367368

368369

369370
def test_get_or_create_and_get_collection_preserve_schema(
@@ -541,7 +542,8 @@ def test_schema_persistence_with_custom_overrides(
541542
reloaded_client = client_factories.create_client_from_system()
542543
reloaded_collection = reloaded_client.get_collection(name=collection.name)
543544
assert reloaded_collection.schema is not None
544-
assert reloaded_collection.schema.serialize_to_json() == expected_schema_json
545+
if not is_spann_disabled_mode:
546+
assert reloaded_collection.schema.serialize_to_json() == expected_schema_json
545547

546548
fetched = reloaded_collection.get(where={"title": "Schema Persistence"})
547549
assert set(fetched["ids"]) == {"persist-1"}
@@ -784,7 +786,6 @@ def _expect_disabled_error(operation: Callable[[], Any]) -> None:
784786
_expect_disabled_error(operation)
785787

786788

787-
@pytest.mark.skipif(is_spann_disabled_mode, reason=skip_reason_spann_disabled)
788789
def test_schema_discovers_new_keys_after_compaction(
789790
client_factories: "ClientFactories",
790791
) -> None:
@@ -802,7 +803,8 @@ def test_schema_discovers_new_keys_after_compaction(
802803

803804
collection.add(ids=ids, documents=documents, metadatas=metadatas)
804805

805-
wait_for_version_increase(client, collection.name, initial_version)
806+
if not is_spann_disabled_mode:
807+
wait_for_version_increase(client, collection.name, initial_version)
806808

807809
reloaded = client.get_collection(collection.name)
808810
assert reloaded.schema is not None
@@ -828,7 +830,8 @@ def test_schema_discovers_new_keys_after_compaction(
828830
metadatas=upsert_metadatas,
829831
)
830832

831-
wait_for_version_increase(client, collection.name, next_version)
833+
if not is_spann_disabled_mode:
834+
wait_for_version_increase(client, collection.name, next_version)
832835

833836
post_upsert = client.get_collection(collection.name)
834837
assert post_upsert.schema is not None
@@ -852,7 +855,6 @@ def test_schema_discovers_new_keys_after_compaction(
852855
assert "discover_upsert" in persisted.schema.keys
853856

854857

855-
@pytest.mark.skipif(is_spann_disabled_mode, reason=skip_reason_spann_disabled)
856858
def test_schema_rejects_conflicting_discoverable_key_types(
857859
client_factories: "ClientFactories",
858860
) -> None:
@@ -868,7 +870,8 @@ def test_schema_rejects_conflicting_discoverable_key_types(
868870
documents = [f"doc {i}" for i in range(251)]
869871
collection.add(ids=ids, documents=documents, metadatas=metadatas)
870872

871-
wait_for_version_increase(client, collection.name, initial_version)
873+
if not is_spann_disabled_mode:
874+
wait_for_version_increase(client, collection.name, initial_version)
872875

873876
collection.upsert(
874877
ids=["conflict-bad"],
@@ -1029,7 +1032,6 @@ def test_schema_embedding_configuration_enforced(
10291032
assert "sparse_auto" not in numeric_metadata
10301033

10311034

1032-
@pytest.mark.skipif(is_spann_disabled_mode, reason=skip_reason_spann_disabled)
10331035
def test_schema_precedence_for_overrides_discoverables_and_defaults(
10341036
client_factories: "ClientFactories",
10351037
) -> None:
@@ -1054,7 +1056,9 @@ def test_schema_precedence_for_overrides_discoverables_and_defaults(
10541056

10551057
initial_version = get_collection_version(client, collection.name)
10561058
collection.add(ids=ids, documents=documents, metadatas=metadatas)
1057-
wait_for_version_increase(client, collection.name, initial_version)
1059+
1060+
if not is_spann_disabled_mode:
1061+
wait_for_version_increase(client, collection.name, initial_version)
10581062

10591063
schema_state = client.get_collection(collection.name).schema
10601064
assert schema_state is not None

rust/log/src/local_compaction_manager.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ impl Handler<BackfillMessage> for LocalCompactionManager {
140140
.sysdb
141141
.get_collection_with_segments(message.collection_id)
142142
.await?;
143+
let schema_previously_persisted = collection_and_segments.collection.schema.is_some();
143144
collection_and_segments
144145
.collection
145146
.reconcile_schema_with_config(KnnIndex::Hnsw)?;
@@ -206,14 +207,31 @@ impl Handler<BackfillMessage> for LocalCompactionManager {
206207
.begin()
207208
.await
208209
.map_err(|_| CompactionManagerError::MetadataApplyLogsFailed)?;
209-
metadata_writer
210+
let apply_outcome = metadata_writer
210211
.apply_logs(
211212
mt_data_chunk,
212213
collection_and_segments.metadata_segment.id,
214+
if schema_previously_persisted {
215+
collection_and_segments.collection.schema.clone()
216+
} else {
217+
None
218+
},
213219
&mut *tx,
214220
)
215221
.await
216222
.map_err(|_| CompactionManagerError::MetadataApplyLogsFailed)?;
223+
if schema_previously_persisted {
224+
if let Some(updated_schema) = apply_outcome.schema_update {
225+
metadata_writer
226+
.update_collection_schema(
227+
collection_and_segments.collection.collection_id,
228+
&updated_schema,
229+
&mut *tx,
230+
)
231+
.await
232+
.map_err(|_| CompactionManagerError::MetadataApplyLogsFailed)?;
233+
}
234+
}
217235
tx.commit()
218236
.await
219237
.map_err(|_| CompactionManagerError::MetadataApplyLogsFailed)?;

rust/segment/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ roaring = { workspace = true }
1111
sea-query = { workspace = true }
1212
sea-query-binder = { workspace = true, features = ["sqlx-sqlite"] }
1313
serde = { workspace = true }
14+
serde_json = { workspace = true }
1415
sqlx = { workspace = true }
1516
serde-pickle = "1.2.0"
1617
tantivy = { workspace = true }

rust/segment/src/sqlite_metadata.rs

Lines changed: 110 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,17 @@ use chroma_error::{ChromaError, ErrorCodes};
77
use chroma_sqlite::{
88
db::SqliteDb,
99
helpers::{delete_metadata, update_metadata},
10-
table::{EmbeddingFulltextSearch, EmbeddingMetadata, Embeddings, MaxSeqId},
10+
table::{Collections, EmbeddingFulltextSearch, EmbeddingMetadata, Embeddings, MaxSeqId},
1111
};
1212
use chroma_types::{
1313
operator::{
1414
CountResult, Filter, GetResult, Limit, Projection, ProjectionOutput, ProjectionRecord, Scan,
1515
},
1616
plan::{Count, Get},
17-
BooleanOperator, Chunk, CompositeExpression, DocumentExpression, DocumentOperator, LogRecord,
18-
MetadataComparison, MetadataExpression, MetadataSetValue, MetadataValue,
19-
MetadataValueConversionError, Operation, OperationRecord, PrimitiveOperator, SegmentUuid,
20-
SetOperator, UpdateMetadataValue, Where, CHROMA_DOCUMENT_KEY,
17+
BooleanOperator, Chunk, CollectionUuid, CompositeExpression, DocumentExpression,
18+
DocumentOperator, LogRecord, MetadataComparison, MetadataExpression, MetadataSetValue,
19+
MetadataValue, MetadataValueConversionError, Operation, OperationRecord, PrimitiveOperator,
20+
Schema, SegmentUuid, SetOperator, UpdateMetadataValue, Where, CHROMA_DOCUMENT_KEY,
2121
};
2222
use sea_query::{
2323
Alias, DeleteStatement, Expr, ExprTrait, Func, InsertStatement, LikeExpr, OnConflict, Query,
@@ -41,6 +41,8 @@ pub enum SqliteMetadataError {
4141
SeaQuery(#[from] sea_query::error::Error),
4242
#[error(transparent)]
4343
Sqlx(#[from] sqlx::Error),
44+
#[error("Could not serialize schema: {0}")]
45+
SerializeSchema(#[from] serde_json::Error),
4446
}
4547

4648
impl ChromaError for SqliteMetadataError {
@@ -53,6 +55,10 @@ pub struct SqliteMetadataWriter {
5355
pub db: SqliteDb,
5456
}
5557

58+
pub struct ApplyLogsOutcome {
59+
pub schema_update: Option<Schema>,
60+
}
61+
5662
impl SqliteMetadataWriter {
5763
pub fn new(db: SqliteDb) -> Self {
5864
Self { db }
@@ -278,18 +284,63 @@ impl SqliteMetadataWriter {
278284
Ok(self.db.get_conn().begin().await?)
279285
}
280286

287+
pub async fn update_collection_schema<C>(
288+
&self,
289+
collection_id: CollectionUuid,
290+
schema: &Schema,
291+
tx: &mut C,
292+
) -> Result<(), SqliteMetadataError>
293+
where
294+
for<'connection> &'connection mut C: sqlx::Executor<'connection, Database = sqlx::Sqlite>,
295+
{
296+
let schema_str = serde_json::to_string(schema)?;
297+
let (sql, values) = Query::update()
298+
.table(Collections::Table)
299+
.value(Collections::SchemaStr, schema_str)
300+
.and_where(
301+
Expr::col((Collections::Table, Collections::Id)).eq(collection_id.to_string()),
302+
)
303+
.build_sqlx(SqliteQueryBuilder);
304+
sqlx::query_with(&sql, values).execute(&mut *tx).await?;
305+
Ok(())
306+
}
307+
308+
fn ensure_schema_for_update_value(
309+
schema: &mut Option<Schema>,
310+
key: &str,
311+
value: &UpdateMetadataValue,
312+
) -> bool {
313+
match value {
314+
UpdateMetadataValue::None => false,
315+
_ => {
316+
if let Some(schema_mut) = schema.as_mut() {
317+
if let Ok(metadata_value) = MetadataValue::try_from(value) {
318+
return schema_mut
319+
.ensure_key_from_metadata(key, metadata_value.value_type());
320+
}
321+
}
322+
false
323+
}
324+
}
325+
}
326+
281327
pub async fn apply_logs<C>(
282328
&self,
283329
logs: Chunk<LogRecord>,
284330
segment_id: SegmentUuid,
331+
schema: Option<Schema>,
285332
tx: &mut C,
286-
) -> Result<(), SqliteMetadataError>
333+
) -> Result<ApplyLogsOutcome, SqliteMetadataError>
287334
where
288335
for<'connection> &'connection mut C: sqlx::Executor<'connection, Database = sqlx::Sqlite>,
289336
{
290337
if logs.is_empty() {
291-
return Ok(());
338+
return Ok(ApplyLogsOutcome {
339+
schema_update: None,
340+
});
292341
}
342+
let mut schema = schema;
343+
let mut schema_modified = false;
293344
let mut max_seq_id = u64::MIN;
294345
for (
295346
LogRecord {
@@ -323,6 +374,11 @@ impl SqliteMetadataWriter {
323374
Self::add_record(tx, segment_id, log_offset_unsigned, id.clone()).await?
324375
{
325376
if let Some(meta) = metadata_owned {
377+
for (key, value) in meta.iter() {
378+
if Self::ensure_schema_for_update_value(&mut schema, key, value) {
379+
schema_modified = true;
380+
}
381+
}
326382
update_metadata::<EmbeddingMetadata, _, _>(tx, offset_id, meta).await?;
327383
}
328384

@@ -336,6 +392,11 @@ impl SqliteMetadataWriter {
336392
Self::update_record(tx, segment_id, log_offset_unsigned, id.clone()).await?
337393
{
338394
if let Some(meta) = metadata_owned {
395+
for (key, value) in meta.iter() {
396+
if Self::ensure_schema_for_update_value(&mut schema, key, value) {
397+
schema_modified = true;
398+
}
399+
}
339400
update_metadata::<EmbeddingMetadata, _, _>(tx, offset_id, meta).await?;
340401
}
341402

@@ -351,6 +412,11 @@ impl SqliteMetadataWriter {
351412
.await?;
352413

353414
if let Some(meta) = metadata_owned {
415+
for (key, value) in meta.iter() {
416+
if Self::ensure_schema_for_update_value(&mut schema, key, value) {
417+
schema_modified = true;
418+
}
419+
}
354420
update_metadata::<EmbeddingMetadata, _, _>(tx, offset_id, meta).await?;
355421
}
356422

@@ -371,7 +437,9 @@ impl SqliteMetadataWriter {
371437

372438
Self::upsert_max_seq_id(tx, segment_id, max_seq_id).await?;
373439

374-
Ok(())
440+
Ok(ApplyLogsOutcome {
441+
schema_update: if schema_modified { schema } else { None },
442+
})
375443
}
376444
}
377445

@@ -910,7 +978,17 @@ mod tests {
910978
ref_seg.apply_logs(test_data.logs.clone(), metadata_seg_id);
911979
let mut tx = runtime.block_on(sqlite_seg_writer.begin()).expect("Should be able to start transaction");
912980
let data: Chunk<LogRecord> = Chunk::new(test_data.logs.clone().into());
913-
runtime.block_on(sqlite_seg_writer.apply_logs(data, metadata_seg_id, &mut *tx)).expect("Should be able to apply logs");
981+
runtime.block_on(sqlite_seg_writer.apply_logs(
982+
data,
983+
metadata_seg_id,
984+
test_data
985+
.collection_and_segments
986+
.collection
987+
.schema
988+
.clone(),
989+
&mut *tx,
990+
))
991+
.expect("Should be able to apply logs");
914992
runtime.block_on(tx.commit()).expect("Should be able to commit log");
915993

916994
let sqlite_seg_reader = SqliteMetadataReader {
@@ -938,7 +1016,17 @@ mod tests {
9381016
ref_seg.apply_logs(test_data.logs.clone(), metadata_seg_id);
9391017
let mut tx = runtime.block_on(sqlite_seg_writer.begin()).expect("Should be able to start transaction");
9401018
let data: Chunk<LogRecord> = Chunk::new(test_data.logs.clone().into());
941-
runtime.block_on(sqlite_seg_writer.apply_logs(data, metadata_seg_id, &mut *tx)).expect("Should be able to apply logs");
1019+
runtime.block_on(sqlite_seg_writer.apply_logs(
1020+
data,
1021+
metadata_seg_id,
1022+
test_data
1023+
.collection_and_segments
1024+
.collection
1025+
.schema
1026+
.clone(),
1027+
&mut *tx,
1028+
))
1029+
.expect("Should be able to apply logs");
9421030
runtime.block_on(tx.commit()).expect("Should be able to commit log");
9431031

9441032
let sqlite_seg_reader = SqliteMetadataReader {
@@ -1020,7 +1108,12 @@ mod tests {
10201108
.expect("Should be able to start transaction");
10211109
let data: Chunk<LogRecord> = Chunk::new(logs.into());
10221110
sqlite_seg_writer
1023-
.apply_logs(data, metadata_seg_id, &mut *tx)
1111+
.apply_logs(
1112+
data,
1113+
metadata_seg_id,
1114+
collection_and_segments.collection.schema.clone(),
1115+
&mut *tx,
1116+
)
10241117
.await
10251118
.expect("Should be able to apply logs");
10261119
tx.commit().await.expect("Should be able to commit log");
@@ -1140,7 +1233,12 @@ mod tests {
11401233
.expect("Should be able to start transaction");
11411234
let data: Chunk<LogRecord> = Chunk::new(logs.into());
11421235
sqlite_seg_writer
1143-
.apply_logs(data, metadata_seg_id, &mut *tx)
1236+
.apply_logs(
1237+
data,
1238+
metadata_seg_id,
1239+
collection_and_segments.collection.schema.clone(),
1240+
&mut *tx,
1241+
)
11441242
.await
11451243
.expect("Should be able to apply logs");
11461244
tx.commit().await.expect("Should be able to commit log");

rust/types/src/collection_schema.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::{
2020
default_search_rng_factor, default_space, default_split_threshold, default_sync_threshold,
2121
default_write_nprobe, default_write_rng_epsilon, default_write_rng_factor,
2222
HnswParametersFromSegmentError, InternalHnswConfiguration, InternalSpannConfiguration,
23-
InternalUpdateCollectionConfiguration, KnnIndex, Segment,
23+
InternalUpdateCollectionConfiguration, KnnIndex, Segment, CHROMA_KEY,
2424
};
2525

2626
impl ChromaError for SchemaError {
@@ -1925,6 +1925,9 @@ impl Schema {
19251925
}
19261926

19271927
pub fn ensure_key_from_metadata(&mut self, key: &str, value_type: MetadataValueType) -> bool {
1928+
if key.starts_with(CHROMA_KEY) {
1929+
return false;
1930+
}
19281931
let value_types = self.keys.entry(key.to_string()).or_default();
19291932
match value_type {
19301933
MetadataValueType::Bool => {

0 commit comments

Comments (0)