Skip to content

Commit

Permalink
[chore] Source Timeout for Providers (open-telemetry#33958)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>

Datadog exporter tests have an unnecessary 30 second wait while it waits
for various metadata service calls to timeout (this is the default TCP
timeout in the go network stack). Also had to modify the ec2 provider so
that it respected the context. Most of the changes are just propagating
this new timeout to the provider, and propagating this context around. I
made the decision to differentiate the timeout context and the overall
context for the chainProvider - these could be brought together but it
looks like we had existing logic for a context timeout vs a
chainProvider context cancellation so I wanted to respect that split.

**Testing:** 
mostly stuck to a 31 second timeout on existing tests to not change
behavior. Added a tighter timeout for some of the slower tests.
Added some tests for the timeout logic

---------

Co-authored-by: Pablo Baeyens <pbaeyens31+github@gmail.com>
  • Loading branch information
ankitpatel96 and mx-psi authored Jul 17, 2024
1 parent 892692a commit 734a030
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 26 deletions.
6 changes: 6 additions & 0 deletions exporter/datadogexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"regexp"
"strings"
"time"

"github.com/DataDog/datadog-agent/pkg/util/hostname/validate"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -411,6 +412,11 @@ type HostMetadataConfig struct {
// These tags will be attached to telemetry signals that have the host metadata hostname.
// To attach tags to telemetry signals regardless of the host, use a processor instead.
Tags []string `mapstructure:"tags"`

// sourceTimeout is the timeout to fetch from each provider - for example AWS IMDS.
// If unset, or set to zero duration, there will be no timeout applied.
// Default is no timeout.
sourceTimeout time.Duration
}

// Config defines configuration for the Datadog exporter.
Expand Down
10 changes: 5 additions & 5 deletions exporter/datadogexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ type factory struct {
registry *featuregate.Registry
}

func (f *factory) SourceProvider(set component.TelemetrySettings, configHostname string) (source.Provider, error) {
func (f *factory) SourceProvider(set component.TelemetrySettings, configHostname string, timeout time.Duration) (source.Provider, error) {
f.onceProvider.Do(func() {
f.sourceProvider, f.providerErr = hostmetadata.GetSourceProvider(set, configHostname)
f.sourceProvider, f.providerErr = hostmetadata.GetSourceProvider(set, configHostname, timeout)
})
return f.sourceProvider, f.providerErr
}
Expand Down Expand Up @@ -289,7 +289,7 @@ func (f *factory) createMetricsExporter(
c component.Config,
) (exporter.Metrics, error) {
cfg := checkAndCastConfig(c, set.TelemetrySettings.Logger)
hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname)
hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname, cfg.HostMetadata.sourceTimeout)
if err != nil {
return nil, fmt.Errorf("failed to build hostname provider: %w", err)
}
Expand Down Expand Up @@ -409,7 +409,7 @@ func (f *factory) createTracesExporter(
wg sync.WaitGroup // waits for agent to exit
)

hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname)
hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname, cfg.HostMetadata.sourceTimeout)
if err != nil {
return nil, fmt.Errorf("failed to build hostname provider: %w", err)
}
Expand Down Expand Up @@ -496,7 +496,7 @@ func (f *factory) createLogsExporter(

var pusher consumer.ConsumeLogsFunc
var logsAgent logsagentpipeline.LogsAgent
hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname)
hostProvider, err := f.SourceProvider(set.TelemetrySettings, cfg.Hostname, cfg.HostMetadata.sourceTimeout)
if err != nil {
return nil, fmt.Errorf("failed to build hostname provider: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions exporter/datadogexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@ func TestOnlyMetadata(t *testing.T) {
HostMetadata: HostMetadataConfig{
Enabled: true,
HostnameSource: HostnameSourceFirstResource,
sourceTimeout: 50 * time.Millisecond,
},
}

Expand Down
4 changes: 3 additions & 1 deletion exporter/datadogexporter/internal/hostmetadata/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package hostmetadata // import "github.com/open-telemetry/opentelemetry-collecto

import (
"fmt"
"time"

"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"go.opentelemetry.io/collector/component"
Expand All @@ -27,7 +28,7 @@ var _ = featuregate.GlobalRegistry().MustRegister(
featuregate.WithRegisterToVersion("0.75.0"),
)

func GetSourceProvider(set component.TelemetrySettings, configHostname string) (source.Provider, error) {
func GetSourceProvider(set component.TelemetrySettings, configHostname string, timeout time.Duration) (source.Provider, error) {
ecs, err := ecs.NewProvider(set)
if err != nil {
return nil, fmt.Errorf("failed to build ECS Fargate provider: %w", err)
Expand Down Expand Up @@ -69,6 +70,7 @@ func GetSourceProvider(set component.TelemetrySettings, configHostname string) (
"system": system.NewProvider(set.Logger),
},
[]string{"config", "azure", "ecs", "ec2", "gcp", "kubernetes", "system"},
timeout,
)

if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion exporter/datadogexporter/internal/hostmetadata/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ package hostmetadata
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
)

func TestHost(t *testing.T) {
p, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "test-host")
p, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "test-host", 31*time.Second)
require.NoError(t, err)
src, err := p.Source(context.Background())
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func isDefaultHostname(hostname string) bool {
}

// GetHostInfo gets the hostname info from EC2 metadata
func GetHostInfo(logger *zap.Logger) (hostInfo *HostInfo) {
func GetHostInfo(ctx context.Context, logger *zap.Logger) (hostInfo *HostInfo) {
sess, err := session.NewSession()
hostInfo = &HostInfo{}

Expand All @@ -54,18 +54,18 @@ func GetHostInfo(logger *zap.Logger) (hostInfo *HostInfo) {

meta := ec2metadata.New(sess)

if !meta.Available() {
if !meta.AvailableWithContext(ctx) {
logger.Debug("EC2 Metadata not available")
return
}

if idDoc, err := meta.GetInstanceIdentityDocument(); err == nil {
if idDoc, err := meta.GetInstanceIdentityDocumentWithContext(ctx); err == nil {
hostInfo.InstanceID = idDoc.InstanceID
} else {
logger.Warn("Failed to get EC2 instance id document", zap.Error(err))
}

if ec2Hostname, err := meta.GetMetadata("hostname"); err == nil {
if ec2Hostname, err := meta.GetMetadataWithContext(ctx, "hostname"); err == nil {
hostInfo.EC2Hostname = ec2Hostname
} else {
logger.Warn("Failed to get EC2 hostname", zap.Error(err))
Expand Down Expand Up @@ -104,12 +104,12 @@ func NewProvider(logger *zap.Logger) (*Provider, error) {
}, nil
}

func (p *Provider) fillHostInfo() {
p.once.Do(func() { p.hostInfo = *GetHostInfo(p.logger) })
func (p *Provider) fillHostInfo(ctx context.Context) {
p.once.Do(func() { p.hostInfo = *GetHostInfo(ctx, p.logger) })
}

func (p *Provider) Source(_ context.Context) (source.Source, error) {
p.fillHostInfo()
func (p *Provider) Source(ctx context.Context) (source.Source, error) {
p.fillHostInfo(ctx)
if p.hostInfo.InstanceID == "" {
return source.Source{}, fmt.Errorf("instance ID is unavailable")
}
Expand Down Expand Up @@ -175,6 +175,6 @@ func (p *Provider) ClusterName(ctx context.Context) (string, error) {
}

func (p *Provider) HostInfo() *HostInfo {
p.fillHostInfo()
p.fillHostInfo(context.Background())
return &p.hostInfo
}
4 changes: 2 additions & 2 deletions exporter/datadogexporter/internal/hostmetadata/metadata.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package metadata is responsible for collecting host metadata from different providers
// Package hostmetadata is responsible for collecting host metadata from different providers
// such as EC2, ECS, AWS, etc and pushing it to Datadog.
package hostmetadata // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/hostmetadata"

Expand Down Expand Up @@ -77,7 +77,7 @@ func fillHostMetadata(params exporter.Settings, pcfg PusherConfig, p source.Prov
hm.Processes = gohai.NewProcessesPayload(hm.Meta.Hostname, params.Logger)
// EC2 data was not set from attributes
if hm.Meta.EC2Hostname == "" {
ec2HostInfo := ec2.GetHostInfo(params.Logger)
ec2HostInfo := ec2.GetHostInfo(context.Background(), params.Logger)
hm.Meta.EC2Hostname = ec2HostInfo.EC2Hostname
hm.Meta.InstanceID = ec2HostInfo.InstanceID
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestFillHostMetadata(t *testing.T) {
ConfigTags: []string{"key1:tag1", "key2:tag2", "env:prod"},
}

hostProvider, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "hostname")
hostProvider, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "hostname", 31*time.Second)
require.NoError(t, err)

metadata := payload.NewEmpty()
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestPusher(t *testing.T) {
params := exportertest.NewNopSettings()
params.BuildInfo = mockBuildInfo

hostProvider, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "source-hostname")
hostProvider, err := GetSourceProvider(componenttest.NewNopTelemetrySettings(), "source-hostname", 31*time.Second)
require.NoError(t, err)

attrs := testutil.NewAttributeMap(map[string]string{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"go.uber.org/zap"
Expand All @@ -19,6 +20,7 @@ type chainProvider struct {
logger *zap.Logger
providers map[string]source.Provider
priorityList []string
timeout time.Duration
}

func (p *chainProvider) Source(ctx context.Context) (source.Source, error) {
Expand All @@ -32,14 +34,24 @@ func (p *chainProvider) Source(ctx context.Context) (source.Source, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Make a different context for our provider calls, to differentiate between a provider timing out and the entire
// context being cancelled
var childCtx context.Context
if p.timeout != 0 {
childCtx, cancel = context.WithTimeout(ctx, p.timeout)
} else {
childCtx, cancel = context.WithCancel(ctx)
}
defer cancel()

// Run all providers in parallel
replies := make([]chan reply, len(p.priorityList))
for i, source := range p.priorityList {
provider := p.providers[source]
replies[i] = make(chan reply)
p.logger.Debug("Trying out source provider", zap.String("provider", source))
go func(i int) {
src, err := provider.Source(ctx)
src, err := provider.Source(childCtx)
replies[i] <- reply{src: src, err: err}
}(i)
}
Expand All @@ -65,14 +77,14 @@ func (p *chainProvider) Source(ctx context.Context) (source.Source, error) {
}

// Chain providers into a single provider that returns the first available hostname.
func Chain(logger *zap.Logger, providers map[string]source.Provider, priorityList []string) (source.Provider, error) {
func Chain(logger *zap.Logger, providers map[string]source.Provider, priorityList []string, timeout time.Duration) (source.Provider, error) {
for _, source := range priorityList {
if _, ok := providers[source]; !ok {
return nil, fmt.Errorf("%q source is not available in providers", source)
}
}

return &chainProvider{logger: logger, providers: providers, priorityList: priorityList}, nil
return &chainProvider{logger: logger, providers: providers, priorityList: priorityList, timeout: timeout}, nil
}

var _ source.Provider = (*configProvider)(nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package provider // import "github.com/open-telemetry/opentelemetry-collector-co
import (
"context"
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -38,8 +39,12 @@ type delayedProvider struct {
}

func (p *delayedProvider) Source(ctx context.Context) (source.Source, error) {
time.Sleep(p.delay)
return p.provider.Source(ctx)
select {
case <-ctx.Done():
return source.Source{}, fmt.Errorf("no source provider was available")
case <-time.After(p.delay):
return p.provider.Source(ctx)
}
}

func withDelay(provider source.Provider, delay time.Duration) source.Provider {
Expand All @@ -56,6 +61,7 @@ func TestChain(t *testing.T) {

hostname string
queryErr string
timeout time.Duration
}{
{
name: "missing provider in priority list",
Expand All @@ -78,6 +84,18 @@ func TestChain(t *testing.T) {

queryErr: "no source provider was available",
},
{
name: "all providers timeout",
providers: map[string]source.Provider{
"p1": withDelay(HostProvider("p1SourceName"), 100*time.Millisecond),
"p2": withDelay(HostProvider("p2SourceName"), 100*time.Millisecond),
"p3": withDelay(HostProvider("p3SourceName"), 100*time.Millisecond),
},
priorityList: []string{"p1", "p2", "p3"},

queryErr: "no source provider was available",
timeout: 10 * time.Millisecond,
},
{
name: "no providers fail",
providers: map[string]source.Provider{
Expand Down Expand Up @@ -111,11 +129,23 @@ func TestChain(t *testing.T) {

hostname: "p2SourceName",
},
{
name: "p2 times out",
providers: map[string]source.Provider{
"p1": ErrorSourceProvider("p1Err"),
"p2": withDelay(HostProvider("p2SourceName"), 50*time.Millisecond),
"p3": HostProvider("p3SourceName"),
},
priorityList: []string{"p1", "p2", "p3"},

hostname: "p3SourceName",
timeout: 10 * time.Millisecond,
},
}

for _, testInstance := range tests {
t.Run(testInstance.name, func(t *testing.T) {
provider, err := Chain(zaptest.NewLogger(t), testInstance.providers, testInstance.priorityList)
provider, err := Chain(zaptest.NewLogger(t), testInstance.providers, testInstance.priorityList, testInstance.timeout)
if err != nil || testInstance.buildErr != "" {
assert.EqualError(t, err, testInstance.buildErr)
return
Expand Down
3 changes: 3 additions & 0 deletions exporter/datadogexporter/metrics_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func TestNewExporter(t *testing.T) {
CumulativeMonotonicMode: CumulativeMonotonicSumModeToDelta,
},
},
HostMetadata: HostMetadataConfig{
sourceTimeout: 50 * time.Millisecond,
},
}
params := exportertest.NewNopSettings()
f := NewFactory()
Expand Down

0 comments on commit 734a030

Please sign in to comment.