Skip to content

Commit

Permalink
fix unchecked stream_id (#33335)
Browse files Browse the repository at this point in the history
  • Loading branch information
fearful-symmetry authored Oct 13, 2022
1 parent 7cab218 commit c399a9d
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions x-pack/libbeat/management/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ func injectStreamProcessors(expected *proto.UnitExpectedConfig, inputType string
// the AST injects input_id at the input level and not the stream level,
// for reasons I can't understand, as it just ends up shuffling it around
// to individual metricsets anyway, at least on metricbeat
if expected.GetId() != "" {
inputId := generateAddFieldsProcessor(mapstr.M{"input_id": expected.Id}, "@metadata")
if expectedID := expected.GetId(); expectedID != "" {
inputId := generateAddFieldsProcessor(mapstr.M{"input_id": expectedID}, "@metadata")
processors = append(processors, inputId)
}

Expand All @@ -191,9 +191,10 @@ func injectStreamProcessors(expected *proto.UnitExpectedConfig, inputType string
processors = append(processors, event)

// source stream
streamID := streamExpected.GetId()
sourceStream := generateAddFieldsProcessor(mapstr.M{"stream_id": streamID}, "@metadata")
processors = append(processors, sourceStream)
if streamID := streamExpected.GetId(); streamID != "" {
sourceStream := generateAddFieldsProcessor(mapstr.M{"stream_id": streamID}, "@metadata")
processors = append(processors, sourceStream)
}

// figure out if we have any existing processors
currentProcs, ok := stream["processors"]
Expand Down

0 comments on commit c399a9d

Please sign in to comment.