feature: new index stream data structure #81
base: master
Conversation
Pull request overview
This pull request refactors the trace ID indexing workflow to improve trace timestamp accuracy and moves index creation responsibility from vtinsert to vtstorage in cluster deployments.
Key Changes:
- Index structure now stores start_time and end_time ranges instead of approximate timestamps, improving query accuracy
- Index creation is deferred by -insert.traceMaxDuration (default 1m) to capture complete trace time ranges (see the sketch below)
- In cluster mode, vtstorage handles index creation; in single-node mode, the unified process handles it
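To make the first two points concrete, the new index record can be thought of as carrying a time range per trace instead of a single approximate timestamp. The struct below is only an illustrative sketch, not the PR's actual types (the real field constants live in lib/protoparser/opentelemetry/pb/internal_fields.go):

```go
// traceIndexEntry is a hypothetical view of one entry in the new index:
// one record per trace_id with the observed time range.
type traceIndexEntry struct {
	TraceID   string // indexed trace_id
	StartTime int64  // min(start_time) across spans seen within -insert.traceMaxDuration, unix nanoseconds
	EndTime   int64  // max(end_time) across spans seen within -insert.traceMaxDuration, unix nanoseconds
}
```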
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 21 comments.
| File | Description |
|---|---|
| lib/protoparser/opentelemetry/pb/internal_fields.go | Adds new constants for start_time and end_time index fields |
| apptest/vtsingle.go | Configures test environment with 2s trace max duration |
| apptest/tests/otlp_ingestion_test.go | Adds 2s sleep to accommodate index creation delay in tests |
| app/vtstorage/main.go | Adds IsLocalStorage() method to storage interface |
| app/vtselect/traces/query/query.go | Updates query logic to use start/end times with backward compatibility for old index format |
| app/vtinsert/opentelemetry/otlphttp.go | Changes from LogMessageProcessor to TraceProcessor |
| app/vtinsert/opentelemetry/otlpgrpc.go | Changes from LogMessageProcessor to TraceProcessor |
| app/vtinsert/opentelemetry/opentelemetry.go | Removes inline index creation logic and trace ID cache |
| app/vtinsert/main.go | Starts/stops index worker lifecycle |
| app/vtinsert/internalinsert/internalinsert.go | Updates to use TraceProcessor |
| app/vtinsert/insertutil/trace_processor.go | New processor wrapping log processor with index queue management |
| app/vtinsert/insertutil/index_helper.go | New index worker that batches and flushes trace indexes after delay |
| app/vtinsert/insertutil/common_params.go | Adds IsLocalStorage() to storage interface |
// find startTimeNano of the span in revert order, it should be right before endTimeNano field.
Copilot AI (Nov 21, 2025)
Typo in comment: "revert" should be "reverse".
// find endTimeNano of the span in revert order, it should be right before trace ID field.
Copilot AI (Nov 21, 2025)
Typo in comment: "revert" should be "reverse".
// find startTimeNano of the span in revert order, it should be right before endTimeNano field.
Copilot AI (Nov 21, 2025)
Typo in comment: "revert" should be "reverse".
// find startTimeNano of the span in revert order, it should be right before endTimeNano field.
for ; i >= 0; i-- {
	if r.Fields[i].Name == otelpb.StartTimeUnixNanoField {
		startTime = strings.Clone(r.Fields[i].Value)
		break
	}
}
Copilot AI (Nov 21, 2025)
Same field ordering assumption issue. The search relies on a specific field order that may not be guaranteed.
// 2. During shutdown, main HTTP server should be closed first, and then the index worker.
// 3. `false` only return when index worker is exited.
logger.Errorf("cannot push index for a trace to the queue: %v", fields)
return
Copilot AI (Nov 21, 2025)
When pushTraceToIndexQueue returns false, the function returns early without calling tsp.lmp.AddRow(timestamp, fields, streamFields). This means the span data won't be stored if index creation fails. Although the comment suggests this shouldn't happen during normal operation, it could still lead to data loss if there's an unexpected shutdown sequence. Consider logging an error and still proceeding with storing the span data, or at a minimum, increment an error metric.
Suggested change:
- return
+ metrics.GetOrCreateCounter("trace_index_queue_push_failed_total").Inc()
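A hedged sketch of that alternative, staying inside the TraceProcessor method this diff touches (pushTraceToIndexQueue, tsp.lmp.AddRow and the metric name are taken from the snippet, comment and suggestion above; the surrounding control flow is assumed):

```go
// If the index queue rejects the push (for example during shutdown),
// record the failure but still persist the span itself, so span data is
// never dropped because of index bookkeeping.
if !tsp.pushTraceToIndexQueue(timestamp, fields) {
	logger.Errorf("cannot push index for a trace to the queue: %v", fields)
	metrics.GetOrCreateCounter("trace_index_queue_push_failed_total").Inc()
	// fall through instead of returning early
}
tsp.lmp.AddRow(timestamp, fields, streamFields)
```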
	initGRPCServer()
}

insertutil.MustStartIndexWorker()
Copilot AI (Nov 21, 2025)
Index worker is started unconditionally in vtinsert, but according to the PR description, in cluster mode vtinsert should only distribute spans to vtstorage, and vtstorage should handle index creation. The index worker will be idle in cluster mode (when logRowsStorage.IsLocalStorage() returns false), wasting resources. Consider starting the index worker conditionally based on whether local storage is available, or document why it needs to run in all cases.
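One possible shape of that, assuming vtinsert/main.go can reach the same logRowsStorage value whose IsLocalStorage() method this comment mentions (a sketch, not the PR's code):

```go
// Run the index worker only when this process writes to local storage;
// in cluster mode vtstorage is responsible for creating trace indexes.
if logRowsStorage.IsLocalStorage() {
	insertutil.MustStartIndexWorker()
}
```

The matching stop call during shutdown would need the same guard.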
timestamp, _ := strconv.ParseInt(timeStr, 10, 64)
return time.Unix(timestamp/int64(time.Second), timestamp%int64(time.Second)).Add(-*traceMaxDurationWindow),
	time.Unix(timestamp/int64(time.Second), timestamp%int64(time.Second)).Add(*traceMaxDurationWindow), nil
}

traceIDStartTime, _ := strconv.ParseInt(traceIDStartTimeStr, 10, 64)
traceIDEndTime, _ := strconv.ParseInt(traceIDEndTimeStr, 10, 64)
Copilot AI (Nov 21, 2025)
Error handling is silently ignored. If strconv.ParseInt fails, the function will use a zero timestamp value which could lead to incorrect query results for backward compatibility with old data format. Consider logging the error or returning it to the caller.
Suggested change:
- timestamp, _ := strconv.ParseInt(timeStr, 10, 64)
- return time.Unix(timestamp/int64(time.Second), timestamp%int64(time.Second)).Add(-*traceMaxDurationWindow),
- 	time.Unix(timestamp/int64(time.Second), timestamp%int64(time.Second)).Add(*traceMaxDurationWindow), nil
- }
- traceIDStartTime, _ := strconv.ParseInt(traceIDStartTimeStr, 10, 64)
- traceIDEndTime, _ := strconv.ParseInt(traceIDEndTimeStr, 10, 64)
+ timestamp, err := strconv.ParseInt(timeStr, 10, 64)
+ if err != nil {
+ 	return time.Time{}, time.Time{}, fmt.Errorf("failed to parse timeStr '%s': %w", timeStr, err)
+ }
+ return time.Unix(timestamp/int64(time.Second), timestamp%int64(time.Second)).Add(-*traceMaxDurationWindow),
+ 	time.Unix(timestamp/int64(time.Second), timestamp%int64(time.Second)).Add(*traceMaxDurationWindow), nil
+ }
+ traceIDStartTime, err := strconv.ParseInt(traceIDStartTimeStr, 10, 64)
+ if err != nil {
+ 	return time.Time{}, time.Time{}, fmt.Errorf("failed to parse traceIDStartTimeStr '%s': %w", traceIDStartTimeStr, err)
+ }
+ traceIDEndTime, err := strconv.ParseInt(traceIDEndTimeStr, 10, 64)
+ if err != nil {
+ 	return time.Time{}, time.Time{}, fmt.Errorf("failed to parse traceIDEndTimeStr '%s': %w", traceIDEndTimeStr, err)
+ }
traceIDIndexMapPrev.Clear()
traceIDIndexMapCur, traceIDIndexMapPrev = traceIDIndexMapPrev, traceIDIndexMapCur
Copilot AI (Nov 21, 2025)
Potential race condition during map swap. While swapping traceIDIndexMapCur and traceIDIndexMapPrev at line 102, concurrent calls to pushIndexToQueue might still be accessing the old traceIDIndexMapPrev or storing new entries in traceIDIndexMapCur. This could lead to entries being stored in the "prev" map after it has been swapped and potentially cleared on the next iteration, causing data loss.
Consider adding proper synchronization or using a different data structure that supports concurrent access during rotation.
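A minimal sketch of such synchronization, assuming plain Go maps (the snippet's container exposes a Clear() method, so the real type may differ): one mutex guards both the per-trace updates and the cur/prev rotation, so a concurrent push can never land in a map that is about to be cleared.

```go
package insertutil // hypothetical placement

import "sync"

type traceTimeRange struct {
	startNano int64 // min(start_time) seen so far
	endNano   int64 // max(end_time) seen so far
}

var (
	indexMu             sync.Mutex
	traceIDIndexMapCur  = map[string]*traceTimeRange{}
	traceIDIndexMapPrev = map[string]*traceTimeRange{}
)

// pushIndexToQueue widens the recorded time range for traceID under the lock.
func pushIndexToQueue(traceID string, startNano, endNano int64) {
	indexMu.Lock()
	defer indexMu.Unlock()
	tr := traceIDIndexMapCur[traceID]
	if tr == nil {
		traceIDIndexMapCur[traceID] = &traceTimeRange{startNano: startNano, endNano: endNano}
		return
	}
	if startNano < tr.startNano {
		tr.startNano = startNano
	}
	if endNano > tr.endNano {
		tr.endNano = endNano
	}
}

// rotateIndexMaps clears prev and swaps it with cur under the same lock,
// so the rotation can never race with an in-flight push.
func rotateIndexMaps() {
	indexMu.Lock()
	defer indexMu.Unlock()
	clear(traceIDIndexMapPrev)
	traceIDIndexMapCur, traceIDIndexMapPrev = traceIDIndexMapPrev, traceIDIndexMapCur
}
```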
// find endTimeNano of the span in revert order, it should be right before trace ID field.
for ; i >= 0; i-- {
	if fields[i].Name == otelpb.EndTimeUnixNanoField {
		endTime = strings.Clone(fields[i].Value)
		break
	}
}
Copilot AI (Nov 21, 2025)
The comment states the field "should be right before trace ID field" but the search continues from the same index i after finding trace_id. If the end_time field is not immediately before trace_id, this search will skip over it. The logic seems to assume a specific field order, but this assumption is fragile. Consider searching through all fields instead of relying on field order.
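A sketch of an order-independent variant, reusing the identifiers visible in the snippets (the otelpb constants and strings.Clone); the Field type name here is an assumption, not necessarily the one used in the PR:

```go
// findSpanTimes scans every field once and picks out both timestamps,
// so it keeps working even if the fields around trace_id are reordered.
func findSpanTimes(fields []logstorage.Field) (startTime, endTime string) {
	for i := range fields {
		switch fields[i].Name {
		case otelpb.StartTimeUnixNanoField:
			startTime = strings.Clone(fields[i].Value)
		case otelpb.EndTimeUnixNanoField:
			endTime = strings.Clone(fields[i].Value)
		}
	}
	return startTime, endTime
}
```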
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Describe Your Changes
fix #79 #21
This pull request changes the trace_id index creation workflow:
- Previously, a trace_id index entry was created as soon as a "new" trace_id reached VictoriaTraces: it wrote a <trace_id - span_start_time> entry as the index, and at query time used span_start_time as an approximate timestamp (with +- 30s) to search spans of this trace_id.
- Now, VictoriaTraces waits for -insert.traceMaxDuration (default 1m) before flushing a trace_id into the index. During this 1m it keeps updating the min(start_time) and max(end_time) of this trace_id, so the index entry looks like <trace_id - start_time - end_time> (a query-side sketch follows after this list).
- In cluster deployments, vtstorage now extracts the trace_id and the corresponding start time and end time, and lastly creates this index; in single-node mode the unified process handles it.
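To make the query-side difference concrete, here is a hypothetical sketch (names loosely follow the query.go snippet quoted above; this is not the actual function): an old-format entry carries a single timestamp, so the scan range must be padded on both sides by the trace-max-duration window, while a new-format entry yields the exact range.

```go
// scanRange derives the time range to scan for a trace's spans from an
// index entry. endNano == 0 stands in for the old single-timestamp format.
func scanRange(startNano, endNano int64, pad time.Duration) (time.Time, time.Time) {
	if endNano == 0 {
		// old index format: one approximate timestamp, padded on both sides
		t := time.Unix(0, startNano)
		return t.Add(-pad), t.Add(pad)
	}
	// new index format: exact trace time range
	return time.Unix(0, startNano), time.Unix(0, endNano)
}
```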
Checklist
The following checks are mandatory: