@@ -7,17 +7,17 @@ use chroma_error::{ChromaError, ErrorCodes};
77use chroma_sqlite:: {
88 db:: SqliteDb ,
99 helpers:: { delete_metadata, update_metadata} ,
10- table:: { EmbeddingFulltextSearch , EmbeddingMetadata , Embeddings , MaxSeqId } ,
10+ table:: { Collections , EmbeddingFulltextSearch , EmbeddingMetadata , Embeddings , MaxSeqId } ,
1111} ;
1212use chroma_types:: {
1313 operator:: {
1414 CountResult , Filter , GetResult , Limit , Projection , ProjectionOutput , ProjectionRecord , Scan ,
1515 } ,
1616 plan:: { Count , Get } ,
17- BooleanOperator , Chunk , CompositeExpression , DocumentExpression , DocumentOperator , LogRecord ,
18- MetadataComparison , MetadataExpression , MetadataSetValue , MetadataValue ,
19- MetadataValueConversionError , Operation , OperationRecord , PrimitiveOperator , SegmentUuid ,
20- SetOperator , UpdateMetadataValue , Where , CHROMA_DOCUMENT_KEY ,
17+ BooleanOperator , Chunk , CollectionUuid , CompositeExpression , DocumentExpression ,
18+ DocumentOperator , LogRecord , MetadataComparison , MetadataExpression , MetadataSetValue ,
19+ MetadataValue , MetadataValueConversionError , Operation , OperationRecord , PrimitiveOperator ,
20+ Schema , SegmentUuid , SetOperator , UpdateMetadataValue , Where , CHROMA_DOCUMENT_KEY ,
2121} ;
2222use sea_query:: {
2323 Alias , DeleteStatement , Expr , ExprTrait , Func , InsertStatement , LikeExpr , OnConflict , Query ,
@@ -41,6 +41,8 @@ pub enum SqliteMetadataError {
4141 SeaQuery ( #[ from] sea_query:: error:: Error ) ,
4242 #[ error( transparent) ]
4343 Sqlx ( #[ from] sqlx:: Error ) ,
44+ #[ error( "Could not serialize schema: {0}" ) ]
45+ SerializeSchema ( #[ from] serde_json:: Error ) ,
4446}
4547
4648impl ChromaError for SqliteMetadataError {
@@ -53,6 +55,10 @@ pub struct SqliteMetadataWriter {
5355 pub db : SqliteDb ,
5456}
5557
58+ pub struct ApplyLogsOutcome {
59+ pub schema_update : Option < Schema > ,
60+ }
61+
5662impl SqliteMetadataWriter {
5763 pub fn new ( db : SqliteDb ) -> Self {
5864 Self { db }
@@ -278,18 +284,63 @@ impl SqliteMetadataWriter {
278284 Ok ( self . db . get_conn ( ) . begin ( ) . await ?)
279285 }
280286
287+ pub async fn update_collection_schema < C > (
288+ & self ,
289+ collection_id : CollectionUuid ,
290+ schema : & Schema ,
291+ tx : & mut C ,
292+ ) -> Result < ( ) , SqliteMetadataError >
293+ where
294+ for < ' connection > & ' connection mut C : sqlx:: Executor < ' connection , Database = sqlx:: Sqlite > ,
295+ {
296+ let schema_str = serde_json:: to_string ( schema) ?;
297+ let ( sql, values) = Query :: update ( )
298+ . table ( Collections :: Table )
299+ . value ( Collections :: SchemaStr , schema_str)
300+ . and_where (
301+ Expr :: col ( ( Collections :: Table , Collections :: Id ) ) . eq ( collection_id. to_string ( ) ) ,
302+ )
303+ . build_sqlx ( SqliteQueryBuilder ) ;
304+ sqlx:: query_with ( & sql, values) . execute ( & mut * tx) . await ?;
305+ Ok ( ( ) )
306+ }
307+
308+ fn ensure_schema_for_update_value (
309+ schema : & mut Option < Schema > ,
310+ key : & str ,
311+ value : & UpdateMetadataValue ,
312+ ) -> bool {
313+ match value {
314+ UpdateMetadataValue :: None => false ,
315+ _ => {
316+ if let Some ( schema_mut) = schema. as_mut ( ) {
317+ if let Ok ( metadata_value) = MetadataValue :: try_from ( value) {
318+ return schema_mut
319+ . ensure_key_from_metadata ( key, metadata_value. value_type ( ) ) ;
320+ }
321+ }
322+ false
323+ }
324+ }
325+ }
326+
281327 pub async fn apply_logs < C > (
282328 & self ,
283329 logs : Chunk < LogRecord > ,
284330 segment_id : SegmentUuid ,
331+ schema : Option < Schema > ,
285332 tx : & mut C ,
286- ) -> Result < ( ) , SqliteMetadataError >
333+ ) -> Result < ApplyLogsOutcome , SqliteMetadataError >
287334 where
288335 for < ' connection > & ' connection mut C : sqlx:: Executor < ' connection , Database = sqlx:: Sqlite > ,
289336 {
290337 if logs. is_empty ( ) {
291- return Ok ( ( ) ) ;
338+ return Ok ( ApplyLogsOutcome {
339+ schema_update : None ,
340+ } ) ;
292341 }
342+ let mut schema = schema;
343+ let mut schema_modified = false ;
293344 let mut max_seq_id = u64:: MIN ;
294345 for (
295346 LogRecord {
@@ -323,6 +374,11 @@ impl SqliteMetadataWriter {
323374 Self :: add_record ( tx, segment_id, log_offset_unsigned, id. clone ( ) ) . await ?
324375 {
325376 if let Some ( meta) = metadata_owned {
377+ for ( key, value) in meta. iter ( ) {
378+ if Self :: ensure_schema_for_update_value ( & mut schema, key, value) {
379+ schema_modified = true ;
380+ }
381+ }
326382 update_metadata :: < EmbeddingMetadata , _ , _ > ( tx, offset_id, meta) . await ?;
327383 }
328384
@@ -336,6 +392,11 @@ impl SqliteMetadataWriter {
336392 Self :: update_record ( tx, segment_id, log_offset_unsigned, id. clone ( ) ) . await ?
337393 {
338394 if let Some ( meta) = metadata_owned {
395+ for ( key, value) in meta. iter ( ) {
396+ if Self :: ensure_schema_for_update_value ( & mut schema, key, value) {
397+ schema_modified = true ;
398+ }
399+ }
339400 update_metadata :: < EmbeddingMetadata , _ , _ > ( tx, offset_id, meta) . await ?;
340401 }
341402
@@ -351,6 +412,11 @@ impl SqliteMetadataWriter {
351412 . await ?;
352413
353414 if let Some ( meta) = metadata_owned {
415+ for ( key, value) in meta. iter ( ) {
416+ if Self :: ensure_schema_for_update_value ( & mut schema, key, value) {
417+ schema_modified = true ;
418+ }
419+ }
354420 update_metadata :: < EmbeddingMetadata , _ , _ > ( tx, offset_id, meta) . await ?;
355421 }
356422
@@ -371,7 +437,9 @@ impl SqliteMetadataWriter {
371437
372438 Self :: upsert_max_seq_id ( tx, segment_id, max_seq_id) . await ?;
373439
374- Ok ( ( ) )
440+ Ok ( ApplyLogsOutcome {
441+ schema_update : if schema_modified { schema } else { None } ,
442+ } )
375443 }
376444}
377445
@@ -910,7 +978,17 @@ mod tests {
910978 ref_seg. apply_logs( test_data. logs. clone( ) , metadata_seg_id) ;
911979 let mut tx = runtime. block_on( sqlite_seg_writer. begin( ) ) . expect( "Should be able to start transaction" ) ;
912980 let data: Chunk <LogRecord > = Chunk :: new( test_data. logs. clone( ) . into( ) ) ;
913- runtime. block_on( sqlite_seg_writer. apply_logs( data, metadata_seg_id, & mut * tx) ) . expect( "Should be able to apply logs" ) ;
981+ runtime. block_on( sqlite_seg_writer. apply_logs(
982+ data,
983+ metadata_seg_id,
984+ test_data
985+ . collection_and_segments
986+ . collection
987+ . schema
988+ . clone( ) ,
989+ & mut * tx,
990+ ) )
991+ . expect( "Should be able to apply logs" ) ;
914992 runtime. block_on( tx. commit( ) ) . expect( "Should be able to commit log" ) ;
915993
916994 let sqlite_seg_reader = SqliteMetadataReader {
@@ -938,7 +1016,17 @@ mod tests {
9381016 ref_seg. apply_logs( test_data. logs. clone( ) , metadata_seg_id) ;
9391017 let mut tx = runtime. block_on( sqlite_seg_writer. begin( ) ) . expect( "Should be able to start transaction" ) ;
9401018 let data: Chunk <LogRecord > = Chunk :: new( test_data. logs. clone( ) . into( ) ) ;
941- runtime. block_on( sqlite_seg_writer. apply_logs( data, metadata_seg_id, & mut * tx) ) . expect( "Should be able to apply logs" ) ;
1019+ runtime. block_on( sqlite_seg_writer. apply_logs(
1020+ data,
1021+ metadata_seg_id,
1022+ test_data
1023+ . collection_and_segments
1024+ . collection
1025+ . schema
1026+ . clone( ) ,
1027+ & mut * tx,
1028+ ) )
1029+ . expect( "Should be able to apply logs" ) ;
9421030 runtime. block_on( tx. commit( ) ) . expect( "Should be able to commit log" ) ;
9431031
9441032 let sqlite_seg_reader = SqliteMetadataReader {
@@ -1020,7 +1108,12 @@ mod tests {
10201108 . expect ( "Should be able to start transaction" ) ;
10211109 let data: Chunk < LogRecord > = Chunk :: new ( logs. into ( ) ) ;
10221110 sqlite_seg_writer
1023- . apply_logs ( data, metadata_seg_id, & mut * tx)
1111+ . apply_logs (
1112+ data,
1113+ metadata_seg_id,
1114+ collection_and_segments. collection . schema . clone ( ) ,
1115+ & mut * tx,
1116+ )
10241117 . await
10251118 . expect ( "Should be able to apply logs" ) ;
10261119 tx. commit ( ) . await . expect ( "Should be able to commit log" ) ;
@@ -1140,7 +1233,12 @@ mod tests {
11401233 . expect ( "Should be able to start transaction" ) ;
11411234 let data: Chunk < LogRecord > = Chunk :: new ( logs. into ( ) ) ;
11421235 sqlite_seg_writer
1143- . apply_logs ( data, metadata_seg_id, & mut * tx)
1236+ . apply_logs (
1237+ data,
1238+ metadata_seg_id,
1239+ collection_and_segments. collection . schema . clone ( ) ,
1240+ & mut * tx,
1241+ )
11441242 . await
11451243 . expect ( "Should be able to apply logs" ) ;
11461244 tx. commit ( ) . await . expect ( "Should be able to commit log" ) ;
0 commit comments