[ENH] recognize and flush new metadata keys to schema on local compaction #5728
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking.
Reviewer Checklist
Please leverage this checklist to ensure your code review is thorough before approving:
- Testing, Bugs, Errors, Logs, Documentation
- System Compatibility
- Quality
Dynamic Schema Discovery & Persistence During Local Compaction
Adds end-to-end support for discovering previously unseen metadata keys when processing local log compaction.
Key Changes: Extended …
Affected Areas: …
This summary was automatically generated by @propel-code-bot
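For orientation, the core mechanism is a loop over each record's metadata that registers unseen keys on the schema and reports whether anything changed (and therefore needs re-persisting). Below is a minimal, self-contained sketch; `Schema` and `ensure_key_from_metadata` are stand-ins modeled on names from the diff, and the real code first converts values through `MetadataValue::try_from` rather than taking strings.

```rust
use std::collections::{HashMap, HashSet};

// Stand-in for the PR's Schema type, for illustration only.
#[derive(Default)]
struct Schema {
    keys: HashSet<String>,
}

impl Schema {
    // Mirrors the PR's ensure_key_from_metadata: returns true when the
    // key was not yet part of the schema.
    fn ensure_key_from_metadata(&mut self, key: &str, _value_type: &str) -> bool {
        self.keys.insert(key.to_string())
    }
}

// Walk a record's metadata, registering unseen keys and reporting
// whether the schema was modified.
fn discover_new_keys(schema: &mut Schema, meta: &HashMap<String, String>) -> bool {
    let mut modified = false;
    for (key, value_type) in meta {
        if schema.ensure_key_from_metadata(key, value_type) {
            modified = true;
        }
    }
    modified
}
```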
```rust
if let Some(schema_mut) = schema.as_mut() {
    if let Ok(metadata_value) = MetadataValue::try_from(value) {
```
[BestPractice]
Error handling gap: `MetadataValue::try_from(value)` failure is silently ignored in the schema discovery logic. If the conversion fails, the key won't be added to the schema even though it might be valid metadata that should be discoverable.

```rust
if let Ok(metadata_value) = MetadataValue::try_from(value) {
    return schema_mut
        .ensure_key_from_metadata(key, metadata_value.value_type());
} else {
    // Log the conversion failure for debugging - using standard tracing pattern
    tracing::warn!(key = %key, "Failed to convert metadata value during schema discovery");
}
```

This could lead to inconsistent schema discovery where some valid metadata keys are missed due to conversion issues. The enhanced logging follows tracing best practices with structured fields.
File: rust/segment/src/sqlite_metadata.rs
Line: 319
```rust
runtime.block_on(sqlite_seg_writer.apply_logs(
    data,
    metadata_seg_id,
    test_data
        .collection_and_segments
        .collection
        .schema
        .clone(),
    &mut *tx,
))
.expect("Should be able to apply logs");
```
[BestPractice]
The `apply_logs` method signature change adds a new schema parameter, but in the test cases you're passing the collection's schema from `test_data`. However, the production code in `local_compaction_manager.rs` passes either the collection's schema or `None` based on `schema_previously_persisted`. This inconsistency could lead to different behavior between tests and production.
Consider aligning the test calls with the production logic:

```rust
runtime.block_on(sqlite_seg_writer.apply_logs(
    data,
    metadata_seg_id,
    if schema_previously_persisted {
        test_data.collection_and_segments.collection.schema.clone()
    } else {
        None
    },
    &mut *tx,
))
```
File: rust/segment/src/sqlite_metadata.rs
Line: 999

```rust
metadata_writer
    .update_collection_schema(
        collection_and_segments.collection.collection_id,
        &updated_schema,
    )
    .await
    .map_err(|_| CompactionManagerError::MetadataApplyLogsFailed)?;
```
[BestPractice]
Missing error handling for schema update: If `update_collection_schema()` fails (lines 228-234), the error is converted to `MetadataApplyLogsFailed`, which loses important context about what specifically went wrong during schema persistence. This could make debugging schema-related issues very difficult.

```rust
// Consider more specific error handling
metadata_writer
    .update_collection_schema(
        collection_and_segments.collection.collection_id,
        &updated_schema,
    )
    .await
    .map_err(|e| CompactionManagerError::SchemaUpdateFailed(e))?;
```
File: rust/log/src/local_compaction_manager.rs
Line: 234
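To sketch what the suggestion above implies: a dedicated variant would carry the underlying cause instead of collapsing it into `MetadataApplyLogsFailed`. The `SchemaUpdateFailed` variant and the use of `thiserror` below are assumptions, since the excerpt does not show the `CompactionManagerError` definition.

```rust
use thiserror::Error;

// Hypothetical shape for the error enum; only MetadataApplyLogsFailed
// is confirmed by the excerpt above.
#[derive(Debug, Error)]
pub enum CompactionManagerError {
    #[error("failed to apply logs to the metadata segment")]
    MetadataApplyLogsFailed,
    // A dedicated variant preserves the schema-persistence failure's cause,
    // so map_err can forward the source error instead of discarding it.
    #[error("failed to persist updated collection schema: {0}")]
    SchemaUpdateFailed(#[from] Box<dyn std::error::Error + Send + Sync>),
}
```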
```rust
for (key, value) in meta.iter() {
    if Self::ensure_schema_for_update_value(&mut schema, key, value) {
        schema_modified = true;
    }
}
```
[PerformanceOptimization]
Potential performance issue: The `ensure_schema_for_update_value` function is called for every key-value pair in every metadata update within the log processing loop. For large batches with many metadata fields, this could create significant overhead.
Consider batching schema updates or using a `HashSet` to track which keys have already been processed to avoid redundant schema checks. For better performance, pre-size the `HashSet` if you can estimate the number of unique keys:

```rust
// Pre-size for better performance; mark each key as processed on first
// sight so it is checked exactly once per batch.
let mut processed_keys = HashSet::with_capacity(estimated_unique_keys);
for (key, value) in meta.iter() {
    if !processed_keys.contains(key) {
        processed_keys.insert(key.clone());
        if Self::ensure_schema_for_update_value(&mut schema, key, value) {
            schema_modified = true;
        }
    }
}
```

Note: `HashSet` operations are O(1) average case, but the overhead of hashing and set operations may not be beneficial for very small key sets. Consider the trade-off based on typical batch sizes in your use case.
File: rust/segment/src/sqlite_metadata.rs
Line: 386
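A small standard-library note on the sketch above: `HashSet::insert` already reports whether the value was newly added, so the `contains` check and the `insert` can be collapsed into a single call. A standalone illustration:

```rust
use std::collections::HashSet;

fn main() {
    let mut seen = HashSet::new();
    for key in ["color", "size", "color"] {
        // insert returns true only the first time a value is added.
        if seen.insert(key) {
            println!("first sighting of {key}");
        }
    }
    // Prints "first sighting of color" and "first sighting of size".
}
```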
```diff
     schema: Option<Schema>,
     tx: &mut C,
-) -> Result<(), SqliteMetadataError>
+) -> Result<ApplyLogsOutcome, SqliteMetadataError>
 where
     for<'connection> &'connection mut C: sqlx::Executor<'connection, Database = sqlx::Sqlite>,
 {
     if logs.is_empty() {
-        return Ok(());
+        return Ok(ApplyLogsOutcome {
+            schema_update: None,
+        });
     }
     let mut schema = schema;
```
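From the construction above, the new return type presumably has roughly this shape; the actual definition is not part of this excerpt, so treat it as an inference:

```rust
// Inferred from `ApplyLogsOutcome { schema_update: None }` in the diff;
// Schema is the crate's schema type, and the real struct may carry more fields.
pub struct ApplyLogsOutcome {
    pub schema_update: Option<Schema>,
}
```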
[CriticalError]
The changed `apply_logs` method signature adds a `schema` parameter but doesn't handle errors when the schema is `None` and metadata processing tries to access it. In `ensure_schema_for_update_value`, if `schema.as_mut()` returns `None`, the function silently returns `false`, but the caller expects schema discovery to work.
Consider adding error handling or initialization:

```rust
let mut schema = schema.unwrap_or_else(|| Schema::new());
```

This ensures schema discovery can always function properly during log flushing.
File: rust/segment/src/sqlite_metadata.rs
Line: 345
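If `Schema` implements `Default` (an assumption; its definition is not shown in this excerpt), the initialization suggested above can be written without the closure:

```rust
// Equivalent to unwrap_or_else(|| Schema::new()), assuming Schema: Default.
let mut schema = schema.unwrap_or_default();
```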
```rust
    .sysdb
    .get_collection_with_segments(message.collection_id)
    .await?;
let schema_previously_persisted = collection_and_segments.collection.schema.is_some();
```
[CriticalError]
Potential Race Condition in Schema State Check: The `schema_previously_persisted` boolean is determined before schema reconciliation, but the schema could be modified during `reconcile_schema_with_config()`. This creates a potential inconsistency where:
1. Schema starts as `None` (`schema_previously_persisted = false`)
2. `reconcile_schema_with_config()` creates/modifies schema
3. Later logic uses the stale `schema_previously_persisted` value

This could lead to schema updates being skipped when they should be persisted.

```rust
// Current approach - schema state checked before reconciliation
let schema_previously_persisted = collection_and_segments.collection.schema.is_some();
collection_and_segments
    .collection
    .reconcile_schema_with_config(KnnIndex::Hnsw)?;
// Later usage may be inconsistent with actual schema state
if schema_previously_persisted { /* ... */ }
```

Consider checking schema state after reconciliation or using a different approach to determine when schema updates should be persisted.
File: rust/log/src/local_compaction_manager.rs
Line: 143
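A self-contained illustration of the staleness the comment describes, using a stand-in `Collection` rather than Chroma's actual type:

```rust
struct Collection {
    schema: Option<String>,
}

impl Collection {
    // Stand-in for reconcile_schema_with_config: may create a schema
    // where none existed before.
    fn reconcile_schema_with_config(&mut self) {
        self.schema.get_or_insert_with(|| "default".to_string());
    }
}

fn main() {
    let mut c = Collection { schema: None };
    let schema_previously_persisted = c.schema.is_some(); // captured: false
    c.reconcile_schema_with_config();
    // The flag no longer matches the collection's actual state.
    assert!(!schema_previously_persisted && c.schema.is_some());
}
```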
This is intentional. If there's no `schema_str` found in the DB, we shouldn't use or modify it in compaction.
Description of changes
Summarize the changes made by this PR.
Test plan
Added e2e test support to run for distributed, local, and single node to ensure metadata discovery and related tests run.
How are these changes tested?
`pytest` for python, `yarn test` for js, `cargo test` for rust
Migration plan
Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?
Observability plan
What is the plan to instrument and monitor this change?
Documentation Changes
Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the docs section?