Skip to content

Commit b1db218

Browse files
committed
[ENH] recognize and flush new metadata keys to schema on local compaction
1 parent 9c47af4 commit b1db218

File tree

3 files changed

+127
-13
lines changed

3 files changed

+127
-13
lines changed

rust/log/src/local_compaction_manager.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,17 +206,27 @@ impl Handler<BackfillMessage> for LocalCompactionManager {
206206
.begin()
207207
.await
208208
.map_err(|_| CompactionManagerError::MetadataApplyLogsFailed)?;
209-
metadata_writer
209+
let apply_outcome = metadata_writer
210210
.apply_logs(
211211
mt_data_chunk,
212212
collection_and_segments.metadata_segment.id,
213+
collection_and_segments.collection.schema.clone(),
213214
&mut *tx,
214215
)
215216
.await
216217
.map_err(|_| CompactionManagerError::MetadataApplyLogsFailed)?;
217218
tx.commit()
218219
.await
219220
.map_err(|_| CompactionManagerError::MetadataApplyLogsFailed)?;
221+
if let Some(updated_schema) = apply_outcome.schema_update {
222+
metadata_writer
223+
.update_collection_schema(
224+
collection_and_segments.collection.collection_id,
225+
&updated_schema,
226+
)
227+
.await
228+
.map_err(|_| CompactionManagerError::MetadataApplyLogsFailed)?;
229+
}
220230
// Next apply it to the hnsw writer.
221231
let mut hnsw_writer = self
222232
.hnsw_segment_manager

rust/segment/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ roaring = { workspace = true }
1111
sea-query = { workspace = true }
1212
sea-query-binder = { workspace = true, features = ["sqlx-sqlite"] }
1313
serde = { workspace = true }
14+
serde_json = { workspace = true }
1415
sqlx = { workspace = true }
1516
serde-pickle = "1.2.0"
1617
tantivy = { workspace = true }

rust/segment/src/sqlite_metadata.rs

Lines changed: 115 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,17 @@ use chroma_error::{ChromaError, ErrorCodes};
77
use chroma_sqlite::{
88
db::SqliteDb,
99
helpers::{delete_metadata, update_metadata},
10-
table::{EmbeddingFulltextSearch, EmbeddingMetadata, Embeddings, MaxSeqId},
10+
table::{Collections, EmbeddingFulltextSearch, EmbeddingMetadata, Embeddings, MaxSeqId},
1111
};
1212
use chroma_types::{
1313
operator::{
1414
CountResult, Filter, GetResult, Limit, Projection, ProjectionOutput, ProjectionRecord, Scan,
1515
},
1616
plan::{Count, Get},
17-
BooleanOperator, Chunk, CompositeExpression, DocumentExpression, DocumentOperator, LogRecord,
18-
MetadataComparison, MetadataExpression, MetadataSetValue, MetadataValue,
19-
MetadataValueConversionError, Operation, OperationRecord, PrimitiveOperator, SegmentUuid,
20-
SetOperator, UpdateMetadataValue, Where, CHROMA_DOCUMENT_KEY,
17+
BooleanOperator, Chunk, CollectionUuid, CompositeExpression, DocumentExpression,
18+
DocumentOperator, LogRecord, MetadataComparison, MetadataExpression, MetadataSetValue,
19+
MetadataValue, MetadataValueConversionError, Operation, OperationRecord, PrimitiveOperator,
20+
Schema, SegmentUuid, SetOperator, UpdateMetadataValue, Where, CHROMA_DOCUMENT_KEY,
2121
};
2222
use sea_query::{
2323
Alias, DeleteStatement, Expr, ExprTrait, Func, InsertStatement, LikeExpr, OnConflict, Query,
@@ -41,6 +41,8 @@ pub enum SqliteMetadataError {
4141
SeaQuery(#[from] sea_query::error::Error),
4242
#[error(transparent)]
4343
Sqlx(#[from] sqlx::Error),
44+
#[error("Could not serialize schema: {0}")]
45+
SerializeSchema(#[from] serde_json::Error),
4446
}
4547

4648
impl ChromaError for SqliteMetadataError {
@@ -53,6 +55,11 @@ pub struct SqliteMetadataWriter {
5355
pub db: SqliteDb,
5456
}
5557

58+
/// Summary of what a call to `SqliteMetadataWriter::apply_logs` did, returned
/// to the caller in place of the previous `()` result.
pub struct ApplyLogsOutcome {
59+
/// The schema passed into `apply_logs`, after any changes made while
/// applying the batch. `Some` only when at least one
/// `ensure_schema_for_update_value` call reported a modification;
/// `None` otherwise, including for an empty batch.
pub schema_update: Option<Schema>,
60+
/// Largest log offset seen in the applied batch, or `None` when the
/// batch was empty.
pub max_seq_id: Option<u64>,
61+
}
62+
5663
impl SqliteMetadataWriter {
5764
pub fn new(db: SqliteDb) -> Self {
5865
Self { db }
@@ -278,19 +285,64 @@ impl SqliteMetadataWriter {
278285
Ok(self.db.get_conn().begin().await?)
279286
}
280287

288+
/// Persists `schema` as the stored schema string of the collection row
/// identified by `collection_id`.
///
/// The schema is serialized to JSON with `serde_json::to_string` and written
/// to the `Collections::SchemaStr` column of the row whose `Collections::Id`
/// equals `collection_id.to_string()`.
///
/// # Errors
///
/// * `SqliteMetadataError::SerializeSchema` if the schema cannot be
///   serialized to JSON.
/// * `SqliteMetadataError::Sqlx` if executing the UPDATE statement fails.
///
/// NOTE(review): the statement runs on `self.db.get_conn()`, not on a
/// caller-supplied transaction, so it is not covered by the transaction used
/// for `apply_logs` (the compaction handler calls this after `tx.commit()`).
/// Confirm that a failure between the two steps is acceptable.
pub async fn update_collection_schema(
289+
&self,
290+
collection_id: CollectionUuid,
291+
schema: &Schema,
292+
) -> Result<(), SqliteMetadataError> {
293+
// `?` converts a `serde_json::Error` into `SerializeSchema` via `#[from]`.
let schema_str = serde_json::to_string(schema)?;
294+
// UPDATE <collections> SET <schema_str> = ? WHERE <id> = ?
let (sql, values) = Query::update()
295+
.table(Collections::Table)
296+
.value(Collections::SchemaStr, schema_str)
297+
.and_where(
298+
Expr::col((Collections::Table, Collections::Id)).eq(collection_id.to_string()),
299+
)
300+
.build_sqlx(SqliteQueryBuilder);
301+
// The execution result is discarded, so the number of affected rows is
// never inspected here.
// NOTE(review): presumably an unknown `collection_id` therefore succeeds
// silently — confirm whether that should be an error.
sqlx::query_with(&sql, values)
302+
.execute(self.db.get_conn())
303+
.await?;
304+
Ok(())
305+
}
306+
307+
/// Registers `key` in `schema` using the value type of `value`, and reports
/// whether the schema changed.
///
/// Returns the result of `Schema::ensure_key_from_metadata` when a schema is
/// present and `value` converts to a `MetadataValue`. Returns `false` without
/// touching the schema when:
///
/// * `value` is `UpdateMetadataValue::None`,
/// * `schema` is `None`, or
/// * the conversion to `MetadataValue` fails (the error is dropped).
///
/// `apply_logs` uses the returned flag to decide whether to hand an updated
/// schema back to its caller.
fn ensure_schema_for_update_value(
308+
schema: &mut Option<Schema>,
309+
key: &str,
310+
value: &UpdateMetadataValue,
311+
) -> bool {
312+
match value {
313+
// A `None` update value never registers a key.
UpdateMetadataValue::None => false,
314+
_ => {
315+
if let Some(schema_mut) = schema.as_mut() {
316+
if let Ok(metadata_value) = MetadataValue::try_from(value) {
317+
// NOTE(review): presumably `true` means the key was newly
// added to the schema — confirm against `Schema`.
return schema_mut
318+
.ensure_key_from_metadata(key, metadata_value.value_type());
319+
}
320+
}
321+
// Reached when there is no schema or the conversion failed.
false
322+
}
323+
}
324+
}
325+
281326
pub async fn apply_logs<C>(
282327
&self,
283328
logs: Chunk<LogRecord>,
284329
segment_id: SegmentUuid,
330+
schema: Option<Schema>,
285331
tx: &mut C,
286-
) -> Result<(), SqliteMetadataError>
332+
) -> Result<ApplyLogsOutcome, SqliteMetadataError>
287333
where
288334
for<'connection> &'connection mut C: sqlx::Executor<'connection, Database = sqlx::Sqlite>,
289335
{
290336
if logs.is_empty() {
291-
return Ok(());
337+
return Ok(ApplyLogsOutcome {
338+
schema_update: None,
339+
max_seq_id: None,
340+
});
292341
}
342+
let mut schema = schema;
343+
let mut schema_modified = false;
293344
let mut max_seq_id = u64::MIN;
345+
let mut saw_log = false;
294346
for (
295347
LogRecord {
296348
log_offset,
@@ -307,6 +359,7 @@ impl SqliteMetadataWriter {
307359
) in logs.iter()
308360
{
309361
let log_offset_unsigned = (*log_offset).try_into()?;
362+
saw_log = true;
310363
max_seq_id = max_seq_id.max(log_offset_unsigned);
311364
let mut metadata_owned = metadata.clone();
312365
if let Some(doc) = document {
@@ -323,6 +376,11 @@ impl SqliteMetadataWriter {
323376
Self::add_record(tx, segment_id, log_offset_unsigned, id.clone()).await?
324377
{
325378
if let Some(meta) = metadata_owned {
379+
for (key, value) in meta.iter() {
380+
if Self::ensure_schema_for_update_value(&mut schema, key, value) {
381+
schema_modified = true;
382+
}
383+
}
326384
update_metadata::<EmbeddingMetadata, _, _>(tx, offset_id, meta).await?;
327385
}
328386

@@ -336,6 +394,11 @@ impl SqliteMetadataWriter {
336394
Self::update_record(tx, segment_id, log_offset_unsigned, id.clone()).await?
337395
{
338396
if let Some(meta) = metadata_owned {
397+
for (key, value) in meta.iter() {
398+
if Self::ensure_schema_for_update_value(&mut schema, key, value) {
399+
schema_modified = true;
400+
}
401+
}
339402
update_metadata::<EmbeddingMetadata, _, _>(tx, offset_id, meta).await?;
340403
}
341404

@@ -351,6 +414,11 @@ impl SqliteMetadataWriter {
351414
.await?;
352415

353416
if let Some(meta) = metadata_owned {
417+
for (key, value) in meta.iter() {
418+
if Self::ensure_schema_for_update_value(&mut schema, key, value) {
419+
schema_modified = true;
420+
}
421+
}
354422
update_metadata::<EmbeddingMetadata, _, _>(tx, offset_id, meta).await?;
355423
}
356424

@@ -371,7 +439,12 @@ impl SqliteMetadataWriter {
371439

372440
Self::upsert_max_seq_id(tx, segment_id, max_seq_id).await?;
373441

374-
Ok(())
442+
let max_seq_id = if saw_log { Some(max_seq_id) } else { None };
443+
444+
Ok(ApplyLogsOutcome {
445+
schema_update: if schema_modified { schema } else { None },
446+
max_seq_id,
447+
})
375448
}
376449
}
377450

@@ -910,7 +983,17 @@ mod tests {
910983
ref_seg.apply_logs(test_data.logs.clone(), metadata_seg_id);
911984
let mut tx = runtime.block_on(sqlite_seg_writer.begin()).expect("Should be able to start transaction");
912985
let data: Chunk<LogRecord> = Chunk::new(test_data.logs.clone().into());
913-
runtime.block_on(sqlite_seg_writer.apply_logs(data, metadata_seg_id, &mut *tx)).expect("Should be able to apply logs");
986+
runtime.block_on(sqlite_seg_writer.apply_logs(
987+
data,
988+
metadata_seg_id,
989+
test_data
990+
.collection_and_segments
991+
.collection
992+
.schema
993+
.clone(),
994+
&mut *tx,
995+
))
996+
.expect("Should be able to apply logs");
914997
runtime.block_on(tx.commit()).expect("Should be able to commit log");
915998

916999
let sqlite_seg_reader = SqliteMetadataReader {
@@ -938,7 +1021,17 @@ mod tests {
9381021
ref_seg.apply_logs(test_data.logs.clone(), metadata_seg_id);
9391022
let mut tx = runtime.block_on(sqlite_seg_writer.begin()).expect("Should be able to start transaction");
9401023
let data: Chunk<LogRecord> = Chunk::new(test_data.logs.clone().into());
941-
runtime.block_on(sqlite_seg_writer.apply_logs(data, metadata_seg_id, &mut *tx)).expect("Should be able to apply logs");
1024+
runtime.block_on(sqlite_seg_writer.apply_logs(
1025+
data,
1026+
metadata_seg_id,
1027+
test_data
1028+
.collection_and_segments
1029+
.collection
1030+
.schema
1031+
.clone(),
1032+
&mut *tx,
1033+
))
1034+
.expect("Should be able to apply logs");
9421035
runtime.block_on(tx.commit()).expect("Should be able to commit log");
9431036

9441037
let sqlite_seg_reader = SqliteMetadataReader {
@@ -1020,7 +1113,12 @@ mod tests {
10201113
.expect("Should be able to start transaction");
10211114
let data: Chunk<LogRecord> = Chunk::new(logs.into());
10221115
sqlite_seg_writer
1023-
.apply_logs(data, metadata_seg_id, &mut *tx)
1116+
.apply_logs(
1117+
data,
1118+
metadata_seg_id,
1119+
collection_and_segments.collection.schema.clone(),
1120+
&mut *tx,
1121+
)
10241122
.await
10251123
.expect("Should be able to apply logs");
10261124
tx.commit().await.expect("Should be able to commit log");
@@ -1140,7 +1238,12 @@ mod tests {
11401238
.expect("Should be able to start transaction");
11411239
let data: Chunk<LogRecord> = Chunk::new(logs.into());
11421240
sqlite_seg_writer
1143-
.apply_logs(data, metadata_seg_id, &mut *tx)
1241+
.apply_logs(
1242+
data,
1243+
metadata_seg_id,
1244+
collection_and_segments.collection.schema.clone(),
1245+
&mut *tx,
1246+
)
11441247
.await
11451248
.expect("Should be able to apply logs");
11461249
tx.commit().await.expect("Should be able to commit log");

0 commit comments

Comments
 (0)