Skip to content

Commit c99ccd8

Browse files
Unique limiters for each API listener (#1904)
* Unique limiters for each API listener Refactor the limit.Limiter so it can wrap the separate API httprouter endpoints. Limiter.WrapX() calls take the handler and stats incrementer for metrics/error counting. api.Run() replaced with Router.Run(), which will generate an httprouter for each listener in order to be able to associate the httprouter with a unique Limiter. * Add listener address labeled logs to limiter * Review feedback * Apply suggestions from code review Co-authored-by: Anderson Queiroz <me@andersonq.me> * review feedback * fix import * Fix test Co-authored-by: Anderson Queiroz <me@andersonq.me>
1 parent 129ea1c commit c99ccd8

File tree

16 files changed

+458
-292
lines changed

16 files changed

+458
-292
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
- Fix issue where errors were being ignored when written to elasticsearch. {pull}1896[1896]
1414
- Update apikey.cache_hit log field name to match convention. {pull}1900[1900]
1515
- LoadServerLimits will not overwrite specified limits when loading default/agent number specified values. {issue}1841[1841] {pull}1912[1912]
16+
- Use separate rate limiters for internal and external API listeners. {issue}1859[1859] {pull}1904[1904]
1617

1718
==== New Features
1819

cmd/fleet/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -907,10 +907,10 @@ func (f *FleetServer) runSubsystems(ctx context.Context, cfg *config.Config, g *
907907
ack := api.NewAckT(&cfg.Inputs[0].Server, bulker, f.cache)
908908
st := api.NewStatusT(&cfg.Inputs[0].Server, bulker, f.cache)
909909

910-
router := api.NewRouter(ctx, bulker, ct, et, at, ack, st, sm, tracer, f.bi)
910+
router := api.NewRouter(&cfg.Inputs[0].Server, bulker, ct, et, at, ack, st, sm, tracer, f.bi)
911911

912912
g.Go(loggedRunFunc(ctx, "Http server", func(ctx context.Context) error {
913-
return api.Run(ctx, router, &cfg.Inputs[0].Server)
913+
return router.Run(ctx)
914914
}))
915915

916916
return err

internal/pkg/api/error.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"strings"
1313

1414
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
15-
"github.com/elastic/fleet-server/v7/internal/pkg/limit"
1615
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
1716

1817
"github.com/pkg/errors"
@@ -43,7 +42,6 @@ type HTTPErrResp struct {
4342

4443
// NewHTTPErrResp creates an ErrResp from a go error
4544
func NewHTTPErrResp(err error) HTTPErrResp {
46-
4745
errTable := []struct {
4846
target error
4947
meta HTTPErrResp
@@ -57,24 +55,6 @@ func NewHTTPErrResp(err error) HTTPErrResp {
5755
zerolog.WarnLevel,
5856
},
5957
},
60-
{
61-
limit.ErrRateLimit,
62-
HTTPErrResp{
63-
http.StatusTooManyRequests,
64-
"RateLimit",
65-
"exceeded the rate limit",
66-
zerolog.DebugLevel,
67-
},
68-
},
69-
{
70-
limit.ErrMaxLimit,
71-
HTTPErrResp{
72-
http.StatusTooManyRequests,
73-
"MaxLimit",
74-
"exceeded the max limit",
75-
zerolog.DebugLevel,
76-
},
77-
},
7858
{
7959
ErrAPIKeyNotEnabled,
8060
HTTPErrResp{

internal/pkg/api/handleAck.go

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"github.com/elastic/fleet-server/v7/internal/pkg/config"
2626
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
2727
"github.com/elastic/fleet-server/v7/internal/pkg/es"
28-
"github.com/elastic/fleet-server/v7/internal/pkg/limit"
2928
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
3029
"github.com/elastic/fleet-server/v7/internal/pkg/model"
3130
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
@@ -42,27 +41,21 @@ func (e *HTTPError) Error() string {
4241

4342
type AckT struct {
4443
cfg *config.Server
45-
limit *limit.Limiter
4644
bulk bulk.Bulk
4745
cache cache.Cache
4846
}
4947

5048
func NewAckT(cfg *config.Server, bulker bulk.Bulk, cache cache.Cache) *AckT {
51-
log.Info().
52-
Interface("limits", cfg.Limits.AckLimit).
53-
Msg("Setting config ack_limits")
54-
5549
return &AckT{
5650
cfg: cfg,
5751
bulk: bulker,
5852
cache: cache,
59-
limit: limit.NewLimiter(&cfg.Limits.AckLimit),
6053
}
6154
}
6255

63-
func (rt Router) handleAcks(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
56+
//nolint:dupl // function body calls different internal handler than handleCheckin
57+
func (rt *Router) handleAcks(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
6458
start := time.Now()
65-
6659
id := ps.ByName("id")
6760

6861
reqID := r.Header.Get(logger.HeaderRequestID)
@@ -91,12 +84,6 @@ func (rt Router) handleAcks(w http.ResponseWriter, r *http.Request, ps httproute
9184
}
9285

9386
func (ack *AckT) handleAcks(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request, id string) error {
94-
limitF, err := ack.limit.Acquire()
95-
if err != nil {
96-
return err
97-
}
98-
defer limitF()
99-
10087
agent, err := authAgent(r, &id, ack.bulk, ack.cache)
10188
if err != nil {
10289
return err
@@ -107,10 +94,6 @@ func (ack *AckT) handleAcks(zlog *zerolog.Logger, w http.ResponseWriter, r *http
10794
return ctx.Str(LogAccessAPIKeyID, agent.AccessAPIKeyID)
10895
})
10996

110-
// Metrics; serenity now.
111-
dfunc := cntAcks.IncStart()
112-
defer dfunc()
113-
11497
return ack.processRequest(*zlog, w, r, agent)
11598
}
11699

internal/pkg/api/handleArtifacts.go

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
2020
"github.com/elastic/fleet-server/v7/internal/pkg/config"
2121
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
22-
"github.com/elastic/fleet-server/v7/internal/pkg/limit"
2322
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
2423
"github.com/elastic/fleet-server/v7/internal/pkg/model"
2524
"github.com/elastic/fleet-server/v7/internal/pkg/throttle"
@@ -46,24 +45,17 @@ type ArtifactT struct {
4645
bulker bulk.Bulk
4746
cache cache.Cache
4847
esThrottle *throttle.Throttle
49-
limit *limit.Limiter
5048
}
5149

5250
func NewArtifactT(cfg *config.Server, bulker bulk.Bulk, cache cache.Cache) *ArtifactT {
53-
log.Info().
54-
Interface("limits", cfg.Limits.ArtifactLimit).
55-
Int("maxParallel", defaultMaxParallel).
56-
Msg("Artifact install limits")
57-
5851
return &ArtifactT{
5952
bulker: bulker,
6053
cache: cache,
61-
limit: limit.NewLimiter(&cfg.Limits.ArtifactLimit),
6254
esThrottle: throttle.NewThrottle(defaultMaxParallel),
6355
}
6456
}
6557

66-
func (rt Router) handleArtifacts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
58+
func (rt *Router) handleArtifacts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
6759
start := time.Now()
6860

6961
var (
@@ -112,12 +104,6 @@ func (rt Router) handleArtifacts(w http.ResponseWriter, r *http.Request, ps http
112104
}
113105

114106
func (at ArtifactT) handleArtifacts(zlog *zerolog.Logger, r *http.Request, id, sha2 string) (io.Reader, error) {
115-
limitF, err := at.limit.Acquire()
116-
if err != nil {
117-
return nil, err
118-
}
119-
defer limitF()
120-
121107
// Authenticate the APIKey; retrieve agent record.
122108
// Note: This is going to be a bit slow even if we hit the cache on the api key.
123109
// In order to validate that the agent still has that api key, we fetch the agent record from elastic.
@@ -131,10 +117,6 @@ func (at ArtifactT) handleArtifacts(zlog *zerolog.Logger, r *http.Request, id, s
131117
return ctx.Str(LogAccessAPIKeyID, agent.AccessAPIKeyID)
132118
})
133119

134-
// Metrics; serenity now.
135-
dfunc := cntArtifacts.IncStart()
136-
defer dfunc()
137-
138120
return at.processRequest(r.Context(), *zlog, agent, id, sha2)
139121
}
140122

internal/pkg/api/handleCheckin.go

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"github.com/elastic/fleet-server/v7/internal/pkg/checkin"
2323
"github.com/elastic/fleet-server/v7/internal/pkg/config"
2424
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
25-
"github.com/elastic/fleet-server/v7/internal/pkg/limit"
2625
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
2726
"github.com/elastic/fleet-server/v7/internal/pkg/model"
2827
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
@@ -48,7 +47,8 @@ const (
4847
kEncodingGzip = "gzip"
4948
)
5049

51-
func (rt Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
50+
//nolint:dupl // function body calls different internal handler than handleAck
51+
func (rt *Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
5252
start := time.Now()
5353

5454
id := ps.ByName("id")
@@ -65,12 +65,6 @@ func (rt Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httpro
6565
cntCheckin.IncError(err)
6666
resp := NewHTTPErrResp(err)
6767

68-
// Log this as warn for visibility that limit has been reached.
69-
// This allows customers to tune the configuration on detection of threshold.
70-
if errors.Is(err, limit.ErrMaxLimit) {
71-
resp.Level = zerolog.WarnLevel
72-
}
73-
7468
zlog.WithLevel(resp.Level).
7569
Err(err).
7670
Int(ECSHTTPResponseCode, resp.StatusCode).
@@ -93,7 +87,6 @@ type CheckinT struct {
9387
ad *action.Dispatcher
9488
tr *action.TokenResolver
9589
bulker bulk.Bulk
96-
limit *limit.Limiter
9790
}
9891

9992
func NewCheckinT(
@@ -107,14 +100,6 @@ func NewCheckinT(
107100
tr *action.TokenResolver,
108101
bulker bulk.Bulk,
109102
) *CheckinT {
110-
111-
log.Info().
112-
Interface("limits", cfg.Limits.CheckinLimit).
113-
Dur("long_poll_timeout", cfg.Timeouts.CheckinLongPoll).
114-
Dur("long_poll_timestamp", cfg.Timeouts.CheckinTimestamp).
115-
Dur("long_poll_jitter", cfg.Timeouts.CheckinJitter).
116-
Msg("Checkin install limits")
117-
118103
ct := &CheckinT{
119104
verCon: verCon,
120105
cfg: cfg,
@@ -124,23 +109,15 @@ func NewCheckinT(
124109
gcp: gcp,
125110
ad: ad,
126111
tr: tr,
127-
limit: limit.NewLimiter(&cfg.Limits.CheckinLimit),
128112
bulker: bulker,
129113
}
130114

131115
return ct
132116
}
133117

134118
func (ct *CheckinT) handleCheckin(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request, id string) error {
135-
136119
start := time.Now()
137120

138-
limitF, err := ct.limit.Acquire()
139-
if err != nil {
140-
return err
141-
}
142-
defer limitF()
143-
144121
agent, err := authAgent(r, &id, ct.bulker, ct.cache)
145122
if err != nil {
146123
return err
@@ -158,11 +135,6 @@ func (ct *CheckinT) handleCheckin(zlog *zerolog.Logger, w http.ResponseWriter, r
158135

159136
// Safely check if the agent version is different, return empty string otherwise
160137
newVer := agent.CheckDifferentVersion(ver)
161-
162-
// Metrics; serenity now.
163-
dfunc := cntCheckin.IncStart()
164-
defer dfunc()
165-
166138
return ct.processRequest(*zlog, w, r, start, agent, newVer)
167139
}
168140

internal/pkg/api/handleEnroll.go

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
1717
"github.com/elastic/fleet-server/v7/internal/pkg/config"
1818
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
19-
"github.com/elastic/fleet-server/v7/internal/pkg/limit"
2019
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
2120
"github.com/elastic/fleet-server/v7/internal/pkg/model"
2221
"github.com/elastic/fleet-server/v7/internal/pkg/rollback"
@@ -49,25 +48,19 @@ type EnrollerT struct {
4948
cfg *config.Server
5049
bulker bulk.Bulk
5150
cache cache.Cache
52-
limit *limit.Limiter
5351
}
5452

5553
func NewEnrollerT(verCon version.Constraints, cfg *config.Server, bulker bulk.Bulk, c cache.Cache) (*EnrollerT, error) {
56-
log.Info().
57-
Interface("limits", cfg.Limits.EnrollLimit).
58-
Msg("Setting config enroll_limit")
59-
6054
return &EnrollerT{
6155
verCon: verCon,
6256
cfg: cfg,
63-
limit: limit.NewLimiter(&cfg.Limits.EnrollLimit),
6457
bulker: bulker,
6558
cache: c,
6659
}, nil
6760

6861
}
6962

70-
func (rt Router) handleEnroll(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
63+
func (rt *Router) handleEnroll(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
7164
start := time.Now()
7265

7366
// Work around wonky router rule
@@ -129,13 +122,6 @@ func (rt Router) handleEnroll(w http.ResponseWriter, r *http.Request, ps httprou
129122
}
130123

131124
func (et *EnrollerT) handleEnroll(rb *rollback.Rollback, zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request) (*EnrollResponse, error) {
132-
133-
limitF, err := et.limit.Acquire()
134-
if err != nil {
135-
return nil, err
136-
}
137-
defer limitF()
138-
139125
key, err := authAPIKey(r, et.bulker, et.cache)
140126
if err != nil {
141127
return nil, err
@@ -151,10 +137,6 @@ func (et *EnrollerT) handleEnroll(rb *rollback.Rollback, zlog *zerolog.Logger, w
151137
return nil, err
152138
}
153139

154-
// Metrics; serenity now.
155-
dfunc := cntEnroll.IncStart()
156-
defer dfunc()
157-
158140
return et.processRequest(rb, *zlog, w, r, key.ID, ver)
159141
}
160142

0 commit comments

Comments
 (0)