Skip to content

Commit

Permalink
pkg/trace: refactor peer tags config and add to info endpoint (#27603)
Browse files Browse the repository at this point in the history
  • Loading branch information
knusbaum authored Aug 5, 2024
1 parent bf4e2b6 commit 68e0fd7
Show file tree
Hide file tree
Showing 21 changed files with 437 additions and 359 deletions.
1 change: 1 addition & 0 deletions comp/core/log/impl-trace/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5 // indirect
google.golang.org/grpc v1.64.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
2 changes: 2 additions & 0 deletions comp/core/log/impl-trace/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

107 changes: 103 additions & 4 deletions comp/trace/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2432,22 +2432,54 @@ func TestSetMaxMemCPU(t *testing.T) {
})
}

func TestPeerServiceAggregation(t *testing.T) {
func TestPeerTagsAggregation(t *testing.T) {
t.Run("disabled", func(t *testing.T) {
config := fxutil.Test[Component](t, fx.Options(
corecomp.MockModule(),
MockModule(),
))
// underlying config
cfg := config.Object()

require.NotNil(t, cfg)
assert.False(t, cfg.PeerServiceAggregation)
assert.False(t, cfg.PeerTagsAggregation)
assert.Nil(t, cfg.PeerTags)
})

t.Run("deprecated-enabled", func(t *testing.T) {
overrides := map[string]interface{}{
"apm_config.peer_service_aggregation": true,
}

config := fxutil.Test[Component](t, fx.Options(
corecomp.MockModule(),
fx.Replace(corecomp.MockParams{Overrides: overrides}),
MockModule(),
))
// underlying config
cfg := config.Object()
require.NotNil(t, cfg)
assert.True(t, cfg.PeerTagsAggregation)
assert.Nil(t, cfg.PeerTags)
})
t.Run("enabled", func(t *testing.T) {
overrides := map[string]interface{}{
"apm_config.peer_tags_aggregation": true,
}

config := fxutil.Test[Component](t, fx.Options(
corecomp.MockModule(),
fx.Replace(corecomp.MockParams{Overrides: overrides}),
MockModule(),
))
// underlying config
cfg := config.Object()
assert.True(t, cfg.PeerTagsAggregation)
assert.Nil(t, cfg.PeerTags)
})
t.Run("both-enabled", func(t *testing.T) {
overrides := map[string]interface{}{
"apm_config.peer_service_aggregation": true,
"apm_config.peer_tags_aggregation": true,
}

config := fxutil.Test[Component](t, fx.Options(
Expand All @@ -2457,9 +2489,76 @@ func TestPeerServiceAggregation(t *testing.T) {
))
// underlying config
cfg := config.Object()
require.NotNil(t, cfg)
assert.True(t, cfg.PeerTagsAggregation)
assert.Nil(t, cfg.PeerTags)
})
t.Run("disabled-user-tags", func(t *testing.T) {
overrides := map[string]interface{}{
"apm_config.peer_tags": []string{"user_peer_tag"},
}

config := fxutil.Test[Component](t, fx.Options(
corecomp.MockModule(),
fx.Replace(corecomp.MockParams{Overrides: overrides}),
MockModule(),
))
// underlying config
cfg := config.Object()
require.NotNil(t, cfg)
assert.False(t, cfg.PeerTagsAggregation)
assert.Equal(t, []string{"user_peer_tag"}, cfg.PeerTags)
})
t.Run("deprecated-enabled-user-tags", func(t *testing.T) {
overrides := map[string]interface{}{
"apm_config.peer_tags": []string{"user_peer_tag"},
"apm_config.peer_service_aggregation": true,
}

config := fxutil.Test[Component](t, fx.Options(
corecomp.MockModule(),
fx.Replace(corecomp.MockParams{Overrides: overrides}),
MockModule(),
))
// underlying config
cfg := config.Object()
assert.True(t, cfg.PeerTagsAggregation)
assert.Equal(t, []string{"user_peer_tag"}, cfg.PeerTags)
})
t.Run("enabled-user-tags", func(t *testing.T) {
overrides := map[string]interface{}{
"apm_config.peer_tags": []string{"user_peer_tag"},
"apm_config.peer_tags_aggregation": true,
}

config := fxutil.Test[Component](t, fx.Options(
corecomp.MockModule(),
fx.Replace(corecomp.MockParams{Overrides: overrides}),
MockModule(),
))
// underlying config
cfg := config.Object()
require.NotNil(t, cfg)
assert.True(t, cfg.PeerTagsAggregation)
assert.Equal(t, []string{"user_peer_tag"}, cfg.PeerTags)
})
t.Run("both-enabled-user-tags", func(t *testing.T) {
overrides := map[string]interface{}{
"apm_config.peer_tags": []string{"user_peer_tag"},
"apm_config.peer_tags_aggregation": true,
"apm_config.peer_service_aggregation": true,
}

config := fxutil.Test[Component](t, fx.Options(
corecomp.MockModule(),
fx.Replace(corecomp.MockParams{Overrides: overrides}),
MockModule(),
))
// underlying config
cfg := config.Object()
require.NotNil(t, cfg)
assert.True(t, cfg.PeerServiceAggregation)
assert.True(t, cfg.PeerTagsAggregation)
assert.Equal(t, []string{"user_peer_tag"}, cfg.PeerTags)
})
}

Expand Down
9 changes: 6 additions & 3 deletions comp/trace/config/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,18 @@ func applyDatadogConfig(c *config.AgentConfig, core corecompcfg.Component) error
if core.IsSet("apm_config.connection_limit") {
c.ConnectionLimit = core.GetInt("apm_config.connection_limit")
}
c.PeerServiceAggregation = core.GetBool("apm_config.peer_service_aggregation")
if c.PeerServiceAggregation {

// NOTE: maintain backwards-compatibility with old peer service flag that will eventually be deprecated.
c.PeerTagsAggregation = core.GetBool("apm_config.peer_service_aggregation")
if c.PeerTagsAggregation {
log.Warn("`apm_config.peer_service_aggregation` is deprecated, please use `apm_config.peer_tags_aggregation` instead")
}
c.PeerTagsAggregation = core.GetBool("apm_config.peer_tags_aggregation")
c.PeerTagsAggregation = c.PeerTagsAggregation || core.GetBool("apm_config.peer_tags_aggregation")
c.ComputeStatsBySpanKind = core.GetBool("apm_config.compute_stats_by_span_kind")
if core.IsSet("apm_config.peer_tags") {
c.PeerTags = core.GetStringSlice("apm_config.peer_tags")
}

if core.IsSet("apm_config.extra_sample_rate") {
c.ExtraSampleRate = core.GetFloat64("apm_config.extra_sample_rate")
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/trace/api/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (r *HTTPReceiver) makeInfoHandler() (hash string, handler http.HandlerFunc)
LongRunningSpans bool `json:"long_running_spans"`
EvpProxyAllowedHeaders []string `json:"evp_proxy_allowed_headers"`
Config reducedConfig `json:"config"`
PeerTags []string `json:"peer_tags"`
}{
Version: r.conf.AgentVersion,
GitCommit: r.conf.GitCommit,
Expand All @@ -95,6 +96,7 @@ func (r *HTTPReceiver) makeInfoHandler() (hash string, handler http.HandlerFunc)
AnalyzedSpansByService: r.conf.AnalyzedSpansByService,
Obfuscation: oconf,
},
PeerTags: r.conf.ConfiguredPeerTags(),
}, "", "\t")
if err != nil {
panic(fmt.Errorf("Error making /info handler: %v", err))
Expand Down
24 changes: 12 additions & 12 deletions pkg/trace/api/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,18 +244,17 @@ func TestInfoHandler(t *testing.T) {
Host: "https://target-intake.datadoghq.com",
NoProxy: true,
}},
BucketInterval: time.Second,
ExtraAggregators: []string{"agg:val"},
PeerServiceAggregation: true,
ExtraSampleRate: 2.4,
TargetTPS: 11,
MaxEPS: 12,
ReceiverHost: "localhost",
ReceiverPort: 8111,
ReceiverSocket: "/sock/path",
ConnectionLimit: 12,
ReceiverTimeout: 100,
MaxRequestBytes: 123,
BucketInterval: time.Second,
ExtraAggregators: []string{"agg:val"},
ExtraSampleRate: 2.4,
TargetTPS: 11,
MaxEPS: 12,
ReceiverHost: "localhost",
ReceiverPort: 8111,
ReceiverSocket: "/sock/path",
ConnectionLimit: 12,
ReceiverTimeout: 100,
MaxRequestBytes: 123,
StatsWriter: &config.WriterConfig{
ConnectionLimit: 20,
QueueSize: 12,
Expand Down Expand Up @@ -302,6 +301,7 @@ func TestInfoHandler(t *testing.T) {
"span_meta_structs": nil,
"long_running_spans": nil,
"evp_proxy_allowed_headers": nil,
"peer_tags": nil,
"config": map[string]interface{}{
"default_env": nil,
"target_tps": nil,
Expand Down
10 changes: 9 additions & 1 deletion pkg/trace/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@ type AgentConfig struct {
// Concentrator
BucketInterval time.Duration // the size of our pre-aggregation per bucket
ExtraAggregators []string // DEPRECATED
PeerServiceAggregation bool // TO BE DEPRECATED - enables/disables stats aggregation for peer.service, used by Concentrator and ClientStatsAggregator
PeerTagsAggregation bool // enables/disables stats aggregation for peer entity tags, used by Concentrator and ClientStatsAggregator
ComputeStatsBySpanKind bool // enables/disables the computing of stats based on a span's `span.kind` field
PeerTags []string // additional tags to use for peer entity stats aggregation
Expand Down Expand Up @@ -606,6 +605,15 @@ func (c *AgentConfig) AllFeatures() []string {
return feats
}

// ConfiguredPeerTags returns the set of peer tags that should be used
// for aggregation based on the various config values and the base set of tags.
func (c *AgentConfig) ConfiguredPeerTags() []string {
if !c.PeerTagsAggregation {
return nil
}
return preparePeerTags(append(basePeerTags, c.PeerTags...))
}

func inAzureAppServices() bool {
_, existsLinux := os.LookupEnv("WEBSITE_STACK")
_, existsWin := os.LookupEnv("WEBSITE_APPSERVICEAPPLOGS_TRACE_ENABLED")
Expand Down
34 changes: 34 additions & 0 deletions pkg/trace/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,37 @@ func TestInAzureAppServices(t *testing.T) {
assert.True(t, isWindowsAzure)
assert.False(t, isNotAzure)
}

func TestPeerTagsAggregation(t *testing.T) {
t.Run("disabled", func(t *testing.T) {
cfg := New()
assert.False(t, cfg.PeerTagsAggregation)
assert.Empty(t, cfg.PeerTags)
assert.Empty(t, cfg.ConfiguredPeerTags())
})

t.Run("enabled", func(t *testing.T) {
cfg := New()
cfg.PeerTagsAggregation = true
assert.Empty(t, cfg.PeerTags)
assert.Equal(t, basePeerTags, cfg.ConfiguredPeerTags())
})
t.Run("disabled-user-tags", func(t *testing.T) {
cfg := New()
cfg.PeerTags = []string{"user_peer_tag"}
assert.False(t, cfg.PeerTagsAggregation)
assert.Empty(t, cfg.ConfiguredPeerTags())
})
t.Run("enabled-user-tags", func(t *testing.T) {
cfg := New()
cfg.PeerTagsAggregation = true
cfg.PeerTags = []string{"user_peer_tag"}
assert.Equal(t, append(basePeerTags, "user_peer_tag"), cfg.ConfiguredPeerTags())
})
t.Run("dedup", func(t *testing.T) {
cfg := New()
cfg.PeerTagsAggregation = true
cfg.PeerTags = basePeerTags[:2]
assert.Equal(t, basePeerTags, cfg.ConfiguredPeerTags())
})
}
55 changes: 55 additions & 0 deletions pkg/trace/config/peer_tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package config

import (
_ "embed" //nolint:revive
"sort"
"strings"

"github.com/DataDog/datadog-agent/pkg/util/log"
"gopkg.in/ini.v1"
)

//go:embed peer_tags.ini
var peerTagFile []byte

// basePeerTags is the base set of peer tag precursors (tags from which peer tags
// are derived) we aggregate on when peer tag aggregation is enabled.
var basePeerTags = func() []string {
var precursors []string = []string{"_dd.base_service"}

cfg, err := ini.Load(peerTagFile)
if err != nil {
log.Error("Error loading file for peer tags: ", err)
return precursors
}
peerTags := cfg.Section("dd.apm.peer.tags").Keys()

for _, t := range peerTags {
ps := strings.Split(t.Value(), ",")
precursors = append(precursors, ps...)
}
sort.Strings(precursors)

return precursors
}()

func preparePeerTags(tags []string) []string {
if len(tags) == 0 {
return nil
}
var deduped []string
seen := make(map[string]struct{})
for _, t := range tags {
if _, ok := seen[t]; !ok {
seen[t] = struct{}{}
deduped = append(deduped, t)
}
}
sort.Strings(deduped)
return deduped
}
File renamed without changes.
Loading

0 comments on commit 68e0fd7

Please sign in to comment.