Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions changelog/fragments/1710256335-add-unhealthy-reason.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: Calculate unhealthy reason (input/output/other) in agent document

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 3338

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
118 changes: 82 additions & 36 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ var (
)

const (
kEncodingGzip = "gzip"
kEncodingGzip = "gzip"
FailedStatus = "FAILED"
DegradedStatus = "DEGRADED"
)

// validActionTypes is a map of action.type and if they are valid
Expand Down Expand Up @@ -156,11 +158,12 @@ func invalidateAPIKeysOfInactiveAgent(ctx context.Context, zlog zerolog.Logger,

// validatedCheckin is a struct to wrap all the things that validateRequest returns.
type validatedCheckin struct {
req *CheckinRequest
dur time.Duration
rawMeta []byte
rawComp []byte
seqno sqn.SeqNo
req *CheckinRequest
dur time.Duration
rawMeta []byte
rawComp []byte
seqno sqn.SeqNo
unhealthyReason *[]string
}

func (ct *CheckinT) validateRequest(zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, start time.Time, agent *model.Agent) (validatedCheckin, error) {
Expand Down Expand Up @@ -228,7 +231,7 @@ func (ct *CheckinT) validateRequest(zlog zerolog.Logger, w http.ResponseWriter,
}

// Compare agent_components content and update if different
rawComponents, err := parseComponents(zlog, agent, &req)
rawComponents, unhealthyReason, err := parseComponents(zlog, agent, &req)
if err != nil {
return val, err
}
Expand All @@ -240,11 +243,12 @@ func (ct *CheckinT) validateRequest(zlog zerolog.Logger, w http.ResponseWriter,
}

return validatedCheckin{
req: &req,
dur: pollDuration,
rawMeta: rawMeta,
rawComp: rawComponents,
seqno: seqno,
req: &req,
dur: pollDuration,
rawMeta: rawMeta,
rawComp: rawComponents,
seqno: seqno,
unhealthyReason: unhealthyReason,
}, nil
}

Expand All @@ -258,6 +262,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
rawMeta := validated.rawMeta
rawComponents := validated.rawComp
seqno := validated.seqno
unhealthyReason := validated.unhealthyReason

// Handle upgrade details for agents using the new 8.11 upgrade details field of the checkin.
// Older agents will communicate any issues with upgrades via the Ack endpoint.
Expand Down Expand Up @@ -302,7 +307,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
defer longPoll.Stop()

// Initial update on checkin, and any user fields that might have changed
err = ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, rawMeta, rawComponents, seqno, ver)
err = ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, rawMeta, rawComponents, seqno, ver, unhealthyReason)
if err != nil {
zlog.Error().Err(err).Str(logger.AgentID, agent.Id).Msg("checkin failed")
}
Expand Down Expand Up @@ -356,7 +361,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
zlog.Trace().Msg("fire long poll")
break LOOP
case <-tick.C:
err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver)
err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver, unhealthyReason)
if err != nil {
zlog.Error().Err(err).Str(logger.AgentID, agent.Id).Msg("checkin failed")
}
Expand Down Expand Up @@ -917,50 +922,52 @@ func parseMeta(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([]
return outMeta, nil
}

func parseComponents(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([]byte, error) {
func parseComponents(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([]byte, *[]string, error) {
var unhealthyReason []string

// fallback to other if components don't exist
if agent.UnhealthyReason == nil && (agent.LastCheckinStatus == FailedStatus || agent.LastCheckinStatus == DegradedStatus) {
unhealthyReason = []string{"other"}
} else {
unhealthyReason = agent.UnhealthyReason
}

if req.Components == nil {
return nil, nil
return nil, &unhealthyReason, nil
}

agentComponentsJSON, err := json.Marshal(agent.Components)
if err != nil {
return nil, &unhealthyReason, fmt.Errorf("agent.Components marshal: %w", err)
}

// Quick comparison first; compare the JSON payloads.
// If the data is not consistently normalized, this short-circuit will not work.
if bytes.Equal(*req.Components, agent.Components) {
if bytes.Equal(*req.Components, agentComponentsJSON) {
zlog.Trace().Msg("quick comparing agent components data is equal")
return nil, nil
return nil, &unhealthyReason, nil
}

// Deserialize the request components data
var reqComponents interface{}
var reqComponents []model.ComponentsItems
if len(*req.Components) > 0 {
if err := json.Unmarshal(*req.Components, &reqComponents); err != nil {
return nil, fmt.Errorf("parseComponents request: %w", err)
}
// Validate that components is an array
if _, ok := reqComponents.([]interface{}); !ok {
return nil, errors.New("parseComponets request: components property is not array")
return nil, &unhealthyReason, fmt.Errorf("parseComponents request: %w", err)
}
}

// If empty, don't step on existing data
if reqComponents == nil {
return nil, nil
}

// Deserialize the agent's components copy
var agentComponents interface{}
if len(agent.Components) > 0 {
if err := json.Unmarshal(agent.Components, &agentComponents); err != nil {
return nil, fmt.Errorf("parseComponents local: %w", err)
}
return nil, &unhealthyReason, nil
}

var outComponents []byte

// Compare the deserialized meta structures and return the bytes to update if different
if !reflect.DeepEqual(reqComponents, agentComponents) {
if !reflect.DeepEqual(reqComponents, agent.Components) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have already a test that test we do not update if not different?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not, I'll take a look

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added tests to verify when components equals / not equals


zlog.Trace().
RawJSON("oldComponents", agent.Components).
RawJSON("oldComponents", agentComponentsJSON).
RawJSON("newComponents", *req.Components).
Msg("local components data is not equal")

Expand All @@ -969,9 +976,48 @@ func parseComponents(zlog zerolog.Logger, agent *model.Agent, req *CheckinReques
Msg("applying new components data")

outComponents = *req.Components
compUnhealthyReason := calcUnhealthyReason(reqComponents)
if len(compUnhealthyReason) > 0 {
unhealthyReason = compUnhealthyReason
}
}

zlog.Debug().Any("unhealthy_reason", unhealthyReason).Msg("unhealthy reason")

return outComponents, &unhealthyReason, nil
}

func calcUnhealthyReason(reqComponents []model.ComponentsItems) []string {
var unhealthyReason []string
hasUnhealthyInput := false
hasUnhealthyOutput := false
hasUnhealthyComponent := false
for _, component := range reqComponents {
if component.Status == FailedStatus || component.Status == DegradedStatus {
hasUnhealthyComponent = true
for _, unit := range component.Units {
if unit.Status == FailedStatus || unit.Status == DegradedStatus {
if unit.Type == "input" {
hasUnhealthyInput = true
} else if unit.Type == "output" {
hasUnhealthyOutput = true
}
}
}
}
}
unhealthyReason = make([]string, 0)
if hasUnhealthyInput {
unhealthyReason = append(unhealthyReason, "input")
}
if hasUnhealthyOutput {
unhealthyReason = append(unhealthyReason, "output")
}
if !hasUnhealthyInput && !hasUnhealthyOutput && hasUnhealthyComponent {
unhealthyReason = append(unhealthyReason, "other")
}

return outComponents, nil
return unhealthyReason
}

func calcPollDuration(zlog zerolog.Logger, pollDuration, setupDuration, jitterDuration time.Duration) (time.Duration, time.Duration) {
Expand Down
Loading