Skip to content

Commit

Permalink
[exporter/datadog] Enable inframetadata.Reporter (#24290)
Browse files Browse the repository at this point in the history
**Description:** 

Enables `inframetadata.Reporter`. This allows sending host metadata
based on resource payloads that have the boolean
`datadog.host.use_as_metadata` set to `true`.

By default, host metadata payloads will not be sent by default.

Depends on #24271
  • Loading branch information
mx-psi authored Jul 19, 2023
1 parent 65e6596 commit e8ffc51
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 53 deletions.
21 changes: 21 additions & 0 deletions .chloggen/mx-psi_reporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: datadogexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add support for reporting host metadata from remote hosts."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24290]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Resource attributes for each telemetry signal will be used for host metadata if the 'datadog.host.use_as_metadata' boolean attribute is set to 'true'.
96 changes: 90 additions & 6 deletions exporter/datadogexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/DataDog/datadog-agent/pkg/trace/agent"
"github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confignet"
Expand Down Expand Up @@ -49,13 +50,26 @@ func enableZorkianMetricExport() error {
return featuregate.GlobalRegistry().Set(mertricExportNativeClientFeatureGate.ID(), false)
}

const metadataReporterPeriod = 30 * time.Minute

func consumeResource(metadataReporter *inframetadata.Reporter, res pcommon.Resource, logger *zap.Logger) {
if err := metadataReporter.ConsumeResource(res); err != nil {
logger.Warn("failed to consume resource for host metadata", zap.Error(err), zap.Any("resource", res))
}
}

type factory struct {
onceMetadata sync.Once

onceProvider sync.Once
sourceProvider source.Provider
providerErr error

onceReporter sync.Once
onceStopReporter sync.Once
reporter *inframetadata.Reporter
reporterErr error

wg sync.WaitGroup // waits for agent to exit

registry *featuregate.Registry
Expand All @@ -68,6 +82,34 @@ func (f *factory) SourceProvider(set component.TelemetrySettings, configHostname
return f.sourceProvider, f.providerErr
}

// Reporter builds and returns an *inframetadata.Reporter.
func (f *factory) Reporter(params exporter.CreateSettings, pcfg hostmetadata.PusherConfig) (*inframetadata.Reporter, error) {
f.onceReporter.Do(func() {
pusher := hostmetadata.NewPusher(params, pcfg)
f.reporter, f.reporterErr = inframetadata.NewReporter(params.Logger, pusher, metadataReporterPeriod)
if f.reporterErr == nil {
go func() {
if err := f.reporter.Run(context.Background()); err != nil {
params.Logger.Error("Host metadata reporter failed at runtime", zap.Error(err))
}
}()
}
})
return f.reporter, f.reporterErr
}

// StopReporter stops the host metadata reporter.
func (f *factory) StopReporter() {
// Use onceReporter or wait until it is done
f.onceReporter.Do(func() {})
// Stop the reporter
f.onceStopReporter.Do(func() {
if f.reporterErr == nil && f.reporter != nil {
f.reporter.Stop()
}
})
}

func (f *factory) TraceAgent(ctx context.Context, params exporter.CreateSettings, cfg *Config, sourceProvider source.Provider) (*agent.Agent, error) {
agnt, err := newTraceAgent(ctx, params, cfg, sourceProvider)
if err != nil {
Expand Down Expand Up @@ -187,6 +229,14 @@ func (f *factory) createMetricsExporter(
cancel()
return nil, fmt.Errorf("failed to start trace-agent: %w", err)
}

pcfg := newMetadataConfigfromConfig(cfg)
metadataReporter, err := f.Reporter(set, pcfg)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to build host metadata reporter: %w", err)
}

if cfg.OnlyMetadata {
pushMetricsFn = func(_ context.Context, md pmetric.Metrics) error {
// only sending metadata use only metrics
Expand All @@ -195,13 +245,18 @@ func (f *factory) createMetricsExporter(
if md.ResourceMetrics().Len() > 0 {
attrs = md.ResourceMetrics().At(0).Resource().Attributes()
}
go hostmetadata.RunPusher(ctx, set, newMetadataConfigfromConfig(cfg), hostProvider, attrs)
go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs)
})

// Consume resources for host metadata
for i := 0; i < md.ResourceMetrics().Len(); i++ {
res := md.ResourceMetrics().At(i).Resource()
consumeResource(metadataReporter, res, set.Logger)
}
return nil
}
} else {
exp, metricsErr := newMetricsExporter(ctx, set, cfg, &f.onceMetadata, hostProvider, traceagent)
exp, metricsErr := newMetricsExporter(ctx, set, cfg, &f.onceMetadata, hostProvider, traceagent, metadataReporter)
if metricsErr != nil {
cancel() // first cancel context
f.wg.Wait() // then wait for shutdown
Expand All @@ -222,6 +277,7 @@ func (f *factory) createMetricsExporter(
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithShutdown(func(context.Context) error {
cancel()
f.StopReporter()
return nil
}),
)
Expand Down Expand Up @@ -256,6 +312,14 @@ func (f *factory) createTracesExporter(
cancel()
return nil, fmt.Errorf("failed to start trace-agent: %w", err)
}

pcfg := newMetadataConfigfromConfig(cfg)
metadataReporter, err := f.Reporter(set, pcfg)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to build host metadata reporter: %w", err)
}

if cfg.OnlyMetadata {
// only host metadata needs to be sent, once.
pusher = func(_ context.Context, td ptrace.Traces) error {
Expand All @@ -264,16 +328,22 @@ func (f *factory) createTracesExporter(
if td.ResourceSpans().Len() > 0 {
attrs = td.ResourceSpans().At(0).Resource().Attributes()
}
go hostmetadata.RunPusher(ctx, set, newMetadataConfigfromConfig(cfg), hostProvider, attrs)
go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs)
})
// Consume resources for host metadata
for i := 0; i < td.ResourceSpans().Len(); i++ {
res := td.ResourceSpans().At(i).Resource()
consumeResource(metadataReporter, res, set.Logger)
}
return nil
}
stop = func(context.Context) error {
cancel()
f.StopReporter()
return nil
}
} else {
tracex, err2 := newTracesExporter(ctx, set, cfg, &f.onceMetadata, hostProvider, traceagent)
tracex, err2 := newTracesExporter(ctx, set, cfg, &f.onceMetadata, hostProvider, traceagent, metadataReporter)
if err2 != nil {
cancel()
f.wg.Wait() // then wait for shutdown
Expand All @@ -282,6 +352,7 @@ func (f *factory) createTracesExporter(
pusher = tracex.consumeTraces
stop = func(context.Context) error {
cancel() // first cancel context
f.StopReporter()
return nil
}
}
Expand Down Expand Up @@ -315,17 +386,29 @@ func (f *factory) createLogsExporter(
}
ctx, cancel := context.WithCancel(ctx)
// cancel() runs on shutdown

pcfg := newMetadataConfigfromConfig(cfg)
metadataReporter, err := f.Reporter(set, pcfg)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to build host metadata reporter: %w", err)
}

if cfg.OnlyMetadata {
// only host metadata needs to be sent, once.
pusher = func(_ context.Context, td plog.Logs) error {
f.onceMetadata.Do(func() {
attrs := pcommon.NewMap()
go hostmetadata.RunPusher(ctx, set, newMetadataConfigfromConfig(cfg), hostProvider, attrs)
go hostmetadata.RunPusher(ctx, set, pcfg, hostProvider, attrs)
})
for i := 0; i < td.ResourceLogs().Len(); i++ {
res := td.ResourceLogs().At(i).Resource()
consumeResource(metadataReporter, res, set.Logger)
}
return nil
}
} else {
exp, err := newLogsExporter(ctx, set, cfg, &f.onceMetadata, hostProvider)
exp, err := newLogsExporter(ctx, set, cfg, &f.onceMetadata, hostProvider, metadataReporter)
if err != nil {
cancel()
f.wg.Wait() // then wait for shutdown
Expand All @@ -344,6 +427,7 @@ func (f *factory) createLogsExporter(
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithShutdown(func(context.Context) error {
cancel()
f.StopReporter()
return nil
}),
)
Expand Down
40 changes: 40 additions & 0 deletions exporter/datadogexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"context"
"encoding/json"
"path/filepath"
"sync"
"testing"

"github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata"
"github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata/payload"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -19,11 +21,49 @@ import (
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/testutil"
)

var _ inframetadata.Pusher = (*testPusher)(nil)

type testPusher struct {
mu sync.Mutex
payloads []payload.HostMetadata
stopped bool
logger *zap.Logger
t *testing.T
}

func newTestPusher(t *testing.T) *testPusher {
return &testPusher{
logger: zaptest.NewLogger(t),
t: t,
}
}

func (p *testPusher) Push(_ context.Context, hm payload.HostMetadata) error {
p.mu.Lock()
defer p.mu.Unlock()
if p.stopped {
p.logger.Error("Trying to push payload after stopping", zap.Any("payload", hm))
p.t.Fail()
}
p.logger.Info("Storing host metadata payload", zap.Any("payload", hm))
p.payloads = append(p.payloads, hm)
return nil
}

func (p *testPusher) Payloads() []payload.HostMetadata {
p.mu.Lock()
p.stopped = true
defer p.mu.Unlock()
return p.payloads
}

// Test that the factory creates the default configuration
func TestCreateDefaultConfig(t *testing.T) {
factory := NewFactory()
Expand Down
46 changes: 31 additions & 15 deletions exporter/datadogexporter/logs_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

"github.com/DataDog/datadog-api-client-go/v2/api/datadogV2"
"github.com/DataDog/opentelemetry-mapping-go/pkg/inframetadata"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
logsmapping "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/logs"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -22,17 +23,25 @@ import (
)

type logsExporter struct {
params exporter.CreateSettings
cfg *Config
ctx context.Context // ctx triggers shutdown upon cancellation
scrubber scrub.Scrubber // scrubber scrubs sensitive information from error messages
sender *logs.Sender
onceMetadata *sync.Once
sourceProvider source.Provider
params exporter.CreateSettings
cfg *Config
ctx context.Context // ctx triggers shutdown upon cancellation
scrubber scrub.Scrubber // scrubber scrubs sensitive information from error messages
sender *logs.Sender
onceMetadata *sync.Once
sourceProvider source.Provider
metadataReporter *inframetadata.Reporter
}

// newLogsExporter creates a new instance of logsExporter
func newLogsExporter(ctx context.Context, params exporter.CreateSettings, cfg *Config, onceMetadata *sync.Once, sourceProvider source.Provider) (*logsExporter, error) {
func newLogsExporter(
ctx context.Context,
params exporter.CreateSettings,
cfg *Config,
onceMetadata *sync.Once,
sourceProvider source.Provider,
metadataReporter *inframetadata.Reporter,
) (*logsExporter, error) {
// create Datadog client
// validation endpoint is provided by Metrics
errchan := make(chan error)
Expand All @@ -57,13 +66,14 @@ func newLogsExporter(ctx context.Context, params exporter.CreateSettings, cfg *C
s := logs.NewSender(cfg.Logs.TCPAddr.Endpoint, params.Logger, cfg.TimeoutSettings, cfg.LimitedHTTPClientSettings.TLSSetting.InsecureSkipVerify, cfg.Logs.DumpPayloads, string(cfg.API.Key))

return &logsExporter{
params: params,
cfg: cfg,
ctx: ctx,
sender: s,
onceMetadata: onceMetadata,
scrubber: scrub.NewScrubber(),
sourceProvider: sourceProvider,
params: params,
cfg: cfg,
ctx: ctx,
sender: s,
onceMetadata: onceMetadata,
scrubber: scrub.NewScrubber(),
sourceProvider: sourceProvider,
metadataReporter: metadataReporter,
}, nil
}

Expand All @@ -82,6 +92,12 @@ func (exp *logsExporter) consumeLogs(_ context.Context, ld plog.Logs) (err error
}
go hostmetadata.RunPusher(exp.ctx, exp.params, newMetadataConfigfromConfig(exp.cfg), exp.sourceProvider, attrs)
})

// Consume resources for host metadata
for i := 0; i < ld.ResourceLogs().Len(); i++ {
res := ld.ResourceLogs().At(i).Resource()
consumeResource(exp.metadataReporter, res, exp.params.Logger)
}
}

rsl := ld.ResourceLogs()
Expand Down
Loading

0 comments on commit e8ffc51

Please sign in to comment.