From 289c1203b1b612ef053a4965c2ca067b3d201a13 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Thu, 18 Jul 2024 17:07:05 +0200 Subject: [PATCH] [processor/deltatocumulative]: mdatagen telemetry (#33981) Uses recently introduced `telemetry:` feature of `mdatagen` to define the internal telemetry exported from this component. Leaves the general `telemetry` package in-place, as it also does error handling (for now). Changing so requires a deeper refactor that will be done later Fixes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33573 **Documentation:** changed to auto-generated --- .../deltatocumulativeprocessor/README.md | 14 +-- .../documentation.md | 63 ++++++++++++ .../deltatocumulativeprocessor/factory.go | 8 +- .../generated_component_telemetry_test.go | 76 ++++++++++++++ processor/deltatocumulativeprocessor/go.mod | 5 +- processor/deltatocumulativeprocessor/go.sum | 2 - .../internal/metadata/generated_telemetry.go | 89 ++++++++++++++++- .../metadata/generated_telemetry_test.go | 13 +++ .../internal/telemetry/faults_test.go | 7 +- .../internal/telemetry/metrics.go | 98 ++++--------------- .../deltatocumulativeprocessor/metadata.yaml | 53 ++++++++++ .../deltatocumulativeprocessor/processor.go | 7 +- 12 files changed, 330 insertions(+), 105 deletions(-) create mode 100644 processor/deltatocumulativeprocessor/documentation.md create mode 100644 processor/deltatocumulativeprocessor/generated_component_telemetry_test.go diff --git a/processor/deltatocumulativeprocessor/README.md b/processor/deltatocumulativeprocessor/README.md index ffceda1c3d3b..425a8dd97776 100644 --- a/processor/deltatocumulativeprocessor/README.md +++ b/processor/deltatocumulativeprocessor/README.md @@ -36,15 +36,5 @@ There is no further configuration required. All delta samples are converted to c ## Troubleshooting -The following metrics are recorded when [telemetry is -enabled](https://opentelemetry.io/docs/collector/configuration/#telemetry): - -| Name | Description | Unit | -|------------------------------------------|---------------------------------------------------------------------------------------|---------------| -| `deltatocumulative.streams.tracked` | Number of streams currently tracked by the aggregation state | `{stream}` | -| `deltatocumulative.streams.limit` | Upper limit of tracked streams | `{stream}` | -| `deltatocumulative.streams.evicted` | Number of streams removed from tracking to ingest newer streams | `{stream}` | -| `deltatocumulative.streams.max_stale` | Duration without new samples after which streams are dropped | `second` | -| `deltatocumulative.datapoints.processed` | Total number of datapoints processed, whether successful or not | `{datapoint}` | -| `deltatocumulative.datapoints.dropped` | Faulty datapoints that were dropped due to the reason given in the `reason` attribute | `{datapoint}` | -| `deltatocumulative.gaps.length` | Total length of all gaps in the streams, which occur e.g. due to lost in transit | `second` | +When [Telemetry is +enabled](https://opentelemetry.io/docs/collector/configuration/#telemetry), this component exports [several metrics](./documentation.md). diff --git a/processor/deltatocumulativeprocessor/documentation.md b/processor/deltatocumulativeprocessor/documentation.md new file mode 100644 index 000000000000..55d85f06c764 --- /dev/null +++ b/processor/deltatocumulativeprocessor/documentation.md @@ -0,0 +1,63 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# deltatocumulative + +## Internal Telemetry + +The following telemetry is emitted by this component. + +### otelcol_deltatocumulative.datapoints.dropped + +number of datapoints dropped due to given 'reason' + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {datapoint} | Sum | Int | true | + +### otelcol_deltatocumulative.datapoints.processed + +number of datapoints processed + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {datapoint} | Sum | Int | true | + +### otelcol_deltatocumulative.gaps.length + +total duration where data was expected but not received + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| s | Sum | Int | true | + +### otelcol_deltatocumulative.streams.evicted + +number of streams evicted + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {stream} | Sum | Int | true | + +### otelcol_deltatocumulative.streams.limit + +upper limit of tracked streams + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| {stream} | Gauge | Int | + +### otelcol_deltatocumulative.streams.max_stale + +duration after which streams inactive streams are dropped + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| s | Gauge | Int | + +### otelcol_deltatocumulative.streams.tracked + +number of streams tracked + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {dps} | Sum | Int | false | diff --git a/processor/deltatocumulativeprocessor/factory.go b/processor/deltatocumulativeprocessor/factory.go index 06ef84acd8f5..8a6a394083d6 100644 --- a/processor/deltatocumulativeprocessor/factory.go +++ b/processor/deltatocumulativeprocessor/factory.go @@ -28,6 +28,10 @@ func createMetricsProcessor(_ context.Context, set processor.Settings, cfg compo return nil, fmt.Errorf("configuration parsing error") } - meter := metadata.Meter(set.TelemetrySettings) - return newProcessor(pcfg, set.Logger, meter, next), nil + telb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + if err != nil { + return nil, err + } + + return newProcessor(pcfg, set.Logger, telb, next), nil } diff --git a/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go b/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go new file mode 100644 index 000000000000..52ad9e905c16 --- /dev/null +++ b/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go @@ -0,0 +1,76 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package deltatocumulativeprocessor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processortest" +) + +type componentTestTelemetry struct { + reader *sdkmetric.ManualReader + meterProvider *sdkmetric.MeterProvider +} + +func (tt *componentTestTelemetry) NewSettings() processor.Settings { + settings := processortest.NewNopSettings() + settings.MeterProvider = tt.meterProvider + settings.ID = component.NewID(component.MustNewType("deltatocumulative")) + + return settings +} + +func setupTestTelemetry() componentTestTelemetry { + reader := sdkmetric.NewManualReader() + return componentTestTelemetry{ + reader: reader, + meterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)), + } +} + +func (tt *componentTestTelemetry) assertMetrics(t *testing.T, expected []metricdata.Metrics) { + var md metricdata.ResourceMetrics + require.NoError(t, tt.reader.Collect(context.Background(), &md)) + // ensure all required metrics are present + for _, want := range expected { + got := tt.getMetric(want.Name, md) + metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) + } + + // ensure no additional metrics are emitted + require.Equal(t, len(expected), tt.len(md)) +} + +func (tt *componentTestTelemetry) getMetric(name string, got metricdata.ResourceMetrics) metricdata.Metrics { + for _, sm := range got.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m + } + } + } + + return metricdata.Metrics{} +} + +func (tt *componentTestTelemetry) len(got metricdata.ResourceMetrics) int { + metricsCount := 0 + for _, sm := range got.ScopeMetrics { + metricsCount += len(sm.Metrics) + } + + return metricsCount +} + +func (tt *componentTestTelemetry) Shutdown(ctx context.Context) error { + return tt.meterProvider.Shutdown(ctx) +} diff --git a/processor/deltatocumulativeprocessor/go.mod b/processor/deltatocumulativeprocessor/go.mod index 07e0da39a7d7..54dadcfab84c 100644 --- a/processor/deltatocumulativeprocessor/go.mod +++ b/processor/deltatocumulativeprocessor/go.mod @@ -7,12 +7,14 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.105.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.105.1-0.20240717163034-43ed6184f9fe + go.opentelemetry.io/collector/config/configtelemetry v0.105.1-0.20240717163034-43ed6184f9fe go.opentelemetry.io/collector/confmap v0.105.1-0.20240717163034-43ed6184f9fe go.opentelemetry.io/collector/consumer v0.105.1-0.20240717163034-43ed6184f9fe go.opentelemetry.io/collector/pdata v1.12.1-0.20240716231837-5753a58f712b go.opentelemetry.io/collector/processor v0.105.1-0.20240717163034-43ed6184f9fe go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel/metric v1.28.0 + go.opentelemetry.io/otel/sdk/metric v1.28.0 go.opentelemetry.io/otel/trace v1.28.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 @@ -43,15 +45,12 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect - go.opentelemetry.io/collector v0.105.1-0.20240717163034-43ed6184f9fe // indirect - go.opentelemetry.io/collector/config/configtelemetry v0.105.1-0.20240717163034-43ed6184f9fe // indirect go.opentelemetry.io/collector/featuregate v1.12.1-0.20240716231837-5753a58f712b // indirect go.opentelemetry.io/collector/internal/globalgates v0.105.1-0.20240717163034-43ed6184f9fe // indirect go.opentelemetry.io/collector/pdata/pprofile v0.105.1-0.20240717163034-43ed6184f9fe // indirect go.opentelemetry.io/collector/pdata/testdata v0.105.1-0.20240717163034-43ed6184f9fe // indirect go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect go.opentelemetry.io/otel/sdk v1.28.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.26.0 // indirect golang.org/x/sys v0.21.0 // indirect diff --git a/processor/deltatocumulativeprocessor/go.sum b/processor/deltatocumulativeprocessor/go.sum index 4fda14528f5f..14924f080ac2 100644 --- a/processor/deltatocumulativeprocessor/go.sum +++ b/processor/deltatocumulativeprocessor/go.sum @@ -64,8 +64,6 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opentelemetry.io/collector v0.105.1-0.20240717163034-43ed6184f9fe h1:ZjgqZsb2G6DekoePCEUmr1Mh3dbiSo5fTODlsCwyhPg= -go.opentelemetry.io/collector v0.105.1-0.20240717163034-43ed6184f9fe/go.mod h1:UVapTqB4fJeZpGU/YgOo6665cxCSytqYmMkVmRlu2cg= go.opentelemetry.io/collector/component v0.105.1-0.20240717163034-43ed6184f9fe h1:arQYi6m+phRUEW9nFRYcDv05oSNtrl3C6CrGej+5AsY= go.opentelemetry.io/collector/component v0.105.1-0.20240717163034-43ed6184f9fe/go.mod h1:s8KoxOrhNIBzetkb0LHmzX1OI67DyZbaaUPOWIXS1mg= go.opentelemetry.io/collector/config/configtelemetry v0.105.1-0.20240717163034-43ed6184f9fe h1:PZBUpzjgEarO4SPyO2ZZFJqe9U6gcXCDB1hC8RdKwpE= diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go index 47e60276e3a3..1ee7497b7bfa 100644 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go @@ -3,9 +3,14 @@ package metadata import ( - "go.opentelemetry.io/collector/component" + "errors" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" ) func Meter(settings component.TelemetrySettings) metric.Meter { @@ -15,3 +20,85 @@ func Meter(settings component.TelemetrySettings) metric.Meter { func Tracer(settings component.TelemetrySettings) trace.Tracer { return settings.TracerProvider.Tracer("otelcol/deltatocumulative") } + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + DeltatocumulativeDatapointsDropped metric.Int64Counter + DeltatocumulativeDatapointsProcessed metric.Int64Counter + DeltatocumulativeGapsLength metric.Int64Counter + DeltatocumulativeStreamsEvicted metric.Int64Counter + DeltatocumulativeStreamsLimit metric.Int64Gauge + DeltatocumulativeStreamsMaxStale metric.Int64Gauge + DeltatocumulativeStreamsTracked metric.Int64UpDownCounter + level configtelemetry.Level +} + +// telemetryBuilderOption applies changes to default builder. +type telemetryBuilderOption func(*TelemetryBuilder) + +// WithLevel sets the current telemetry level for the component. +func WithLevel(lvl configtelemetry.Level) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.level = lvl + } +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{level: configtelemetry.LevelBasic} + for _, op := range options { + op(&builder) + } + var err, errs error + if builder.level >= configtelemetry.LevelBasic { + builder.meter = Meter(settings) + } else { + builder.meter = noop.Meter{} + } + builder.DeltatocumulativeDatapointsDropped, err = builder.meter.Int64Counter( + "otelcol_deltatocumulative.datapoints.dropped", + metric.WithDescription("number of datapoints dropped due to given 'reason'"), + metric.WithUnit("{datapoint}"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeDatapointsProcessed, err = builder.meter.Int64Counter( + "otelcol_deltatocumulative.datapoints.processed", + metric.WithDescription("number of datapoints processed"), + metric.WithUnit("{datapoint}"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeGapsLength, err = builder.meter.Int64Counter( + "otelcol_deltatocumulative.gaps.length", + metric.WithDescription("total duration where data was expected but not received"), + metric.WithUnit("s"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeStreamsEvicted, err = builder.meter.Int64Counter( + "otelcol_deltatocumulative.streams.evicted", + metric.WithDescription("number of streams evicted"), + metric.WithUnit("{stream}"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeStreamsLimit, err = builder.meter.Int64Gauge( + "otelcol_deltatocumulative.streams.limit", + metric.WithDescription("upper limit of tracked streams"), + metric.WithUnit("{stream}"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeStreamsMaxStale, err = builder.meter.Int64Gauge( + "otelcol_deltatocumulative.streams.max_stale", + metric.WithDescription("duration after which streams inactive streams are dropped"), + metric.WithUnit("s"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeStreamsTracked, err = builder.meter.Int64UpDownCounter( + "otelcol_deltatocumulative.streams.tracked", + metric.WithDescription("number of streams tracked"), + metric.WithUnit("{dps}"), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go index 492fa62a3832..a4ff6092b731 100644 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go @@ -61,3 +61,16 @@ func TestProviders(t *testing.T) { require.Fail(t, "returned Meter not mockTracer") } } + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go index 5d9205bffd05..b16d0e4183ef 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go @@ -7,12 +7,14 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/otel/metric/noop" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random" @@ -112,10 +114,13 @@ func TestFaults(t *testing.T) { }, } + telb, err := metadata.NewTelemetryBuilder(component.TelemetrySettings{MeterProvider: noop.NewMeterProvider()}) + require.NoError(t, err) + for _, c := range cases { t.Run(c.Name, func(t *testing.T) { id, dp := sum.Stream() - tel := telemetry.New(noop.Meter{}) + tel := telemetry.New(telb) dps := c.Map if dps == nil { diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go index 4a23dd6fde3e..cbf52c09ff94 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go @@ -8,7 +8,6 @@ import ( "errors" "time" - "go.opentelemetry.io/collector/processor/processorhelper" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -19,22 +18,29 @@ import ( type Telemetry struct { Metrics - - meter metric.Meter } -func New(meter metric.Meter) Telemetry { - return Telemetry{ - Metrics: metrics(meter), - meter: meter, - } +func New(telb *metadata.TelemetryBuilder) Telemetry { + return Telemetry{Metrics: Metrics{ + streams: Streams{ + tracked: telb.DeltatocumulativeStreamsTracked, + limit: telb.DeltatocumulativeStreamsLimit, + evicted: telb.DeltatocumulativeStreamsEvicted, + stale: telb.DeltatocumulativeStreamsMaxStale, + }, + dps: Datapoints{ + total: telb.DeltatocumulativeDatapointsProcessed, + dropped: telb.DeltatocumulativeDatapointsDropped, + }, + gaps: telb.DeltatocumulativeGapsLength, + }} } type Streams struct { tracked metric.Int64UpDownCounter - limit metric.Int64ObservableGauge + limit metric.Int64Gauge evicted metric.Int64Counter - stale metric.Int64ObservableGauge + stale metric.Int64Gauge } type Datapoints struct { @@ -49,69 +55,12 @@ type Metrics struct { gaps metric.Int64Counter } -func metrics(meter metric.Meter) Metrics { - var ( - count = use(meter.Int64Counter) - updown = use(meter.Int64UpDownCounter) - gauge = use(meter.Int64ObservableGauge) - ) - - return Metrics{ - streams: Streams{ - tracked: updown("streams.tracked", - metric.WithDescription("number of streams tracked"), - metric.WithUnit("{stream}"), - ), - limit: gauge("streams.limit", - metric.WithDescription("upper limit of tracked streams"), - metric.WithUnit("{stream}"), - ), - evicted: count("streams.evicted", - metric.WithDescription("number of streams evicted"), - metric.WithUnit("{stream}"), - ), - stale: gauge("streams.max_stale", - metric.WithDescription("duration without new samples after which streams are dropped"), - metric.WithUnit("s"), - ), - }, - dps: Datapoints{ - total: count("datapoints.processed", - metric.WithDescription("number of datapoints processed"), - metric.WithUnit("{datapoint}"), - ), - dropped: count("datapoints.dropped", - metric.WithDescription("number of dropped datapoints due to given 'reason'"), - metric.WithUnit("{datapoint}"), - ), - }, - gaps: count("gaps.length", - metric.WithDescription("total duration where data was expected but not received"), - metric.WithUnit("s"), - ), - } -} - func (tel Telemetry) WithLimit(max int64) { - then := metric.Callback(func(_ context.Context, o metric.Observer) error { - o.ObserveInt64(tel.streams.limit, max) - return nil - }) - _, err := tel.meter.RegisterCallback(then, tel.streams.limit) - if err != nil { - panic(err) - } + tel.streams.limit.Record(context.Background(), max) } func (tel Telemetry) WithStale(max time.Duration) { - then := metric.Callback(func(_ context.Context, o metric.Observer) error { - o.ObserveInt64(tel.streams.stale, int64(max.Seconds())) - return nil - }) - _, err := tel.meter.RegisterCallback(then, tel.streams.stale) - if err != nil { - panic(err) - } + tel.streams.stale.Record(context.Background(), int64(max.Seconds())) } func ObserveItems[T any](items streams.Map[T], metrics *Metrics) Items[T] { @@ -208,14 +157,3 @@ func dec[A addable[O], O any](a A, opts ...O) { func reason(reason string) metric.AddOption { return metric.WithAttributes(attribute.String("reason", reason)) } - -func use[F func(string, ...O) (M, error), M any, O any](f F) func(string, ...O) M { - return func(name string, opts ...O) M { - name = processorhelper.BuildCustomMetricName(metadata.Type.String(), name) - m, err := f(name, opts...) - if err != nil { - panic(err) - } - return m - } -} diff --git a/processor/deltatocumulativeprocessor/metadata.yaml b/processor/deltatocumulativeprocessor/metadata.yaml index 43e5ba4f3877..1966284cb1eb 100644 --- a/processor/deltatocumulativeprocessor/metadata.yaml +++ b/processor/deltatocumulativeprocessor/metadata.yaml @@ -9,3 +9,56 @@ status: warnings: [Statefulness] codeowners: active: [sh0rez, RichieSams, jpkrohling] + + +telemetry: + metrics: + # streams + deltatocumulative.streams.tracked: + description: number of streams tracked + unit: "{dps}" + sum: + value_type: int + monotonic: false + enabled: true + deltatocumulative.streams.limit: + description: upper limit of tracked streams + unit: "{stream}" + gauge: + value_type: int + enabled: true + deltatocumulative.streams.evicted: + description: number of streams evicted + unit: "{stream}" + sum: + value_type: int + monotonic: true + enabled: true + deltatocumulative.streams.max_stale: + description: duration after which streams inactive streams are dropped + unit: "s" + gauge: + value_type: int + enabled: true + # datapoints + deltatocumulative.datapoints.processed: + description: number of datapoints processed + unit: "{datapoint}" + sum: + value_type: int + monotonic: true + enabled: true + deltatocumulative.datapoints.dropped: + description: number of datapoints dropped due to given 'reason' + unit: "{datapoint}" + sum: + value_type: int + monotonic: true + enabled: true + deltatocumulative.gaps.length: + description: total duration where data was expected but not received + unit: "s" + sum: + value_type: int + monotonic: true + enabled: true diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index bd904c70857e..63202186fb59 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -13,13 +13,13 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" @@ -40,11 +40,10 @@ type Processor struct { mtx sync.Mutex } -func newProcessor(cfg *Config, log *zap.Logger, meter metric.Meter, next consumer.Metrics) *Processor { +func newProcessor(cfg *Config, log *zap.Logger, telb *metadata.TelemetryBuilder, next consumer.Metrics) *Processor { ctx, cancel := context.WithCancel(context.Background()) - tel := telemetry.New(meter) - + tel := telemetry.New(telb) proc := Processor{ log: log, ctx: ctx,