normalise field name: change prefix from @ to _ to allow proper querying #1514
base: main
Conversation
All fields prefixed with `@` will be renamed to use a `_` prefix instead; this is to make the fields properly queryable.
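To illustrate the intended behaviour, here is a minimal standalone sketch of the renaming (the `rename_keys` helper and the sample `@timestamp` field are made up for this example; the PR itself does this via `normalize_field_name` and `rename_json_keys`):

```rust
use serde_json::{json, Map, Value};

/// Illustrative rename of '@'-prefixed keys to '_'-prefixed keys.
fn rename_keys(value: Value) -> Value {
    match value {
        Value::Object(map) => {
            let renamed: Map<String, Value> = map
                .into_iter()
                .map(|(key, val)| {
                    let key = match key.strip_prefix('@') {
                        Some(rest) => format!("_{rest}"),
                        None => key,
                    };
                    (key, val)
                })
                .collect();
            Value::Object(renamed)
        }
        other => other,
    }
}

fn main() {
    let event = json!({ "@timestamp": "2025-01-01T00:00:00Z", "level": "info" });
    let renamed = rename_keys(event);
    assert_eq!(
        renamed,
        json!({ "_timestamp": "2025-01-01T00:00:00Z", "level": "info" })
    );
}
```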
Walkthrough: A new public utility function, `normalize_field_name`, rewrites a leading `@` in field names to `_`, and incoming JSON keys are renamed to match before schema handling.
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~15 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (1 warning, 1 inconclusive)
Actionable comments posted: 2
🧹 Nitpick comments (2)
src/event/format/mod.rs (2)
60-67: Consider optimizing to avoid allocation.
The `format!` macro allocates a new String. Since the PR aims to "avoid extra allocation by mutating the var," consider using `replace_range` for in-place mutation:
⚡ More efficient in-place mutation
```diff
 #[inline]
 pub fn normalize_field_name(name: &mut String) {
-    if let Some(stripped) = name.strip_prefix('@') {
-        *name = format!("_{}", stripped);
+    if name.starts_with('@') {
+        name.replace_range(0..1, "_");
     }
 }
```
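For reference, a minimal standalone sketch of how the suggested in-place variant behaves (the sample field names are illustrative only):

```rust
/// Standalone copy of the suggested in-place variant, for illustration only.
pub fn normalize_field_name(name: &mut String) {
    if name.starts_with('@') {
        // '@' is a single ASCII byte, so replacing byte range 0..1 is safe.
        name.replace_range(0..1, "_");
    }
}

fn main() {
    let mut field = String::from("@timestamp");
    normalize_field_name(&mut field);
    assert_eq!(field, "_timestamp");

    // Names without a leading '@' are left unchanged.
    let mut other = String::from("level");
    normalize_field_name(&mut other);
    assert_eq!(other, "level");
}
```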
347-376: Logic is correct but consider adding clarification.
The normalization flow is correct: the field name is normalized for schema creation and pattern matching (line 357), while the original field name is used for value lookup from the JSON map (line 350). This works because JSON key normalization happens later in the pipeline via `rename_json_keys`. Consider adding a brief comment at lines 347-349 to clarify this subtle but important distinction for future maintainers.
📝 Suggested clarifying comment
```diff
 .map(|field| {
+    // Normalize field name for schema - JSON keys are normalized later in the pipeline
     let mut field_name = field.name().to_string();
     normalize_field_name(&mut field_name);
     match (schema_version, map.get(field.name())) {
```
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/event/format/json.rs
src/event/format/mod.rs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-18T09:59:20.177Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1415
File: src/metrics/mod.rs:700-756
Timestamp: 2025-09-18T09:59:20.177Z
Learning: In src/event/mod.rs, the parsed_timestamp used in increment_events_ingested_by_date() is correctly UTC-normalized: for dynamic streams it remains Utc::now(), and for streams with time partition enabled it uses the time partition value. Both cases result in proper UTC date strings for metrics labeling, preventing double-counting issues.
Applied to files:
src/event/format/mod.rs
🧬 Code graph analysis (1)
src/event/format/json.rs (2)
src/event/format/mod.rs (1)
  normalize_field_name (63-67)
src/utils/arrow/mod.rs (1)
  get_field (76-84)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Kafka aarch64-apple-darwin
🔇 Additional comments (1)
src/event/format/json.rs (1)
294-296: LGTM! The normalization here is necessary and correct, since the schema being validated against contains normalized field names (with a "_" prefix instead of "@").
```rust
// Rename JSON keys starting with '@' to '_' to match the schema
let value_arr = rename_json_keys(value_arr);
```
🧩 Analysis chain
🏁 Scripts executed (command outputs omitted):

```sh
cd src/event/format && wc -l json.rs
cd src/event/format && head -150 json.rs | tail -50
cd src/event/format && sed -n '70,130p' json.rs
rg -n "fn collect_keys" --type rust
rg -n "fn derive_arrow_schema" --type rust
rg -n "fn rename_json_keys" --type rust
sed -n '231,243p' src/event/format/json.rs
sed -n '244,263p' src/event/format/json.rs
sed -n '264,290p' src/event/format/json.rs
rg -n "fn normalize_field_name" --type rust -A 5
rg -n "stream_schema" src/event/format/json.rs --type rust -B 2 -A 2
rg -n "stream_schema\|schema_schema" src/event/format/json.rs --type rust -B 3 -A 3 | head -60
rg -n "HashMap.*Field\|stream_schema.*insert" src/event/format/json.rs --type rust -B 2 -A 2
rg -n "pub fn into_event\|fn load_schema" --type rust | head -20
sed -n '55,80p' src/event/format/json.rs
rg -n "impl.*Handler.*for.*JsonEvent" src/event/format/json.rs --type rust -B 2 -A 15
rg -n "HashMap<String, Arc<Field>>" --type rust | head -20
rg -n "load_schema\|get_schema\|create_stream_and_schema" --type rust -B 2 -A 5 | head -40
sed -n '869,890p' src/streams.rs
rg -n "get_schema_raw" src/parseable --type rust -B 3 -A 10 | head -60
rg -n "infer_json_schema_from_iterator\|update_field_type_in_schema" src/event/format --type rust -B 2 -A 5 | head -80
sed -n '82,110p' src/event/format/json.rs
rg -n "schema.insert\|fields_to_map\|update.*schema" src/handlers/http/ingest.rs --type rust -B 3 -A 8 | head -80
sed -n '594,620p' src/handlers/http/ingest.rs
rg -n "Metadata\|serialize_schema\|schema.clone\|write_schema" src/metadata.rs --type rust -B 2 -A 5 | head -100
sed -n '75,115p' src/metadata.rs
rg -n "write.*metadata\|save_schema\|update_schema" src/parseable --type rust -B 3 -A 8 | head -120
rg -n "fn create.*stream\|LogStreamMetadata::new" src/parseable/streams.rs --type rust -B 5 -A 15 | head -100
rg -n "into_recordbatch\|into_event" src/event/format/json.rs --type rust -B 3 -A 20 | head -80
sed -n '140,180p' src/event/format/json.rs
rg -n "fn create_stream\|schema.*insert\|metadata.*write" src/handlers/http/ingest.rs --type rust -B 5 -A 15 | head -150
rg -n "create_stream_and_schema" src --type rust -B 5 -A 20 | head -100
rg -n "schema.*persist\|save.*field\|store.*schema" src/parseable --type rust -B 5 -A 10 | head -80
rg -n "create_schema_from_metastore\|persist_schema\|set_schema" src/parseable --type rust -B 3 -A 8
rg -n "normalize_field_name" src --type rust -B 5 -A 5
sed -n '1,60p' src/event/format/json.rs
```
Normalize collected keys before schema lookup to avoid unnecessary schema inference.
For existing streams, the stored schema uses normalized field names (e.g., _foo instead of @foo). However, collect_keys extracts field names as-is from the incoming JSON. When derive_arrow_schema attempts to match these non-normalized keys against the stored schema, the lookup fails and triggers a full schema inference path.
This causes unnecessary overhead on every event containing "@"-prefixed fields, since:
- `collect_keys` extracts "@foo"
- `derive_arrow_schema` tries to find "@foo" in the schema (which has "_foo")
- Lookup fails, schema inference is triggered
- Keys are only normalized later at line 121 via `rename_json_keys`
Normalize the collected keys before passing them to derive_arrow_schema to avoid repeated schema inference for existing streams.
🤖 Prompt for AI Agents
In @src/event/format/json.rs around lines 121 - 122, collect_keys currently
returns raw field names (e.g., "@foo") which are only normalized later by
rename_json_keys, causing derive_arrow_schema to miss matches and re-infer
schemas; fix by normalizing the collected keys immediately after collect_keys
and before calling derive_arrow_schema (i.e., pass
rename_json_keys(collect_keys(...)) into derive_arrow_schema or call
rename_json_keys on the keys array returned by collect_keys), ensuring functions
rename_json_keys, collect_keys, and derive_arrow_schema are updated accordingly
so schema lookup uses the normalized names.
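A minimal sketch of the suggested fix, assuming `collect_keys` yields owned field-name strings and that the normalized set is what should be handed to `derive_arrow_schema` (the real signatures in `json.rs` may differ):

```rust
use std::collections::HashSet;

// Same normalization as in src/event/format/mod.rs, repeated here so the sketch is standalone.
fn normalize_field_name(name: &mut String) {
    if let Some(stripped) = name.strip_prefix('@') {
        *name = format!("_{stripped}");
    }
}

fn main() {
    // Stand-in for the keys that collect_keys would gather from incoming JSON.
    let collected: Vec<String> = vec!["@timestamp".to_string(), "level".to_string()];

    // Normalize *before* the schema lookup so "@timestamp" matches the stored "_timestamp".
    let normalized: HashSet<String> = collected
        .into_iter()
        .map(|mut key| {
            normalize_field_name(&mut key);
            key
        })
        .collect();

    assert!(normalized.contains("_timestamp"));
    assert!(normalized.contains("level"));
    // These normalized keys would then be passed to derive_arrow_schema.
}
```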
```rust
/// Renames JSON keys to match the schema transformation using normalize_field_name
fn rename_json_keys(values: Vec<Value>) -> Vec<Value> {
    values
        .into_iter()
        .map(|value| {
            if let Value::Object(map) = value {
                let new_map: serde_json::Map<String, Value> = map
                    .into_iter()
                    .map(|(mut key, val)| {
                        super::normalize_field_name(&mut key);
                        (key, val)
                    })
                    .collect();
                Value::Object(new_map)
            } else {
                value
            }
        })
        .collect()
}
```
Implementation is correct, but consider key collision edge case.
The function correctly normalizes JSON object keys. However, if an input object contains both "@foo" and "_foo", they will collide after normalization, potentially causing data loss (the second insertion would overwrite the first).
While this edge case may be rare, consider whether validation or error handling is needed:
🛡️ Potential collision detection
```diff
 fn rename_json_keys(values: Vec<Value>) -> Vec<Value> {
     values
         .into_iter()
         .map(|value| {
             if let Value::Object(map) = value {
+                let mut seen_keys = std::collections::HashSet::new();
                 let new_map: serde_json::Map<String, Value> = map
                     .into_iter()
                     .map(|(mut key, val)| {
+                        let original_key = key.clone();
                         super::normalize_field_name(&mut key);
+                        if !seen_keys.insert(key.clone()) {
+                            tracing::warn!("Key collision detected: '{}' normalizes to existing key '{}'", original_key, key);
+                        }
                         (key, val)
                     })
                     .collect();
                 Value::Object(new_map)
             } else {
                 value
             }
         })
         .collect()
 }
```