Skip to content

Commit

Permalink
[CONTP-549] throttle the number of parallel client initial syncs in t…
Browse files Browse the repository at this point in the history
…he tagger server (#31741)
  • Loading branch information
adel121 authored Jan 3, 2025
1 parent 4ef0159 commit ce6e680
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 30 deletions.
2 changes: 1 addition & 1 deletion cmd/cluster-agent/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func StartServer(ctx context.Context, w workloadmeta.Component, taggerComp tagge
// event size should be small enough to fit within the grpc max message size
maxEventSize := maxMessageSize / 2
pb.RegisterAgentSecureServer(grpcSrv, &serverSecure{
taggerServer: taggerserver.NewServer(taggerComp, maxEventSize),
taggerServer: taggerserver.NewServer(taggerComp, maxEventSize, cfg.GetInt("remote_tagger.max_concurrent_sync")),
})

timeout := pkgconfigsetup.Datadog().GetDuration("cluster_agent.server.idle_timeout_seconds") * time.Second
Expand Down
2 changes: 1 addition & 1 deletion comp/api/api/apiimpl/server_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (server *apiServer) startCMDServer(
pb.RegisterAgentSecureServer(s, &serverSecure{
configService: server.rcService,
configServiceMRF: server.rcServiceMRF,
taggerServer: taggerserver.NewServer(server.taggerComp, maxEventSize),
taggerServer: taggerserver.NewServer(server.taggerComp, maxEventSize, cfg.GetInt("remote_tagger.max_concurrent_sync")),
taggerComp: server.taggerComp,
// TODO(components): decide if workloadmetaServer should be componentized itself
workloadmetaServer: workloadmetaServer.NewServer(server.wmeta),
Expand Down
86 changes: 58 additions & 28 deletions comp/core/tagger/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ const (
type Server struct {
// taggerComponent is the tagger that streaming clients are subscribed to;
// its telemetry store is also used to count stream errors.
taggerComponent tagger.Component
// maxEventSize is the size limit passed to NewServer.
// NOTE(review): presumably bounds the size of each streamed chunk so that it
// fits in a gRPC message — the code that uses it is not visible here, confirm.
maxEventSize int
// throttler limits how many clients can run their initial sync at the same
// time: a token is requested before subscribing and released once the
// initial burst has been sent or the stream handler returns.
throttler Throttler
}

// NewServer returns a new Server
func NewServer(t tagger.Component, maxEventSize int) *Server {
func NewServer(t tagger.Component, maxEventSize int, maxParallelSync int) *Server {
return &Server{
taggerComponent: t,
maxEventSize: maxEventSize,
throttler: NewSyncThrottler(uint32(maxParallelSync)),
}
}

Expand All @@ -53,6 +55,42 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu
return err
}

ticker := time.NewTicker(streamKeepAliveInterval)
defer ticker.Stop()

timeoutRefreshError := make(chan error)

go func() {
// The remote tagger client has a timeout that closes the
// connection after 10 minutes of inactivity (implemented in
// comp/core/tagger/remote/tagger.go) In order to avoid closing the
// connection and having to open it again, the server will send
// an empty message after 9 minutes of inactivity. The goal is
// only to keep the connection alive without losing the
// protection against “half” closed connections brought by the
// timeout.
for {
select {
case <-out.Context().Done():
return

case <-ticker.C:
err = grpc.DoWithTimeout(func() error {
return out.Send(&pb.StreamTagsResponse{
Events: []*pb.StreamTagsEvent{},
})
}, taggerStreamSendTimeout)

if err != nil {
log.Warnf("error sending tagger keep-alive: %s", err)
s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc()
timeoutRefreshError <- err
return
}
}
}
}()

filterBuilder := types.NewFilterBuilder()
for _, prefix := range in.GetPrefixes() {
filterBuilder = filterBuilder.Include(types.EntityIDPrefix(prefix))
Expand All @@ -62,23 +100,27 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu

streamingID := in.GetStreamingID()
if streamingID == "" {
// this is done to preserve backward compatibility
// if CLC runner is using an old version, the streaming ID would be an empty string,
// and the server needs to auto-assign a unique id
streamingID = uuid.New().String()
}

subscriptionID := fmt.Sprintf("streaming-client-%s", streamingID)

// initBurst is a flag indicating if the initial sync is still in progress or not
// true means the sync hasn't yet been finalised
// false means the streaming client has already caught up with the server
initBurst := true
log.Debugf("requesting token from server throttler for streaming id: %q", streamingID)
tk := s.throttler.RequestToken()
defer s.throttler.Release(tk)

subscription, err := s.taggerComponent.Subscribe(subscriptionID, filter)
log.Debugf("cluster tagger has just initiated subscription for %q at time %v", subscriptionID, time.Now().Unix())
if err != nil {
log.Errorf("Failed to subscribe to tagger for subscription %q", subscriptionID)
return err
}

defer subscription.Unsubscribe()

ticker := time.NewTicker(streamKeepAliveInterval)
defer ticker.Stop()

sendFunc := func(chunk []*pb.StreamTagsEvent) error {
return grpc.DoWithTimeout(func() error {
return out.Send(&pb.StreamTagsResponse{
Expand Down Expand Up @@ -114,29 +156,17 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu
return err
}

if initBurst {
initBurst = false
s.throttler.Release(tk)
log.Infof("cluster tagger has just finished initialization for subscription %q at time %v", subscriptionID, time.Now().Unix())
}

case <-out.Context().Done():
return nil

// The remote tagger client has a timeout that closes the
// connection after 10 minutes of inactivity (implemented in
// comp/core/tagger/remote/tagger.go) In order to avoid closing the
// connection and having to open it again, the server will send
// an empty message after 9 minutes of inactivity. The goal is
// only to keep the connection alive without losing the
// protection against “half” closed connections brought by the
// timeout.
case <-ticker.C:
err = grpc.DoWithTimeout(func() error {
return out.Send(&pb.StreamTagsResponse{
Events: []*pb.StreamTagsEvent{},
})
}, taggerStreamSendTimeout)

if err != nil {
log.Warnf("error sending tagger keep-alive: %s", err)
s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc()
return err
}
case err = <-timeoutRefreshError:
return err
}
}
}
Expand Down
62 changes: 62 additions & 0 deletions comp/core/tagger/server/syncthrottler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package server

import (
"sync"

"github.com/google/uuid"
)

// token identifies one slot handed out by a Throttler: RequestToken returns it
// and Release takes it back.
type token string

// Throttler provides tokens with throttling logic that limits the number of tokens that can be active at the same time.
// When a component is done with a token, it should release the token by calling the Release method.
type Throttler interface {
// RequestToken returns a new token.
// The limiter implementation in this file blocks until one of its slots is free.
RequestToken() token
// Release returns the token back to the throttler.
// This method is idempotent (i.e. invoking it on the same token multiple times has the same effect as invoking it once).
Release(t token)
}

// limiter implements the Throttler interface.
//
// Every active token occupies one slot of the tokensChan buffer, so the buffer
// capacity is the maximum number of tokens that can be active at once.
type limiter struct {
	// mutex guards activeRequests. A plain Mutex is enough because every
	// access takes the exclusive lock.
	mutex sync.Mutex
	// tokensChan is used as a counting semaphore: sending acquires a slot
	// (blocking while the buffer is full) and receiving frees one.
	tokensChan chan struct{}
	// activeRequests is the set of tokens that were handed out and have not
	// been released yet; it is what makes Release idempotent.
	activeRequests map[token]struct{}
}

// NewSyncThrottler creates and returns a new Throttler that allows at most
// maxConcurrentSync tokens to be active at the same time.
//
// A maxConcurrentSync of 0 is treated as 1. A zero capacity would create an
// unbuffered channel on which every RequestToken call blocks forever, because
// a slot is only freed by releasing a token that was previously granted.
func NewSyncThrottler(maxConcurrentSync uint32) Throttler {
	if maxConcurrentSync == 0 {
		maxConcurrentSync = 1
	}

	return &limiter{
		tokensChan:     make(chan struct{}, maxConcurrentSync),
		activeRequests: make(map[token]struct{}),
	}
}

// RequestToken implements Throttler#RequestToken.
//
// It generates a fresh token, waits for a free slot in tokensChan and then
// records the token as active before handing it to the caller.
func (l *limiter) RequestToken() token {
	granted := token(uuid.New().String())

	// Acquire a slot; this send blocks while the channel buffer is full.
	l.tokensChan <- struct{}{}

	l.mutex.Lock()
	l.activeRequests[granted] = struct{}{}
	l.mutex.Unlock()

	return granted
}

// Release implements Throttler#Release.
//
// Only a token that is still recorded as active frees a slot; releasing an
// unknown or already released token is a no-op.
func (l *limiter) Release(t token) {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	_, active := l.activeRequests[t]
	if !active {
		return
	}

	// Free the slot that was taken when this token was granted.
	<-l.tokensChan
	delete(l.activeRequests, t)
}
32 changes: 32 additions & 0 deletions comp/core/tagger/server/syncthrottler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package server

import (
"sync"
"testing"
"time"
)

// TestSyncThrottler starts many more workers than the throttler allows to run
// at once and checks that the number of workers holding a token at the same
// time never exceeds the configured limit. Each worker also releases its token
// twice to exercise the idempotency of Release.
func TestSyncThrottler(t *testing.T) {
	const (
		maxConcurrent = 3
		workers       = 30
	)

	throttler := NewSyncThrottler(maxConcurrent)

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex // guards inFlight and peak
		inFlight int
		peak     int
	)

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			tk := throttler.RequestToken()

			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			// Hold the token for a while so that workers overlap.
			time.Sleep(20 * time.Millisecond)

			// Decrement before releasing so that inFlight never counts more
			// workers than there are token holders.
			mu.Lock()
			inFlight--
			mu.Unlock()

			throttler.Release(tk)
			throttler.Release(tk) // Release method should be idempotent
		}()
	}

	wg.Wait()

	if peak > maxConcurrent {
		t.Errorf("expected at most %d concurrent token holders, observed %d", maxConcurrent, peak)
	}
	if inFlight != 0 {
		t.Errorf("expected no token holders after all workers finished, got %d", inFlight)
	}
}
3 changes: 3 additions & 0 deletions pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,9 @@ func InitConfig(config pkgconfigmodel.Setup) {
config.BindEnvAndSetDefault("clc_runner_server_readheader_timeout", 10)
config.BindEnvAndSetDefault("clc_runner_remote_tagger_enabled", false)

// Remote tagger
config.BindEnvAndSetDefault("remote_tagger.max_concurrent_sync", 3)

// Admission controller
config.BindEnvAndSetDefault("admission_controller.enabled", false)
config.BindEnvAndSetDefault("admission_controller.validation.enabled", true)
Expand Down

0 comments on commit ce6e680

Please sign in to comment.