Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions internal/pkg/otel/translate/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,13 @@ func DropComponentStateFromOtelStatus(otelStatus *status.AggregateStatus) (*stat
newStatus := deepCopyStatus(otelStatus)
for pipelineStatusId := range newStatus.ComponentStatusMap {
pipelineId := &pipeline.ID{}
componentKind, pipelineIdStr := parseEntityStatusId(pipelineStatusId)
componentKind, pipelineIdStr, parseErr := parseEntityStatusId(pipelineStatusId)
if parseErr != nil {
return nil, parseErr
}
if componentKind == "extensions" {
continue
}
if componentKind != "pipeline" {
return nil, fmt.Errorf("pipeline status id %s is not a pipeline", pipelineStatusId)
}
Expand All @@ -91,7 +97,13 @@ func getOtelRuntimePipelineStatuses(otelStatus *status.AggregateStatus) (map[str

for pipelineStatusId, pipelineStatus := range otelStatus.ComponentStatusMap {
pipelineId := &pipeline.ID{}
componentKind, pipelineIdStr := parseEntityStatusId(pipelineStatusId)
componentKind, pipelineIdStr, parseErr := parseEntityStatusId(pipelineStatusId)
if parseErr != nil {
return nil, parseErr
}
if componentKind == "extensions" {
continue
}
if componentKind != "pipeline" {
return nil, fmt.Errorf("pipeline status id %s is not a pipeline", pipelineStatusId)
}
Expand Down Expand Up @@ -176,8 +188,11 @@ func getUnitOtelStatuses(pipelineStatus *status.AggregateStatus) (

for otelCompStatusId, otelCompStatus := range pipelineStatus.ComponentStatusMap {
var otelComponentID otelcomponent.ID
componentKind, otelComponentIDStr := parseEntityStatusId(otelCompStatusId)
err := otelComponentID.UnmarshalText([]byte(otelComponentIDStr))
componentKind, otelComponentIDStr, parseErr := parseEntityStatusId(otelCompStatusId)
if parseErr != nil {
return nil, nil, parseErr
}
err = otelComponentID.UnmarshalText([]byte(otelComponentIDStr))
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -267,13 +282,17 @@ func otelStatusToUnitState(status componentstatus.Status) client.UnitState {

// parseEntityStatusId parses an entity status ID into its kind and entity ID. An entity can be a pipeline or otel component.
// The ID is expected to be in the format "kind:entityId", where kind is either "pipeline" or the otel component type (e.g., "receiver", "exporter").
// The returned entityId may be empty - this is true for the top-level "extensions" key.
// This format is used by the healthcheckv2 extension.
func parseEntityStatusId(id string) (kind string, entityId string) {
func parseEntityStatusId(id string) (kind string, entityId string, err error) {
if id == "extensions" {
return "extensions", "", nil
}
parts := strings.SplitN(id, ":", 2)
if len(parts) != 2 {
return "", ""
return "", "", fmt.Errorf("couldn't parse otel status id: %s", id)
}
return parts[0], parts[1]
return parts[0], parts[1], nil
}

// deepCopyStatus makes a deep copy of the status.
Expand Down
47 changes: 38 additions & 9 deletions internal/pkg/otel/translate/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestGetAllComponentState(t *testing.T) {
},
},
},
expectedErr: fmt.Errorf("pipeline status id %s is not a pipeline", fmt.Sprintf("logs/%sfilestream-default", OtelNamePrefix)),
expectedErr: fmt.Errorf("couldn't parse otel status id: %s", fmt.Sprintf("logs/%sfilestream-default", OtelNamePrefix)),
},
{
name: "one otel component, one process component",
Expand Down Expand Up @@ -213,7 +213,20 @@ func TestDropComponentStateFromOtelStatus(t *testing.T) {
s, err := DropComponentStateFromOtelStatus(otelStatus)
require.Error(t, err)
require.Nil(t, s)
assert.Equal(t, "pipeline status id logs is not a pipeline", err.Error())
assert.Equal(t, "couldn't parse otel status id: logs", err.Error())
})

t.Run("ignore extensions", func(t *testing.T) {
otelStatus := &status.AggregateStatus{
ComponentStatusMap: map[string]*status.AggregateStatus{
"extensions": {
Event: componentstatus.NewEvent(componentstatus.StatusOK),
},
},
}
s, err := DropComponentStateFromOtelStatus(otelStatus)
require.NoError(t, err)
assert.Equal(t, otelStatus, s)
})
}

Expand Down Expand Up @@ -262,7 +275,19 @@ func TestGetOtelRuntimePipelineStatuses(t *testing.T) {
},
},
expected: nil,
err: "pipeline status id invalid-format is not a pipeline",
err: "couldn't parse otel status id: invalid-format",
},
{
name: "extensions are ignored",
status: &status.AggregateStatus{
Event: componentstatus.NewEvent(componentstatus.StatusOK),
ComponentStatusMap: map[string]*status.AggregateStatus{
"extensions": {
Event: componentstatus.NewEvent(componentstatus.StatusOK),
},
},
},
expected: map[string]*status.AggregateStatus{},
},
}

Expand Down Expand Up @@ -487,16 +512,20 @@ func TestParseEntityStatusId(t *testing.T) {
id string
expectedKind string
expectedEntityID string
expectedErr error
}{
{"pipeline:logs", "pipeline", "logs"},
{"pipeline:logs/filestream-monitoring", "pipeline", "logs/filestream-monitoring"},
{"receiver:filebeat/filestream-monitoring", "receiver", "filebeat/filestream-monitoring"},
{"exporter:elasticsearch/default", "exporter", "elasticsearch/default"},
{"invalid", "", ""},
{"pipeline:logs", "pipeline", "logs", nil},
{"pipeline:logs/filestream-monitoring", "pipeline", "logs/filestream-monitoring", nil},
{"receiver:filebeat/filestream-monitoring", "receiver", "filebeat/filestream-monitoring", nil},
{"exporter:elasticsearch/default", "exporter", "elasticsearch/default", nil},
{"invalid", "", "", fmt.Errorf("couldn't parse otel status id: %s", "invalid")},
{"", "", "", fmt.Errorf("couldn't parse otel status id: %s", "")},
{"extensions", "extensions", "", nil},
}

for _, test := range tests {
componentKind, pipelineId := parseEntityStatusId(test.id)
componentKind, pipelineId, err := parseEntityStatusId(test.id)
assert.Equal(t, test.expectedErr, err)
assert.Equal(t, test.expectedKind, componentKind, "component kind")
assert.Equal(t, test.expectedEntityID, pipelineId, "pipeline id")
}
Expand Down