Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[connector/spanmetrics] Fix flaky spanmetricsconnector test that relies on timing #33443

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type connectorImp struct {
// e.g. { "foo/barOK": { "serviceName": "foo", "span.name": "/bar", "status_code": "OK" }}
metricKeyToDimensions *cache.Cache[metrics.Key, pcommon.Map]

clock clock.Clock
ticker *clock.Ticker
done chan struct{}
started bool
Expand Down Expand Up @@ -109,7 +110,7 @@ func newDimensions(cfgDims []Dimension) []dimension {
return dims
}

func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Ticker) (*connectorImp, error) {
func newConnector(logger *zap.Logger, config component.Config, clock clock.Clock) (*connectorImp, error) {
logger.Info("Building spanmetrics connector")
cfg := config.(*Config)

Expand Down Expand Up @@ -148,7 +149,8 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
metricKeyToDimensions: metricKeyToDimensionsCache,
lastDeltaTimestamps: lastDeltaTimestamps,
ticker: ticker,
clock: clock,
ticker: clock.NewTicker(cfg.MetricsFlushInterval),
done: make(chan struct{}),
eDimensions: newDimensions(cfg.Events.Dimensions),
events: cfg.Events,
Expand Down Expand Up @@ -266,7 +268,7 @@ func (p *connectorImp) exportMetrics(ctx context.Context) {
// buildMetrics collects the computed raw metrics data and builds OTLP metrics.
func (p *connectorImp) buildMetrics() pmetric.Metrics {
m := pmetric.NewMetrics()
timestamp := pcommon.NewTimestampFromTime(time.Now())
timestamp := pcommon.NewTimestampFromTime(p.clock.Now())

p.resourceMetrics.ForEach(func(_ resourceKey, rawMetrics *resourceMetrics) {
rm := m.ResourceMetrics().AppendEmpty()
Expand Down Expand Up @@ -336,7 +338,7 @@ func (p *connectorImp) resetState() {
return
}

now := time.Now()
now := p.clock.Now()
p.resourceMetrics.ForEach(func(k resourceKey, m *resourceMetrics) {
// Exemplars are only relevant to this batch of traces, so must be cleared within the lock
if p.config.Exemplars.Enabled {
Expand Down Expand Up @@ -365,7 +367,7 @@ func (p *connectorImp) resetState() {
// and span metadata such as name, kind, status_code and any additional
// dimensions the user has configured.
func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
startTimestamp := pcommon.NewTimestampFromTime(time.Now())
startTimestamp := pcommon.NewTimestampFromTime(p.clock.Now())
for i := 0; i < traces.ResourceSpans().Len(); i++ {
rspans := traces.ResourceSpans().At(i)
resourceAttr := rspans.Resource().Attributes()
Expand Down Expand Up @@ -487,7 +489,7 @@ func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map, startTimesta

// If expiration is enabled, track the last seen time.
if p.config.MetricsExpiration > 0 {
v.lastSeen = time.Now()
v.lastSeen = p.clock.Now()
}

return v
Expand Down
80 changes: 50 additions & 30 deletions connector/spanmetricsconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func disabledHistogramsConfig() HistogramConfig {
}
}

func newConnectorImp(defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, expiration time.Duration, resourceMetricsKeyAttributes []string, deltaTimestampCacheSize int, excludedDimensions ...string) (*connectorImp, *clock.Mock, error) {
func newConnectorImp(defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, expiration time.Duration, resourceMetricsKeyAttributes []string, deltaTimestampCacheSize int, clock clock.Clock, excludedDimensions ...string) (*connectorImp, error) {
cfg := &Config{
AggregationTemporality: temporality,
Histogram: histogramConfig(),
Expand All @@ -477,20 +477,18 @@ func newConnectorImp(defaultNullValue *string, histogramConfig func() HistogramC
// Add a resource attribute to test "process" attributes like IP, host, region, cluster, etc.
{regionResourceAttrName, nil},
},
Events: eventsConfig(),
MetricsExpiration: expiration,
TimestampCacheSize: &deltaTimestampCacheSize,
Events: eventsConfig(),
MetricsExpiration: expiration,
TimestampCacheSize: &deltaTimestampCacheSize,
MetricsFlushInterval: time.Nanosecond,
}

mockClock := clock.NewMock(time.Now())
ticker := mockClock.NewTicker(time.Nanosecond)

c, err := newConnector(zap.NewNop(), cfg, ticker)
c, err := newConnector(zap.NewNop(), cfg, clock)
if err != nil {
return nil, nil, err
return nil, err
}
c.metricsConsumer = consumertest.NewNop()
return c, mockClock, nil
return c, nil
}

func stringp(str string) *string {
Expand All @@ -500,7 +498,7 @@ func stringp(str string) *string {
func TestBuildKeySameServiceNameCharSequence(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
c, err := newConnector(zaptest.NewLogger(t), cfg, nil)
c, err := newConnector(zaptest.NewLogger(t), cfg, clock.NewMock(time.Now()))
require.NoError(t, err)

span0 := ptrace.NewSpan()
Expand All @@ -520,7 +518,7 @@ func TestBuildKeyExcludeDimensionsAll(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ExcludeDimensions = []string{"span.kind", "service.name", "span.name", "status.code"}
c, err := newConnector(zaptest.NewLogger(t), cfg, nil)
c, err := newConnector(zaptest.NewLogger(t), cfg, clock.NewMock(time.Now()))
require.NoError(t, err)

span0 := ptrace.NewSpan()
Expand All @@ -533,7 +531,7 @@ func TestBuildKeyExcludeWrongDimensions(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ExcludeDimensions = []string{"span.kind", "service.name.wrong.name", "span.name", "status.code"}
c, err := newConnector(zaptest.NewLogger(t), cfg, nil)
c, err := newConnector(zaptest.NewLogger(t), cfg, clock.NewMock(time.Now()))
require.NoError(t, err)

span0 := ptrace.NewSpan()
Expand All @@ -545,7 +543,7 @@ func TestBuildKeyExcludeWrongDimensions(t *testing.T) {
func TestBuildKeyWithDimensions(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
c, err := newConnector(zaptest.NewLogger(t), cfg, nil)
c, err := newConnector(zaptest.NewLogger(t), cfg, clock.NewMock(time.Now()))
require.NoError(t, err)

defaultFoo := pcommon.NewValueStr("bar")
Expand Down Expand Up @@ -641,7 +639,7 @@ func TestConcurrentShutdown(t *testing.T) {
core, observedLogs := observer.New(zapcore.InfoLevel)

// Test
p, _, err := newConnectorImp(nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
p, err := newConnectorImp(nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, clock.NewMock(time.Now()))
require.NoError(t, err)
// Override the default no-op consumer and logger for testing.
p.metricsConsumer = new(consumertest.MetricsSink)
Expand Down Expand Up @@ -686,7 +684,7 @@ func TestConnectorCapabilities(t *testing.T) {
cfg := factory.CreateDefaultConfig().(*Config)

// Test
c, err := newConnector(zaptest.NewLogger(t), cfg, nil)
c, err := newConnector(zaptest.NewLogger(t), cfg, clock.NewMock(time.Now()))
// Override the default no-op consumer for testing.
c.metricsConsumer = new(consumertest.MetricsSink)
assert.NoError(t, err)
Expand Down Expand Up @@ -719,7 +717,8 @@ func TestConsumeMetricsErrors(t *testing.T) {
logger := zap.New(core)

var wg sync.WaitGroup
p, mockClock, err := newConnectorImp(nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
mockClock := clock.NewMock(time.Now())
p, err := newConnectorImp(nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, mockClock)
require.NoError(t, err)
// Override the default no-op consumer and logger for testing.
p.metricsConsumer = &errConsumer{
Expand Down Expand Up @@ -884,7 +883,8 @@ func TestConsumeTraces(t *testing.T) {
// Prepare

mcon := &consumertest.MetricsSink{}
p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, 0, []string{}, 1000)
mockClock := clock.NewMock(time.Now())
p, err := newConnectorImp(stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, 0, []string{}, 1000, mockClock)
require.NoError(t, err)
// Override the default no-op consumer with metrics sink for testing.
p.metricsConsumer = mcon
Expand All @@ -911,7 +911,7 @@ func TestConsumeTraces(t *testing.T) {
}

func TestMetricKeyCache(t *testing.T) {
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, clock.NewMock(time.Now()))
require.NoError(t, err)
traces := buildSampleTrace()

Expand Down Expand Up @@ -940,7 +940,7 @@ func TestMetricKeyCache(t *testing.T) {
}

func TestResourceMetricsCache(t *testing.T) {
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, clock.NewMock(time.Now()))
require.NoError(t, err)

// Test
Expand Down Expand Up @@ -977,7 +977,7 @@ func TestResourceMetricsCache(t *testing.T) {
}

func TestResourceMetricsExpiration(t *testing.T) {
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 1*time.Millisecond, []string{}, 1000)
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 1*time.Millisecond, []string{}, 1000, clock.NewMock(time.Now()))
require.NoError(t, err)

// Test
Expand All @@ -1002,7 +1002,7 @@ func TestResourceMetricsKeyAttributes(t *testing.T) {
"service.name",
}

p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, resourceMetricsKeyAttributes, 1000)
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, resourceMetricsKeyAttributes, 1000, clock.NewMock(time.Now()))
require.NoError(t, err)

// Test
Expand Down Expand Up @@ -1040,7 +1040,7 @@ func TestResourceMetricsKeyAttributes(t *testing.T) {

func BenchmarkConnectorConsumeTraces(b *testing.B) {
// Prepare
conn, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
conn, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, clock.NewMock(time.Now()))
require.NoError(b, err)

traces := buildSampleTrace()
Expand All @@ -1054,7 +1054,7 @@ func BenchmarkConnectorConsumeTraces(b *testing.B) {

func TestExcludeDimensionsConsumeTraces(t *testing.T) {
excludeDimensions := []string{"span.kind", "span.name", "totallyWrongNameDoesNotAffectAnything"}
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, excludeDimensions...)
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, clock.NewMock(time.Now()), excludeDimensions...)
require.NoError(t, err)
traces := buildSampleTrace()

Expand Down Expand Up @@ -1184,7 +1184,8 @@ func TestConnectorConsumeTracesEvictedCacheKey(t *testing.T) {
wg.Add(len(wantDataPointCounts))

// Note: default dimension key cache size is 2.
p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000)
mockClock := clock.NewMock(time.Now())
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, mockClock)
require.NoError(t, err)
// Override the default no-op consumer with metrics sink for testing.
p.metricsConsumer = mcon
Expand Down Expand Up @@ -1269,7 +1270,8 @@ func TestConnectorConsumeTracesExpiredMetrics(t *testing.T) {
mcon := &consumertest.MetricsSink{}

// Creating a connector with a very short metricsTTL to ensure that the metrics are expired.
p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 1*time.Nanosecond, []string{}, 1000)
mockClock := clock.NewMock(time.Now())
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 1*time.Nanosecond, []string{}, 1000, mockClock)
require.NoError(t, err)
// Override the default no-op consumer with metrics sink for testing.
p.metricsConsumer = mcon
Expand Down Expand Up @@ -1489,7 +1491,7 @@ func TestSpanMetrics_Events(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Events = tt.eventsConfig
c, err := newConnector(zaptest.NewLogger(t), cfg, nil)
c, err := newConnector(zaptest.NewLogger(t), cfg, clock.NewMock(time.Now()))
require.NoError(t, err)
err = c.ConsumeTraces(context.Background(), buildSampleTrace())
require.NoError(t, err)
Expand Down Expand Up @@ -1544,7 +1546,7 @@ func TestExemplarsAreDiscardedAfterFlushing(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p, _, err := newConnectorImp(stringp("defaultNullValue"), tt.histogramConfig, enabledExemplarsConfig, enabledEventsConfig, tt.temporality, 0, []string{}, 1000)
p, err := newConnectorImp(stringp("defaultNullValue"), tt.histogramConfig, enabledExemplarsConfig, enabledEventsConfig, tt.temporality, 0, []string{}, 1000, clock.NewMock(time.Now()))
p.metricsConsumer = &consumertest.MetricsSink{}
require.NoError(t, err)

Expand Down Expand Up @@ -1671,7 +1673,8 @@ func TestTimestampsForUninterruptedStream(t *testing.T) {

for _, tt := range tests {
t.Run(tt.temporality, func(t *testing.T) {
p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, tt.temporality, 0, []string{}, 1000)
mockClock := newAlwaysIncreasingClock()
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, tt.temporality, 0, []string{}, 1000, mockClock)
require.NoError(t, err)
p.metricsConsumer = &consumertest.MetricsSink{}

Expand Down Expand Up @@ -1769,7 +1772,8 @@ func verifyAndCollectCommonTimestamps(t *testing.T, m pmetric.Metrics) (start pc

func TestDeltaTimestampCacheExpiry(t *testing.T) {
timestampCacheSize := 1
p, _, err := newConnectorImp(stringp("defaultNullValue"), exponentialHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, delta, 0, []string{}, timestampCacheSize)
mockClock := newAlwaysIncreasingClock()
p, err := newConnectorImp(stringp("defaultNullValue"), exponentialHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, delta, 0, []string{}, timestampCacheSize, mockClock)
require.NoError(t, err)
p.metricsConsumer = &consumertest.MetricsSink{}

Expand Down Expand Up @@ -1836,3 +1840,19 @@ func TestDeltaTimestampCacheExpiry(t *testing.T) {
serviceAStartTimestamp2 := p.metricsConsumer.(*consumertest.MetricsSink).AllMetrics()[2].ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).StartTimestamp()
assert.Greater(t, serviceAStartTimestamp2, serviceATimestamp1) // These would be the same if nothing was evicted from the cache
}

// Clock where Now() always returns a greater value than the previous return value
type alwaysIncreasingClock struct {
clock.Clock
}

func newAlwaysIncreasingClock() alwaysIncreasingClock {
return alwaysIncreasingClock{
Clock: clock.NewMock(time.Now()),
}
}

func (c alwaysIncreasingClock) Now() time.Time {
c.Clock.(*clock.Mock).Add(time.Second)
return c.Clock.Now()
}
6 changes: 1 addition & 5 deletions connector/spanmetricsconnector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,10 @@ func createDefaultConfig() component.Config {
}

func createTracesToMetricsConnector(ctx context.Context, params connector.Settings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) {
c, err := newConnector(params.Logger, cfg, metricsTicker(ctx, cfg))
c, err := newConnector(params.Logger, cfg, clock.FromContext(ctx))
if err != nil {
return nil, err
}
c.metricsConsumer = nextConsumer
return c, nil
}

func metricsTicker(ctx context.Context, cfg component.Config) *clock.Ticker {
return clock.FromContext(ctx).NewTicker(cfg.(*Config).MetricsFlushInterval)
}