Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Add agent_policy_id and policy_revision_idx to checkin requests

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
Add the agent_policy_id and policy_revision_idx attributes to checkin
request bodies so an agent is able to inform fleet-server of its exact
policy. These details will replace the need for an ack on
policy_change actions, and will be used to determine when to send a
policy change when there is a new revision available, or when the
agent is reassigned to a different policy. Add a server setting under
feature_flags.ignore_checkin_policy_id that disables this behavour and
restores the previous approach.

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: fleet-server

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/fleet-server/pull/5501

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/6446
7 changes: 7 additions & 0 deletions fleet-server.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ fleet:
# upstream_url: "https://artifacts.elastic.co/GPG-KEY-elastic-agent"
# # By default dir is the directory containing the fleet-server executable (following symlinks) joined with elastic-agent-upgrade-keys
# dir: ./elastic-agent-upgrade-keys
#
# # Toggles to enable new behaviour or restore old behaviour.
# feature_flags:
# // ignore agent_policy_id and policy_revision_idx attributes that may be present in the checkin request bodies.
# // POLICY_CHANGE actions need an explicit ack if this is set.
# ignore_checkin_policy_id: false
#
# # monitor options are advanced configuration and should not be adjusted is most cases
# monitor:
# fetch_size: 1000 # The number of documents that each monitor may fetch at once
Expand Down
23 changes: 16 additions & 7 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,19 +442,28 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
agentID string,
apiKeyID, permissionHash string,
toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, outputName string) error {
bulk := ack.bulk
return updateAPIKey(ctx, zlog, ack.bulk, agentID, apiKeyID, permissionHash, toRetireAPIKeyIDs, outputName)
}

func updateAPIKey(ctx context.Context,
zlog zerolog.Logger,
bulk bulk.Bulk,
agentID string,
apiKeyID, permissionHash string,
toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, outputName string) error {
// use output bulker if exists
outBulk := bulk
if outputName != "" {
outputBulk := ack.bulk.GetBulker(outputName)
outputBulk := bulk.GetBulker(outputName)
if outputBulk != nil {
zlog.Debug().Str(ecs.PolicyOutputName, outputName).Msg("Using output bulker in updateAPIKey")
bulk = outputBulk
outBulk = outputBulk
}
}
if apiKeyID != "" {
res, err := bulk.APIKeyRead(ctx, apiKeyID, true)
res, err := outBulk.APIKeyRead(ctx, apiKeyID, true)
if err != nil {
if isAgentActive(ctx, zlog, ack.bulk, agentID) {
if isAgentActive(ctx, zlog, outBulk, agentID) {
zlog.Warn().
Err(err).
Str(LogAPIKeyID, apiKeyID).
Expand All @@ -480,7 +489,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
Str(LogAPIKeyID, apiKeyID).
Msg("Failed to cleanup roles")
} else if removedRolesCount > 0 {
if err := bulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil {
if err := outBulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil {
zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Str(ecs.PolicyOutputName, outputName).Msg("Failed to update API Key")
} else {
zlog.Debug().
Expand All @@ -493,7 +502,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
}
}
}
ack.invalidateAPIKeys(ctx, zlog, toRetireAPIKeyIDs, apiKeyID)
invalidateAPIKeys(ctx, zlog, bulk, toRetireAPIKeyIDs, apiKeyID)
}

return nil
Expand Down
78 changes: 74 additions & 4 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,34 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
return fmt.Errorf("failed to update upgrade_details: %w", err)
}

initialOpts := []checkin.Option{
checkin.WithStatus(string(req.Status)),
checkin.WithMessage(req.Message),
checkin.WithMeta(rawMeta),
checkin.WithComponents(rawComponents),
checkin.WithSeqNo(seqno),
checkin.WithVer(ver),
checkin.WithUnhealthyReason(unhealthyReason),
checkin.WithDeleteAudit(agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != ""),
}

revID, opts, err := ct.processPolicyDetails(r.Context(), zlog, agent, req)
if err != nil {
return fmt.Errorf("failed to update policy details: %w", err)
}
if len(opts) > 0 {
initialOpts = append(initialOpts, opts...)
}

// Subscribe to actions dispatcher
aSub := ct.ad.Subscribe(zlog, agent.Id, seqno)
defer ct.ad.Unsubscribe(zlog, aSub)
actCh := aSub.Ch()

// use revision_idx=0 if the agent has a single output where no API key is defined
// This will force the policy monitor to emit a new policy to regerate API keys
revID := agent.PolicyRevisionIdx
for _, output := range agent.Outputs {
if output.APIKey == "" {
// use revision_idx=0 if the agent has a single output where no API key is defined
// This will force the policy monitor to emit a new policy to regerate API keys
revID = 0
break
}
Expand Down Expand Up @@ -328,7 +346,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
// Initial update on checkin, and any user fields that might have changed
// Run a script to remove audit_unenrolled_* and unenrolled_at attributes if one is set on checkin.
// 8.16.x releases would incorrectly set unenrolled_at
err = ct.bc.CheckIn(agent.Id, checkin.WithStatus(string(req.Status)), checkin.WithMessage(req.Message), checkin.WithMeta(rawMeta), checkin.WithComponents(rawComponents), checkin.WithSeqNo(seqno), checkin.WithVer(ver), checkin.WithUnhealthyReason(unhealthyReason), checkin.WithDeleteAudit(agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != ""))
err = ct.bc.CheckIn(agent.Id, initialOpts...)
if err != nil {
zlog.Error().Err(err).Str(ecs.AgentID, agent.Id).Msg("checkin failed")
}
Expand Down Expand Up @@ -1124,3 +1142,55 @@ func calcPollDuration(zlog zerolog.Logger, pollDuration, setupDuration, jitterDu

return pollDuration, jitter
}

// processPolicyDetails handles the agent_policy_id and revision_idx included in the checkin request.
// The API keys will be managed if the agent reports a new policy id from its last checkin, or if the revision is different than what the last checkin reported.
// It returns the revision idx that should be used when subscribing for new POLICY_CHANGE actons and optional args to use when doing the non-tick checkin.
func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) (int64, []checkin.Option, error) {
// no details specified or attributes are ignored by config
if ct.cfg.Features.IgnoreCheckinPolicyID || req == nil || req.PolicyRevisionIdx == nil || req.AgentPolicyId == nil {
return agent.PolicyRevisionIdx, nil, nil
}
policyID := *req.AgentPolicyId
revisionIDX := *req.PolicyRevisionIdx

span, ctx := apm.StartSpan(ctx, "Process policy details", "process")
span.Context.SetLabel("agent_id", agent.Agent.ID)
span.Context.SetLabel(dl.FieldAgentPolicyID, policyID)
span.Context.SetLabel(dl.FieldPolicyRevisionIdx, revisionIDX)
defer span.End()

// update agent doc if policy id or revision idx does not match
var opts []checkin.Option
if policyID != agent.AgentPolicyID || revisionIDX != agent.PolicyRevisionIdx {
opts = []checkin.Option{
checkin.WithAgentPolicyID(policyID),
checkin.WithPolicyRevisionIDX(revisionIDX),
}
}
// Policy reassign, subscribe to policy with revision 0
if policyID != agent.PolicyID {
zlog.Debug().Str(dl.FieldAgentPolicyID, policyID).Str("new_policy_id", agent.PolicyID).Msg("Policy ID mismatch detected, reassigning agent.")
return 0, opts, nil
}

// Check if the checkin revision_idx is greater than the latest available
latestRev := ct.pm.LatestRev(ctx, agent.PolicyID)
if latestRev != 0 && revisionIDX > latestRev {
revisionIDX = 0 // set return val to 0 so the agent gets latest available revision.
}

// Update API keys if the policy has changed, or if the revision differs.
if policyID != agent.AgentPolicyID || revisionIDX != agent.PolicyRevisionIdx {
for outputName, output := range agent.Outputs {
if output.Type != policy.OutputTypeElasticsearch {
continue
}
if err := updateAPIKey(ctx, zlog, ct.bulker, agent.Id, output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds, outputName); err != nil {
// Only returns ErrUpdatingInactiveAgent
return 0, nil, err
}
}
}
return revisionIDX, opts, nil
}
Loading
Loading