Skip to content

Commit

Permalink
Add poll_timeout attribute to checkin endpoint (#2491)
Browse files Browse the repository at this point in the history
* Add poll timeout to checkin requests

Add poll_timeout attribute to checkin request bodies that allow the
client to inform fleet-server of when the request will time out so that
fleet-server can alter the request poll duration and write deadline to
respond before then.
If the attribute is not filled, or has a zero value such as "0s" the
configured timeouts are used instead.

* Use errors.Join in instrumentation setup

* Fix linter

* fix test

* Apply suggestions from code review

Co-authored-by: Jaime Soriano Pastor <jaime.soriano@elastic.co>

* Update dependencies (#2483)

* Update dependencies

Update all dependencied excluding:
- github.com/elastic/elastic-agent-client/v7
- github.com/gofrs/uuid
As they would require code changes.

* Update elastic-agent-system-metrics to 0.6.1 to fix scanner

* Add timeout.checkin_max_poll config

* Change CheckinMaxPoll default to 1h

---------

Co-authored-by: Jaime Soriano Pastor <jaime.soriano@elastic.co>
  • Loading branch information
michel-laterman and jsoriano authored Apr 13, 2023
1 parent 7c28955 commit 2d8dcb3
Show file tree
Hide file tree
Showing 11 changed files with 234 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: Add poll_timeout to checkin requests

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
description: |
Add a poll_duration attribute to checkin requests that the client can use to
inform fleet-server of how long the client will hold the polling connection
open for. If specified fleet-server will return a checkin response within
the specified duration.
# Affected component; a word indicating the component this changeset affects.
component: checkin

# PR number; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 2491

# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: 2337
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/elastic/fleet-server/v7

go 1.19
go 1.20

require (
github.com/Pallinder/go-randomdata v1.2.0
Expand Down
55 changes: 39 additions & 16 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,26 +111,54 @@ func (ct *CheckinT) handleCheckin(zlog zerolog.Logger, w http.ResponseWriter, r
}

func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, start time.Time, agent *model.Agent, ver string) error {

ctx := r.Context()

body := r.Body

// Limit the size of the body to prevent malicious agent from exhausting RAM in server
if ct.cfg.Limits.CheckinLimit.MaxBody > 0 {
body = http.MaxBytesReader(w, body, ct.cfg.Limits.CheckinLimit.MaxBody)
}

readCounter := datacounter.NewReaderCounter(body)

var req CheckinRequest
decoder := json.NewDecoder(readCounter)
if err := decoder.Decode(&req); err != nil {
return fmt.Errorf("decode checkin request: %w", err)
}

cntCheckin.bodyIn.Add(readCounter.Count())

var pDur time.Duration
var err error
if req.PollTimeout != nil {
pDur, err = time.ParseDuration(*req.PollTimeout)
if err != nil {
return fmt.Errorf("poll_timeout cannot be parsed as duration: %w", err)
}
}

pollDuration := ct.cfg.Timeouts.CheckinLongPoll
// set the pollDuration if pDur parsed from poll_timeout was a non-zero value
// sets timeout is set to max(1m, min(pDur-2m, max poll time))
// sets the response write timeout to max(2m, timeout+1m)
if pDur != time.Duration(0) {
pollDuration = pDur - (2 * time.Minute)
if pollDuration > ct.cfg.Timeouts.CheckinMaxPoll {
pollDuration = ct.cfg.Timeouts.CheckinMaxPoll
}
if pollDuration < time.Minute {
pollDuration = time.Minute
}

wTime := pollDuration + time.Minute
rc := http.NewResponseController(w) //nolint:bodyclose // we are working with a ResponseWriter not a Respons
if err := rc.SetWriteDeadline(start.Add(wTime)); err != nil {
zlog.Warn().Err(err).Time("write_deadline", start.Add(wTime)).Msg("Unable to set checkin write deadline.")
} else {
zlog.Trace().Time("write_deadline", start.Add(wTime)).Msg("Request write deadline set.")
}
}
zlog.Trace().Dur("pollDuration", pollDuration).Msg("Request poll duration set.")

// Compare local_metadata content and update if different
rawMeta, err := parseMeta(zlog, agent, &req)
if err != nil {
Expand Down Expand Up @@ -171,7 +199,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
defer tick.Stop()

setupDuration := time.Since(start)
pollDuration, jitter := calcPollDuration(zlog, ct.cfg, setupDuration)
pollDuration, jitter := calcPollDuration(zlog, pollDuration, setupDuration, ct.cfg.Timeouts.CheckinJitter)

zlog.Debug().
Str("status", string(req.Status)).
Expand Down Expand Up @@ -629,22 +657,17 @@ func parseComponents(zlog zerolog.Logger, agent *model.Agent, req *CheckinReques
return outComponents, nil
}

func calcPollDuration(zlog zerolog.Logger, cfg *config.Server, setupDuration time.Duration) (time.Duration, time.Duration) {

pollDuration := cfg.Timeouts.CheckinLongPoll

func calcPollDuration(zlog zerolog.Logger, pollDuration, setupDuration, jitterDuration time.Duration) (time.Duration, time.Duration) {
// Under heavy load, elastic may take along time to authorize the api key, many seconds to minutes.
// Short circuit the long poll to take the setup delay into account. This is particularly necessary
// in cloud where the proxy will time us out after 10m20s causing unnecessary errors.

if setupDuration >= pollDuration {
// We took so long to setup that we need to exit immediately
pollDuration = time.Millisecond
zlog.Warn().
Dur("setupDuration", setupDuration).
Dur("pollDuration", cfg.Timeouts.CheckinLongPoll).
Dur("pollDuration", pollDuration).
Msg("excessive setup duration short cicuit long poll")

// We took so long to setup that we need to exit immediately
return time.Millisecond, time.Duration(0)
} else {
pollDuration -= setupDuration
if setupDuration > (time.Second * 10) {
Expand All @@ -656,8 +679,8 @@ func calcPollDuration(zlog zerolog.Logger, cfg *config.Server, setupDuration tim
}

var jitter time.Duration
if cfg.Timeouts.CheckinJitter != 0 {
jitter = time.Duration(rand.Int63n(int64(cfg.Timeouts.CheckinJitter))) //nolint:gosec // jitter time does not need to by generated from a crypto secure source
if jitterDuration != 0 {
jitter = time.Duration(rand.Int63n(int64(jitterDuration))) //nolint:gosec // jitter time does not need to by generated from a crypto secure source
if jitter < pollDuration {
pollDuration = pollDuration - jitter
zlog.Trace().Dur("poll", pollDuration).Msg("Long poll with jitter")
Expand Down
5 changes: 5 additions & 0 deletions internal/pkg/api/openapi.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func TestConfig(t *testing.T) {
CheckinTimestamp: 30 * time.Second,
CheckinLongPoll: 5 * time.Minute,
CheckinJitter: 30 * time.Second,
CheckinMaxPoll: 10 * time.Minute,
},
Profiler: ServerProfiler{
Enabled: false,
Expand Down
5 changes: 3 additions & 2 deletions internal/pkg/config/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
"net/url"
"os"
Expand Down Expand Up @@ -81,9 +82,9 @@ func (c *Instrumentation) APMHTTPTransportOptions() (apmtransport.HTTPTransportO

if c.TLS.ServerCA != "" {
pool, errs := tlscommon.LoadCertificateAuthorities([]string{c.TLS.ServerCA})
// FIXME once we update to go 1.20 we can return multiple wrapped errors
// FIXME once we update elastic-agent-libs to go 1.20 we can return multiple errors directly with errors.Join()
if len(errs) != 0 {
return apmtransport.HTTPTransportOptions{}, fmt.Errorf("unable to load instrumentation cas: %v", errs)
return apmtransport.HTTPTransportOptions{}, fmt.Errorf("unable to load instrumentation cas: %w", errors.Join(errs...))
}
tlsConfig.RootCAs = pool
}
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/config/testdata/input-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ inputs:
timeouts:
read: 20s
write: 5s
checkin_max_poll: 10m
6 changes: 6 additions & 0 deletions internal/pkg/config/timeouts.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type ServerTimeouts struct {
CheckinTimestamp time.Duration `config:"checkin_timestamp"`
CheckinLongPoll time.Duration `config:"checkin_long_poll"`
CheckinJitter time.Duration `config:"checkin_jitter"`
CheckinMaxPoll time.Duration `config:"checkin_max_poll"`
}

// InitDefaults initializes the defaults for the configuration.
Expand Down Expand Up @@ -58,4 +59,9 @@ func (c *ServerTimeouts) InitDefaults() {

// Jitter subtracted from c.CheckinLongPoll. Disabled if zero.
c.CheckinJitter = 30 * time.Second

// MaxPoll is the maximum allowed value for a long poll when the client specified poll_timeout value is used.
// The long poll value is poll_timeout-2m, and the request's write timeout is set to poll_timeout-1m
// CheckinMaxPoll values of less then 1m are effectively ignored and a 1m limit is used.
c.CheckinMaxPoll = time.Hour
}
5 changes: 5 additions & 0 deletions internal/pkg/logger/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ func (rc *ResponseCounter) WriteHeader(statusCode int) {
}
}

// Unwrap unwraps the underlying ResponseWriter
func (rc *ResponseCounter) Unwrap() http.ResponseWriter {
return rc.ResponseWriter
}

func (rc *ResponseCounter) Count() uint64 {
return atomic.LoadUint64(&rc.count)
}
Expand Down
128 changes: 128 additions & 0 deletions internal/pkg/server/fleet_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func startTestServer(t *testing.T, ctx context.Context) (*tserver, error) {

srvcfg := &config.Server{}
srvcfg.InitDefaults()
srvcfg.Timeouts.CheckinMaxPoll = 2 * time.Minute // set to a short value for tests
srvcfg.Host = "localhost"
srvcfg.Port = port
cfg.Inputs[0].Server = *srvcfg
Expand Down Expand Up @@ -734,3 +735,130 @@ func Test_Agent_request_errors(t *testing.T) {
require.Equal(t, http.StatusBadRequest, res.StatusCode)
})
}

func Test_SmokeTest_CheckinPollTimeout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start test server
srv, err := startTestServer(t, ctx)
require.NoError(t, err)

cli := cleanhttp.DefaultClient()

// enroll an agent
t.Log("Enroll an agent")
req, err := http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/enroll", strings.NewReader(enrollBody))
require.NoError(t, err)
req.Header.Set("Authorization", "ApiKey "+srv.enrollKey)
req.Header.Set("User-Agent", "elastic agent "+serverVersion)
req.Header.Set("Content-Type", "application/json")
res, err := cli.Do(req)
require.NoError(t, err)

require.Equal(t, http.StatusOK, res.StatusCode)
dec := json.NewDecoder(res.Body)
var enrollResponse api.EnrollResponse
err = dec.Decode(&enrollResponse)
res.Body.Close()
require.NoError(t, err)
agentID := enrollResponse.Item.Id
apiKey := enrollResponse.Item.AccessApiKey

// checkin
t.Logf("checkin 1: agent %s no poll_timeout", agentID)
req, err = http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/"+agentID+"/checkin", strings.NewReader(checkinBody))
require.NoError(t, err)
req.Header.Set("Authorization", "ApiKey "+apiKey)
req.Header.Set("User-Agent", "elastic agent "+serverVersion)
req.Header.Set("Content-Type", "application/json")
start := time.Now()
res, err = cli.Do(req)
require.NoError(t, err)
t.Logf("checkin 1: agent %s took %s", agentID, time.Since(start))

require.Equal(t, http.StatusOK, res.StatusCode)
var checkinResponse api.CheckinResponse
dec = json.NewDecoder(res.Body)
err = dec.Decode(&checkinResponse)
res.Body.Close()
require.NoError(t, err)

t.Logf("Ack actions for agent %s", agentID)
events := make([]api.Event, 0, len(*checkinResponse.Actions))
for _, action := range *checkinResponse.Actions {
events = append(events, api.Event{
ActionId: action.Id,
AgentId: agentID,
Message: "test-message",
Type: api.ACTIONRESULT,
Subtype: api.ACKNOWLEDGED,
})
}
p, err := json.Marshal(api.AckRequest{Events: events})
require.NoError(t, err)
req, err = http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/"+agentID+"/acks", bytes.NewBuffer(p))
require.NoError(t, err)
req.Header.Set("Authorization", "ApiKey "+apiKey)
req.Header.Set("User-Agent", "elastic agent "+serverVersion)
req.Header.Set("Content-Type", "application/json")
res, err = cli.Do(req)
require.NoError(t, err)
res.Body.Close()
require.Equal(t, http.StatusOK, res.StatusCode)

t.Logf("checkin 2: agent %s poll_timeout 3m", agentID)
ctx, cancel = context.WithTimeout(ctx, 3*time.Minute)
defer cancel()
req, err = http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/"+agentID+"/checkin", strings.NewReader(fmt.Sprintf(`{
"ack_token": "%s",
"status": "online",
"message": "",
"poll_timeout": "3m"
}`, *checkinResponse.AckToken)))
require.NoError(t, err)
req.Header.Set("Authorization", "ApiKey "+apiKey)
req.Header.Set("User-Agent", "elastic agent "+serverVersion)
req.Header.Set("Content-Type", "application/json")
start = time.Now()
res, err = cli.Do(req)
require.NoError(t, err)
dur := time.Since(start)
t.Logf("checkin 2: agent %s took %s", agentID, time.Since(start))
p, err = io.ReadAll(res.Body)
res.Body.Close()
require.NoError(t, err)
t.Logf("Response body: %s", string(p))
t.Logf("Request duration: %s", dur)
require.Equal(t, http.StatusOK, res.StatusCode)
require.LessOrEqual(t, dur, 2*time.Minute)
err = json.Unmarshal(p, &checkinResponse)
require.NoError(t, err)

t.Logf("checkin 3: agent %s poll_timeout 10m checkin_max_limit returns early", agentID)
ctx, cancel = context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
req, err = http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/"+agentID+"/checkin", strings.NewReader(fmt.Sprintf(`{
"ack_token": "%s",
"status": "online",
"message": "",
"poll_timeout": "10m"
}`, *checkinResponse.AckToken)))
require.NoError(t, err)
req.Header.Set("Authorization", "ApiKey "+apiKey)
req.Header.Set("User-Agent", "elastic agent "+serverVersion)
req.Header.Set("Content-Type", "application/json")
start = time.Now()
res, err = cli.Do(req)
require.NoError(t, err)
dur = time.Since(start)
t.Logf("checkin 3: agent %s took %s", agentID, time.Since(start))
p, err = io.ReadAll(res.Body)
res.Body.Close()
require.NoError(t, err)
t.Logf("Response body: %s", string(p))
t.Logf("Request duration: %s", dur)
require.Equal(t, http.StatusOK, res.StatusCode)
require.LessOrEqual(t, dur, 3*time.Minute) // include write timeout
require.GreaterOrEqual(t, dur, time.Minute)
}
14 changes: 10 additions & 4 deletions model/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,13 @@ components:
type: string
format: application/json
x-go-type: json.RawMessage
poll_timeout:
description: |
An optional timeout value that informs fleet-server of when a client will time out on it's checkin request.
If not specified fleet-server will use the timeout values specified in the config (defaults to 5m polling and a 10m write timeout).
If specified fleet-server will set its poll timeout to `max(1m, poll_timeout-2m)` and its write timeout to `max(2m, poll_timout-1m)`.
type: string
format: duration
actionSignature:
description: Optional action signing data.
type: object
Expand Down Expand Up @@ -882,12 +889,11 @@ paths:
description: |
The agent checkin endpoint.
Clients will long-poll this endpoint.
A client may inform fleet-server of it's long-poll timeout in the request body.
The fleet-server will return a response if there is a new action for the agent, or if the polling timeout is reached.
The fleet-server may also use some jitter to offset the polling duration, if a jitter value is defined the timeout value is the duration's max possible value.
The fleet-server may also use some jitter to offset the polling timeout, if specified a random amount of the jitter value may be subtracted from the polling timeout.
The fleet-sever polling timeout is short-circuited in cases of heavy load where setting up the checkin (ensuring the API key is authed etc) takes longer then the timeout value.
The fleet-server sets the timeout to 5m.
The elastic-agent sets its connection timeout to 10m.
The cloud fleet-proxy timeout is set to 10m20s.
Fleet-server sets the poll timeout to 5m by default (with a 10m write timeout), for these values we assume that elastic-agent's request timeout is set to 10m and the cloud-proxy's timeout is longer than 10m.
parameters:
- name: id
in: path
Expand Down

0 comments on commit 2d8dcb3

Please sign in to comment.