@@ -7,17 +7,17 @@ use chroma_error::{ChromaError, ErrorCodes};
77use chroma_sqlite:: {
88 db:: SqliteDb ,
99 helpers:: { delete_metadata, update_metadata} ,
10- table:: { EmbeddingFulltextSearch , EmbeddingMetadata , Embeddings , MaxSeqId } ,
10+ table:: { Collections , EmbeddingFulltextSearch , EmbeddingMetadata , Embeddings , MaxSeqId } ,
1111} ;
1212use chroma_types:: {
1313 operator:: {
1414 CountResult , Filter , GetResult , Limit , Projection , ProjectionOutput , ProjectionRecord , Scan ,
1515 } ,
1616 plan:: { Count , Get } ,
17- BooleanOperator , Chunk , CompositeExpression , DocumentExpression , DocumentOperator , LogRecord ,
18- MetadataComparison , MetadataExpression , MetadataSetValue , MetadataValue ,
19- MetadataValueConversionError , Operation , OperationRecord , PrimitiveOperator , SegmentUuid ,
20- SetOperator , UpdateMetadataValue , Where , CHROMA_DOCUMENT_KEY ,
17+ BooleanOperator , Chunk , CollectionUuid , CompositeExpression , DocumentExpression ,
18+ DocumentOperator , LogRecord , MetadataComparison , MetadataExpression , MetadataSetValue ,
19+ MetadataValue , MetadataValueConversionError , Operation , OperationRecord , PrimitiveOperator ,
20+ Schema , SegmentUuid , SetOperator , UpdateMetadataValue , Where , CHROMA_DOCUMENT_KEY ,
2121} ;
2222use sea_query:: {
2323 Alias , DeleteStatement , Expr , ExprTrait , Func , InsertStatement , LikeExpr , OnConflict , Query ,
@@ -41,6 +41,8 @@ pub enum SqliteMetadataError {
4141 SeaQuery ( #[ from] sea_query:: error:: Error ) ,
4242 #[ error( transparent) ]
4343 Sqlx ( #[ from] sqlx:: Error ) ,
44+ #[ error( "Could not serialize schema: {0}" ) ]
45+ SerializeSchema ( #[ from] serde_json:: Error ) ,
4446}
4547
4648impl ChromaError for SqliteMetadataError {
@@ -53,6 +55,11 @@ pub struct SqliteMetadataWriter {
5355 pub db : SqliteDb ,
5456}
5557
58+ pub struct ApplyLogsOutcome {
59+ pub schema_update : Option < Schema > ,
60+ pub max_seq_id : Option < u64 > ,
61+ }
62+
5663impl SqliteMetadataWriter {
5764 pub fn new ( db : SqliteDb ) -> Self {
5865 Self { db }
@@ -278,19 +285,64 @@ impl SqliteMetadataWriter {
278285 Ok ( self . db . get_conn ( ) . begin ( ) . await ?)
279286 }
280287
288+ pub async fn update_collection_schema (
289+ & self ,
290+ collection_id : CollectionUuid ,
291+ schema : & Schema ,
292+ ) -> Result < ( ) , SqliteMetadataError > {
293+ let schema_str = serde_json:: to_string ( schema) ?;
294+ let ( sql, values) = Query :: update ( )
295+ . table ( Collections :: Table )
296+ . value ( Collections :: SchemaStr , schema_str)
297+ . and_where (
298+ Expr :: col ( ( Collections :: Table , Collections :: Id ) ) . eq ( collection_id. to_string ( ) ) ,
299+ )
300+ . build_sqlx ( SqliteQueryBuilder ) ;
301+ sqlx:: query_with ( & sql, values)
302+ . execute ( self . db . get_conn ( ) )
303+ . await ?;
304+ Ok ( ( ) )
305+ }
306+
307+ fn ensure_schema_for_update_value (
308+ schema : & mut Option < Schema > ,
309+ key : & str ,
310+ value : & UpdateMetadataValue ,
311+ ) -> bool {
312+ match value {
313+ UpdateMetadataValue :: None => false ,
314+ _ => {
315+ if let Some ( schema_mut) = schema. as_mut ( ) {
316+ if let Ok ( metadata_value) = MetadataValue :: try_from ( value) {
317+ return schema_mut
318+ . ensure_key_from_metadata ( key, metadata_value. value_type ( ) ) ;
319+ }
320+ }
321+ false
322+ }
323+ }
324+ }
325+
281326 pub async fn apply_logs < C > (
282327 & self ,
283328 logs : Chunk < LogRecord > ,
284329 segment_id : SegmentUuid ,
330+ schema : Option < Schema > ,
285331 tx : & mut C ,
286- ) -> Result < ( ) , SqliteMetadataError >
332+ ) -> Result < ApplyLogsOutcome , SqliteMetadataError >
287333 where
288334 for < ' connection > & ' connection mut C : sqlx:: Executor < ' connection , Database = sqlx:: Sqlite > ,
289335 {
290336 if logs. is_empty ( ) {
291- return Ok ( ( ) ) ;
337+ return Ok ( ApplyLogsOutcome {
338+ schema_update : None ,
339+ max_seq_id : None ,
340+ } ) ;
292341 }
342+ let mut schema = schema;
343+ let mut schema_modified = false ;
293344 let mut max_seq_id = u64:: MIN ;
345+ let mut saw_log = false ;
294346 for (
295347 LogRecord {
296348 log_offset,
@@ -307,6 +359,7 @@ impl SqliteMetadataWriter {
307359 ) in logs. iter ( )
308360 {
309361 let log_offset_unsigned = ( * log_offset) . try_into ( ) ?;
362+ saw_log = true ;
310363 max_seq_id = max_seq_id. max ( log_offset_unsigned) ;
311364 let mut metadata_owned = metadata. clone ( ) ;
312365 if let Some ( doc) = document {
@@ -323,6 +376,11 @@ impl SqliteMetadataWriter {
323376 Self :: add_record ( tx, segment_id, log_offset_unsigned, id. clone ( ) ) . await ?
324377 {
325378 if let Some ( meta) = metadata_owned {
379+ for ( key, value) in meta. iter ( ) {
380+ if Self :: ensure_schema_for_update_value ( & mut schema, key, value) {
381+ schema_modified = true ;
382+ }
383+ }
326384 update_metadata :: < EmbeddingMetadata , _ , _ > ( tx, offset_id, meta) . await ?;
327385 }
328386
@@ -336,6 +394,11 @@ impl SqliteMetadataWriter {
336394 Self :: update_record ( tx, segment_id, log_offset_unsigned, id. clone ( ) ) . await ?
337395 {
338396 if let Some ( meta) = metadata_owned {
397+ for ( key, value) in meta. iter ( ) {
398+ if Self :: ensure_schema_for_update_value ( & mut schema, key, value) {
399+ schema_modified = true ;
400+ }
401+ }
339402 update_metadata :: < EmbeddingMetadata , _ , _ > ( tx, offset_id, meta) . await ?;
340403 }
341404
@@ -351,6 +414,11 @@ impl SqliteMetadataWriter {
351414 . await ?;
352415
353416 if let Some ( meta) = metadata_owned {
417+ for ( key, value) in meta. iter ( ) {
418+ if Self :: ensure_schema_for_update_value ( & mut schema, key, value) {
419+ schema_modified = true ;
420+ }
421+ }
354422 update_metadata :: < EmbeddingMetadata , _ , _ > ( tx, offset_id, meta) . await ?;
355423 }
356424
@@ -371,7 +439,12 @@ impl SqliteMetadataWriter {
371439
372440 Self :: upsert_max_seq_id ( tx, segment_id, max_seq_id) . await ?;
373441
374- Ok ( ( ) )
442+ let max_seq_id = if saw_log { Some ( max_seq_id) } else { None } ;
443+
444+ Ok ( ApplyLogsOutcome {
445+ schema_update : if schema_modified { schema } else { None } ,
446+ max_seq_id,
447+ } )
375448 }
376449}
377450
@@ -910,7 +983,17 @@ mod tests {
910983 ref_seg. apply_logs( test_data. logs. clone( ) , metadata_seg_id) ;
911984 let mut tx = runtime. block_on( sqlite_seg_writer. begin( ) ) . expect( "Should be able to start transaction" ) ;
912985 let data: Chunk <LogRecord > = Chunk :: new( test_data. logs. clone( ) . into( ) ) ;
913- runtime. block_on( sqlite_seg_writer. apply_logs( data, metadata_seg_id, & mut * tx) ) . expect( "Should be able to apply logs" ) ;
986+ runtime. block_on( sqlite_seg_writer. apply_logs(
987+ data,
988+ metadata_seg_id,
989+ test_data
990+ . collection_and_segments
991+ . collection
992+ . schema
993+ . clone( ) ,
994+ & mut * tx,
995+ ) )
996+ . expect( "Should be able to apply logs" ) ;
914997 runtime. block_on( tx. commit( ) ) . expect( "Should be able to commit log" ) ;
915998
916999 let sqlite_seg_reader = SqliteMetadataReader {
@@ -938,7 +1021,17 @@ mod tests {
9381021 ref_seg. apply_logs( test_data. logs. clone( ) , metadata_seg_id) ;
9391022 let mut tx = runtime. block_on( sqlite_seg_writer. begin( ) ) . expect( "Should be able to start transaction" ) ;
9401023 let data: Chunk <LogRecord > = Chunk :: new( test_data. logs. clone( ) . into( ) ) ;
941- runtime. block_on( sqlite_seg_writer. apply_logs( data, metadata_seg_id, & mut * tx) ) . expect( "Should be able to apply logs" ) ;
1024+ runtime. block_on( sqlite_seg_writer. apply_logs(
1025+ data,
1026+ metadata_seg_id,
1027+ test_data
1028+ . collection_and_segments
1029+ . collection
1030+ . schema
1031+ . clone( ) ,
1032+ & mut * tx,
1033+ ) )
1034+ . expect( "Should be able to apply logs" ) ;
9421035 runtime. block_on( tx. commit( ) ) . expect( "Should be able to commit log" ) ;
9431036
9441037 let sqlite_seg_reader = SqliteMetadataReader {
@@ -1020,7 +1113,12 @@ mod tests {
10201113 . expect ( "Should be able to start transaction" ) ;
10211114 let data: Chunk < LogRecord > = Chunk :: new ( logs. into ( ) ) ;
10221115 sqlite_seg_writer
1023- . apply_logs ( data, metadata_seg_id, & mut * tx)
1116+ . apply_logs (
1117+ data,
1118+ metadata_seg_id,
1119+ collection_and_segments. collection . schema . clone ( ) ,
1120+ & mut * tx,
1121+ )
10241122 . await
10251123 . expect ( "Should be able to apply logs" ) ;
10261124 tx. commit ( ) . await . expect ( "Should be able to commit log" ) ;
@@ -1140,7 +1238,12 @@ mod tests {
11401238 . expect ( "Should be able to start transaction" ) ;
11411239 let data: Chunk < LogRecord > = Chunk :: new ( logs. into ( ) ) ;
11421240 sqlite_seg_writer
1143- . apply_logs ( data, metadata_seg_id, & mut * tx)
1241+ . apply_logs (
1242+ data,
1243+ metadata_seg_id,
1244+ collection_and_segments. collection . schema . clone ( ) ,
1245+ & mut * tx,
1246+ )
11441247 . await
11451248 . expect ( "Should be able to apply logs" ) ;
11461249 tx. commit ( ) . await . expect ( "Should be able to commit log" ) ;
0 commit comments