Skip to content

Commit

Permalink
[apm] peer.service aggregation for trace stats, option to compute sta…
Browse files Browse the repository at this point in the history
…ts based on span.kind (#16103)

* [apm] initial commit; add peer.service to stats

* [apm] re-generate msgpack to account for peer.service

* [apm] add tests for peer.service aggregation

* [apm] add comment to explain peer_service in proto

* [apm] comment revision

* [apm] add release notes

* Update pkg/trace/stats/aggregation.go

Co-authored-by: Diana Shevchenko <40775148+dianashevchenko@users.noreply.github.com>

* Update pkg/trace/stats/aggregation.go

Co-authored-by: Diana Shevchenko <40775148+dianashevchenko@users.noreply.github.com>

* [apm] add consideration of span.kind == INTERNAL for calculating stats

* [apm] consider CLIENT/PRODUCER specifically rather than INTERNAL for span.kind; add more tests

* [apm] remove incorrect comment

* [apm] correct tests, update release notes

* [apm] RemoteOutbound -> RemoteOutgoing

* [apm] remove unnecessary newlines

* [apm] update peer.service test to use a more realistic example for its test case

* Update pkg/trace/stats/concentrator.go

Co-authored-by: Peter Kalmakis <peter.kalmakis@datadoghq.com>

* Update releasenotes/notes/add-peer-service-for-trace-stats-225f1b90c9627c18.yaml

Co-authored-by: Austin Lai <76412946+alai97@users.noreply.github.com>

* Update releasenotes/notes/add-peer-service-for-trace-stats-225f1b90c9627c18.yaml

Co-authored-by: Austin Lai <76412946+alai97@users.noreply.github.com>

* [apm] fix bug from remote commit

* [apm] remove consideration of span.kind for trace stats computation

* [apm] add normalization for peer.service

* [apm] add concentrator configuration to enable/disable peer.service stats aggregation

* [apm] ensure peer.service is exported from client stats aggregator as well

* [apm] update config_template.yaml

* [apm] further clarify effect of disabling peer.service stats aggregation

* [apm] fix test failures

* [apm] rework configuration of peer.service, add back extra aggregators for concentrator

* [apm] set peer service aggregation to false by default, ensure config is loaded properly

* [apm] revise variable name to be consistent, add check for peer service aggregation in client stats aggregator

* [apm] add logic to compute stats based on specific span.kind values

* [apm] add config for option to enable stats computation by span.kind

* [apm] update documentation and release notes

* Update cmd/trace-agent/config/config.go

Co-authored-by: Ahmed Mezghani <38987709+ahmed-mez@users.noreply.github.com>

* Update pkg/trace/traceutil/span.go

Co-authored-by: Ahmed Mezghani <38987709+ahmed-mez@users.noreply.github.com>

* [apm] rename peer service aggregation config field

* [apm] add more test cases to ensure case insensitivity for span.kind check

* [apm] move span.kind check func implementation to concentrator.go

* Update pkg/trace/stats/concentrator.go

Co-authored-by: Peter Kalmakis <peter.kalmakis@datadoghq.com>

* [apm] finish move of span.kind check func

* [apm] go back to setting a bool flag for peer.service in concentrator

* [apm] add back ExtraAggregators for info test case

* [apm] fix testutil fixture BucketWithSpans

* [apm] move peer.service aggregation check back into NewAggregationFromSpan

* [apm] change CSA to use bool for peer svc aggregation as well

* [apm] remove unused const

* [apm] update guidance on new config options

* [apm] add small test for bucket aggregation key creation and peer.service in CSA

* [apm] remove unused field

* [apm] fix fuzz test

* [apm] fix stats info tests

---------

Co-authored-by: Diana Shevchenko <40775148+dianashevchenko@users.noreply.github.com>
Co-authored-by: Peter Kalmakis <peter.kalmakis@datadoghq.com>
Co-authored-by: Austin Lai <76412946+alai97@users.noreply.github.com>
Co-authored-by: Ahmed Mezghani <38987709+ahmed-mez@users.noreply.github.com>
  • Loading branch information
5 people committed Apr 6, 2023
1 parent 68b471b commit 70820e3
Show file tree
Hide file tree
Showing 28 changed files with 1,170 additions and 227 deletions.
2 changes: 2 additions & 0 deletions cmd/trace-agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ func applyDatadogConfig(c *config.AgentConfig) error {
if coreconfig.Datadog.IsSet("apm_config.connection_limit") {
c.ConnectionLimit = coreconfig.Datadog.GetInt("apm_config.connection_limit")
}
c.PeerServiceAggregation = coreconfig.Datadog.GetBool("apm_config.peer_service_aggregation")
c.ComputeStatsBySpanKind = coreconfig.Datadog.GetBool("apm_config.compute_stats_by_span_kind")
if coreconfig.Datadog.IsSet("apm_config.extra_sample_rate") {
c.ExtraSampleRate = coreconfig.Datadog.GetFloat64("apm_config.extra_sample_rate")
}
Expand Down
44 changes: 44 additions & 0 deletions cmd/trace-agent/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1094,3 +1094,47 @@ func TestSetMaxMemCPU(t *testing.T) {
assert.Equal(t, 300.0, c.MaxMemory)
})
}

func TestPeerServiceAggregation(t *testing.T) {
t.Run("disabled", func(t *testing.T) {
defer cleanConfig()
cfg := config.New()
err := applyDatadogConfig(cfg)

assert := assert.New(t)
assert.NoError(err)
assert.False(cfg.PeerServiceAggregation)
})
t.Run("enabled", func(t *testing.T) {
defer cleanConfig()
coreconfig.Datadog.Set("apm_config.peer_service_aggregation", true)
cfg := config.New()
err := applyDatadogConfig(cfg)

assert := assert.New(t)
assert.NoError(err)
assert.True(cfg.PeerServiceAggregation)
})
}

func TestComputeStatsBySpanKind(t *testing.T) {
t.Run("disabled", func(t *testing.T) {
defer cleanConfig()
cfg := config.New()
err := applyDatadogConfig(cfg)

assert := assert.New(t)
assert.NoError(err)
assert.False(cfg.ComputeStatsBySpanKind)
})
t.Run("enabled", func(t *testing.T) {
defer cleanConfig()
coreconfig.Datadog.Set("apm_config.compute_stats_by_span_kind", true)
cfg := config.New()
err := applyDatadogConfig(cfg)

assert := assert.New(t)
assert.NoError(err)
assert.True(cfg.ComputeStatsBySpanKind)
})
}
2 changes: 2 additions & 0 deletions pkg/config/apm.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func setupAPM(config Config) {
config.BindEnvAndSetDefault("apm_config.windows_pipe_buffer_size", 1_000_000, "DD_APM_WINDOWS_PIPE_BUFFER_SIZE") //nolint:errcheck
config.BindEnvAndSetDefault("apm_config.windows_pipe_security_descriptor", "D:AI(A;;GA;;;WD)", "DD_APM_WINDOWS_PIPE_SECURITY_DESCRIPTOR") //nolint:errcheck
config.BindEnvAndSetDefault("apm_config.remote_tagger", true, "DD_APM_REMOTE_TAGGER") //nolint:errcheck
config.BindEnvAndSetDefault("apm_config.peer_service_stats_aggregation", false, "DD_APM_PEER_SERVICE_STATS_AGGREGATION") //nolint:errcheck
config.BindEnvAndSetDefault("apm_config.compute_stats_by_span_kind", false, "DD_APM_COMPUTE_STATS_BY_SPAN_KIND") //nolint:errcheck

config.BindEnv("apm_config.max_catalog_services", "DD_APM_MAX_CATALOG_SERVICES")
config.BindEnv("apm_config.receiver_timeout", "DD_APM_RECEIVER_TIMEOUT")
Expand Down
18 changes: 18 additions & 0 deletions pkg/config/config_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1355,6 +1355,24 @@ api_key:
#
# connection_limit: 2000

## @param compute_stats_by_span_kind - bool - default: false
## @env DD_APM_COMPUTE_STATS_BY_SPAN_KIND - bool - default: false
## Enables an additional stats computation check on spans to see they have an eligible `span.kind` (server, consumer, client, producer).
## If enabled, a span with an eligible `span.kind` will have stats computed. If disabled, only top-level and measured spans will have stats computed.
## NOTE: For stats computed from OTel traces, only top-level spans are considered when this option is off.
## If you are sending OTel traces and want stats on non-top-level spans, this flag will need to be enabled.
# compute_stats_by_span_kind: false

## @param peer_service_aggregation - bool - default: false
## @env DD_APM_PEER_SERVICE_AGGREGATION - bool - default: false
## Enables `peer.service` aggregation in the agent. If disabled, aggregated trace stats will not include `peer.service` as a dimension.
## For the best experience with `peer.service`, it is recommended to also enable `compute_stats_by_span_kind`.
## If enabling both causes the Agent to consume too many resources, try disabling `compute_stats_by_span_kind` first.
## If the overhead remains high, it will be due to a high cardinality of `peer.service` values from the traces. You may need to check your instrumentation.
## NOTE: If you are using an OTel tracer it's best to have both enabled because client/producer spans with a `peer.service` value
## may not be marked by the Agent as top-level spans.
# peer_service_aggregation: false

## @param features - list of strings - optional
## @env DD_APM_FEATURES - comma separated list of strings - optional
## Configure additional beta APM features.
Expand Down
59 changes: 59 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1298,3 +1298,62 @@ fips:
err := setupFipsEndpoints(testConfig)
require.Error(t, err)
}

func TestEnablePeerServiceStatsAggregationYAML(t *testing.T) {
datadogYaml := `
apm_config:
peer_service_stats_aggregation: true
`
testConfig := setupConfFromYAML(datadogYaml)
err := setupFipsEndpoints(testConfig)
require.NoError(t, err)
require.True(t, testConfig.GetBool("apm_config.peer_service_stats_aggregation"))

datadogYaml = `
apm_config:
peer_service_stats_aggregation: false
`
testConfig = setupConfFromYAML(datadogYaml)
err = setupFipsEndpoints(testConfig)
require.NoError(t, err)
require.False(t, testConfig.GetBool("apm_config.peer_service_stats_aggregation"))
}

func TestEnablePeerServiceStatsAggregationEnv(t *testing.T) {
t.Setenv("DD_APM_PEER_SERVICE_STATS_AGGREGATION", "true")
testConfig := setupConfFromYAML("")
require.True(t, testConfig.GetBool("apm_config.peer_service_stats_aggregation"))
t.Setenv("DD_APM_PEER_SERVICE_STATS_AGGREGATION", "false")
testConfig = setupConfFromYAML("")
require.False(t, testConfig.GetBool("apm_config.peer_service_stats_aggregation"))
}

func TestEnableStatsComputationBySpanKindYAML(t *testing.T) {
datadogYaml := `
apm_config:
compute_stats_by_span_kind: false
`
testConfig := setupConfFromYAML(datadogYaml)
err := setupFipsEndpoints(testConfig)
require.NoError(t, err)
require.False(t, testConfig.GetBool("apm_config.compute_stats_by_span_kind"))

datadogYaml = `
apm_config:
compute_stats_by_span_kind: true
`
testConfig = setupConfFromYAML(datadogYaml)
err = setupFipsEndpoints(testConfig)
require.NoError(t, err)
require.True(t, testConfig.GetBool("apm_config.compute_stats_by_span_kind"))

}

func TestComputeStatsBySpanKindEnv(t *testing.T) {
t.Setenv("DD_APM_COMPUTE_STATS_BY_SPAN_KIND", "false")
testConfig := setupConfFromYAML("")
require.False(t, testConfig.GetBool("apm_config.compute_stats_by_span_kind"))
t.Setenv("DD_APM_COMPUTE_STATS_BY_SPAN_KIND", "true")
testConfig = setupConfFromYAML("")
require.True(t, testConfig.GetBool("apm_config.compute_stats_by_span_kind"))
}
20 changes: 20 additions & 0 deletions pkg/trace/agent/normalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
// tagSamplingPriority specifies the sampling priority of the trace.
// DEPRECATED: Priority is now specified as a TraceChunk field.
tagSamplingPriority = "_sampling_priority_v1"
// peerServiceKey is the key for the peer.service meta field.
peerServiceKey = "peer.service"
)

var (
Expand Down Expand Up @@ -60,6 +62,24 @@ func (a *Agent) normalize(ts *info.TagStats, s *pb.Span) error {
}
s.Service = svc

pSvc, ok := s.Meta[peerServiceKey]
if ok {
ps, err := traceutil.NormalizePeerService(pSvc)
switch err {
case traceutil.ErrTooLong:
ts.SpansMalformed.PeerServiceTruncate.Inc()
log.Debugf("Fixing malformed trace. peer.service is too long (reason:peer_service_truncate), truncating peer.service to length=%d: %s", traceutil.MaxServiceLen, ps)
case traceutil.ErrInvalid:
ts.SpansMalformed.PeerServiceInvalid.Inc()
log.Debugf("Fixing malformed trace. peer.service is invalid (reason:peer_service_invalid), replacing invalid peer.service=%s with empty string", pSvc)
default:
if err != nil {
log.Debugf("Unexpected error in peer.service normalization from original value (%s) to new value (%s): %s", pSvc, ps, err)
}
}
s.Meta[peerServiceKey] = ps
}

if a.conf.HasFeature("component2name") {
// This feature flag determines the component tag to become the span name.
//
Expand Down
23 changes: 12 additions & 11 deletions pkg/trace/api/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,17 +243,18 @@ func TestInfoHandler(t *testing.T) {
Host: "https://target-intake.datadoghq.com",
NoProxy: true,
}},
BucketInterval: time.Second,
ExtraAggregators: []string{"agg:val"},
ExtraSampleRate: 2.4,
TargetTPS: 11,
MaxEPS: 12,
ReceiverHost: "localhost",
ReceiverPort: 8111,
ReceiverSocket: "/sock/path",
ConnectionLimit: 12,
ReceiverTimeout: 100,
MaxRequestBytes: 123,
BucketInterval: time.Second,
ExtraAggregators: []string{"agg:val"},
PeerServiceAggregation: true,
ExtraSampleRate: 2.4,
TargetTPS: 11,
MaxEPS: 12,
ReceiverHost: "localhost",
ReceiverPort: 8111,
ReceiverSocket: "/sock/path",
ConnectionLimit: 12,
ReceiverTimeout: 100,
MaxRequestBytes: 123,
StatsWriter: &config.WriterConfig{
ConnectionLimit: 20,
QueueSize: 12,
Expand Down
6 changes: 4 additions & 2 deletions pkg/trace/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,10 @@ type AgentConfig struct {
Endpoints []*Endpoint

// Concentrator
BucketInterval time.Duration // the size of our pre-aggregation per bucket
ExtraAggregators []string
BucketInterval time.Duration // the size of our pre-aggregation per bucket
ExtraAggregators []string // DEPRECATED
PeerServiceAggregation bool // enables/disables stats aggregation for peer.service, used by Concentrator and ClientStatsAggregator
ComputeStatsBySpanKind bool // enables/disables the computing of stats based on a span's `span.kind` field

// Sampler configuration
ExtraSampleRate float64
Expand Down
20 changes: 12 additions & 8 deletions pkg/trace/info/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,8 @@ func TestPublishReceiverStats(t *testing.T) {
atom(10),
atom(11),
atom(12),
atom(13),
atom(14),
},
TracesFiltered: atom(4),
TracesPriorityNone: atom(5),
Expand Down Expand Up @@ -487,14 +489,16 @@ func TestPublishReceiverStats(t *testing.T) {
"ServiceEmpty": 2.0,
"ServiceTruncate": 3.0,
"ServiceInvalid": 4.0,
"SpanNameEmpty": 5.0,
"SpanNameTruncate": 6.0,
"SpanNameInvalid": 7.0,
"ResourceEmpty": 8.0,
"TypeTruncate": 9.0,
"InvalidStartDate": 10.0,
"InvalidDuration": 11.0,
"InvalidHTTPStatusCode": 12.0,
"PeerServiceTruncate": 5.0,
"PeerServiceInvalid": 6.0,
"SpanNameEmpty": 7.0,
"SpanNameTruncate": 8.0,
"SpanNameInvalid": 9.0,
"ResourceEmpty": 10.0,
"TypeTruncate": 11.0,
"InvalidStartDate": 12.0,
"InvalidDuration": 13.0,
"InvalidHTTPStatusCode": 14.0,
},
"SpansReceived": 10.0,
"TracerVersion": "",
Expand Down
8 changes: 8 additions & 0 deletions pkg/trace/info/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ type SpansMalformed struct {
ServiceTruncate atomic.Int64
// ServiceInvalid is when a span's Service doesn't conform to Datadog tag naming standards
ServiceInvalid atomic.Int64
// PeerServiceTruncate is when a span's peer.service is truncated for exceeding the max length
PeerServiceTruncate atomic.Int64
// PeerServiceInvalid is when a span's peer.service doesn't conform to Datadog tag naming standards
PeerServiceInvalid atomic.Int64
// SpanNameEmpty is when a span's Name is empty
SpanNameEmpty atomic.Int64
// SpanNameTruncate is when a span's Name is truncated for exceeding the max length
Expand All @@ -280,6 +284,8 @@ func (s *SpansMalformed) tagCounters() map[string]*atomic.Int64 {
"service_empty": &s.ServiceEmpty,
"service_truncate": &s.ServiceTruncate,
"service_invalid": &s.ServiceInvalid,
"peer_service_truncate": &s.PeerServiceTruncate,
"peer_service_invalid": &s.PeerServiceInvalid,
"span_name_empty": &s.SpanNameEmpty,
"span_name_truncate": &s.SpanNameTruncate,
"span_name_invalid": &s.SpanNameInvalid,
Expand Down Expand Up @@ -420,6 +426,8 @@ func (s *Stats) update(recent *Stats) {
s.SpansMalformed.ServiceEmpty.Add(recent.SpansMalformed.ServiceEmpty.Load())
s.SpansMalformed.ServiceTruncate.Add(recent.SpansMalformed.ServiceTruncate.Load())
s.SpansMalformed.ServiceInvalid.Add(recent.SpansMalformed.ServiceInvalid.Load())
s.SpansMalformed.PeerServiceTruncate.Add(recent.SpansMalformed.PeerServiceTruncate.Load())
s.SpansMalformed.PeerServiceInvalid.Add(recent.SpansMalformed.PeerServiceInvalid.Load())
s.SpansMalformed.SpanNameEmpty.Add(recent.SpansMalformed.SpanNameEmpty.Load())
s.SpansMalformed.SpanNameTruncate.Add(recent.SpansMalformed.SpanNameTruncate.Load())
s.SpansMalformed.SpanNameInvalid.Add(recent.SpansMalformed.SpanNameInvalid.Load())
Expand Down
23 changes: 14 additions & 9 deletions pkg/trace/info/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ package info
import (
"bytes"
"fmt"
"github.com/DataDog/datadog-agent/pkg/trace/log"
"reflect"
"strings"
"testing"
"time"

"github.com/DataDog/datadog-agent/pkg/trace/log"

"go.uber.org/atomic"

"github.com/DataDog/datadog-agent/pkg/trace/metrics"
Expand Down Expand Up @@ -60,6 +61,8 @@ func TestSpansMalformed(t *testing.T) {
"span_name_invalid": 0,
"span_name_empty": 0,
"service_truncate": 0,
"peer_service_truncate": 0,
"peer_service_invalid": 0,
"invalid_start_date": 0,
"invalid_http_status_code": 0,
"invalid_duration": 0,
Expand Down Expand Up @@ -241,12 +244,14 @@ func TestReceiverStats(t *testing.T) {
stats.SpansMalformed.ServiceInvalid.Store(4)
stats.SpansMalformed.SpanNameEmpty.Store(5)
stats.SpansMalformed.SpanNameTruncate.Store(6)
stats.SpansMalformed.SpanNameInvalid.Store(7)
stats.SpansMalformed.ResourceEmpty.Store(8)
stats.SpansMalformed.TypeTruncate.Store(9)
stats.SpansMalformed.InvalidStartDate.Store(10)
stats.SpansMalformed.InvalidDuration.Store(11)
stats.SpansMalformed.InvalidHTTPStatusCode.Store(12)
stats.SpansMalformed.PeerServiceTruncate.Store(7)
stats.SpansMalformed.PeerServiceInvalid.Store(8)
stats.SpansMalformed.SpanNameInvalid.Store(9)
stats.SpansMalformed.ResourceEmpty.Store(10)
stats.SpansMalformed.TypeTruncate.Store(11)
stats.SpansMalformed.InvalidStartDate.Store(12)
stats.SpansMalformed.InvalidDuration.Store(13)
stats.SpansMalformed.InvalidHTTPStatusCode.Store(14)
return &ReceiverStats{
Stats: map[Tags]*TagStats{
tags: {
Expand All @@ -260,7 +265,7 @@ func TestReceiverStats(t *testing.T) {
t.Run("PublishAndReset", func(t *testing.T) {
rs := testStats()
rs.PublishAndReset()
assert.EqualValues(t, 39, statsclient.counts.Load())
assert.EqualValues(t, 41, statsclient.counts.Load())
assertStatsAreReset(t, rs)
})

Expand All @@ -282,7 +287,7 @@ func TestReceiverStats(t *testing.T) {
logs := strings.Split(b.String(), "\n")
assert.Equal(t, "[INFO] [lang:go lang_version:1.12 lang_vendor:gov interpreter:gcc tracer_version:1.33 endpoint_version:v0.4] -> traces received: 1, traces filtered: 4, traces amount: 9 bytes, events extracted: 13, events sampled: 14",
logs[0])
assert.Equal(t, "[WARN] [lang:go lang_version:1.12 lang_vendor:gov interpreter:gcc tracer_version:1.33 endpoint_version:v0.4] -> traces_dropped(decoding_error:1, empty_trace:3, foreign_span:6, payload_too_large:2, span_id_zero:5, timeout:7, trace_id_zero:4, unexpected_eof:8), spans_malformed(duplicate_span_id:1, invalid_duration:11, invalid_http_status_code:12, invalid_start_date:10, resource_empty:8, service_empty:2, service_invalid:4, service_truncate:3, span_name_empty:5, span_name_invalid:7, span_name_truncate:6, type_truncate:9). Enable debug logging for more details.",
assert.Equal(t, "[WARN] [lang:go lang_version:1.12 lang_vendor:gov interpreter:gcc tracer_version:1.33 endpoint_version:v0.4] -> traces_dropped(decoding_error:1, empty_trace:3, foreign_span:6, payload_too_large:2, span_id_zero:5, timeout:7, trace_id_zero:4, unexpected_eof:8), spans_malformed(duplicate_span_id:1, invalid_duration:13, invalid_http_status_code:14, invalid_start_date:12, peer_service_invalid:8, peer_service_truncate:7, resource_empty:10, service_empty:2, service_invalid:4, service_truncate:3, span_name_empty:5, span_name_invalid:9, span_name_truncate:6, type_truncate:11). Enable debug logging for more details.",
logs[1])

assertStatsAreReset(t, rs)
Expand Down
Loading

0 comments on commit 70820e3

Please sign in to comment.