Skip to content

Commit

Permalink
[exporterhelper] make enqueue failures available (#8674)
Browse files Browse the repository at this point in the history
These metrics were only exporter either via OC or via the prometheus
exporter. Fixes #8673

---------

Signed-off-by: Alex Boten <aboten@lightstep.com>
Co-authored-by: Dmitrii Anoshin <anoshindx@gmail.com>
  • Loading branch information
Alex Boten and dmitryax authored Oct 20, 2023
1 parent 3d3fffa commit 844b628
Show file tree
Hide file tree
Showing 18 changed files with 174 additions and 140 deletions.
25 changes: 25 additions & 0 deletions .chloggen/codeboten_fix-8673.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: make enqueue failures available for otel metrics

# One or more tracking issues or pull requests related to the change
issues: [8673]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
10 changes: 5 additions & 5 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (b *baseRequestSender) setNextSender(nextSender requestSender) {
b.nextSender = nextSender
}

type obsrepSenderFactory func(obsrep *obsExporter) requestSender
type obsrepSenderFactory func(obsrep *ObsReport) requestSender

// baseRequest is a base implementation for the internal.Request.
type baseRequest struct {
Expand Down Expand Up @@ -143,7 +143,7 @@ type baseExporter struct {
signal component.DataType

set exporter.CreateSettings
obsrep *obsExporter
obsrep *ObsReport

// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
// The data is handled by each sender in the respective order starting from the queueSender.
Expand All @@ -163,7 +163,7 @@ type baseExporter struct {
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler internal.RequestMarshaler,
unmarshaler internal.RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {

obsrep, err := newObsExporter(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments)
obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set})
if err != nil {
return nil, err
}
Expand All @@ -175,12 +175,12 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
signal: signal,

queueSender: &baseRequestSender{},
obsrepSender: osf(obsrep),
obsrepSender: osf(obsReport),
retrySender: &baseRequestSender{},
timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()},

set: set,
obsrep: obsrep,
obsrep: obsReport,
}

for _, op := range options {
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
}
)

func newNoopObsrepSender(_ *obsExporter) requestSender {
func newNoopObsrepSender(_ *ObsReport) requestSender {
return &baseRequestSender{}
}

Expand Down
8 changes: 4 additions & 4 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewLogsExporter(
req := newLogsRequest(ctx, ld, pusher)
serr := be.send(req)
if errors.Is(serr, errSendingQueueIsFull) {
be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.Count()))
be.obsrep.recordEnqueueFailure(req.Context(), component.DataTypeLogs, int64(req.Count()))
}
return serr
}, be.consumerOptions...)
Expand Down Expand Up @@ -151,7 +151,7 @@ func NewLogsRequestExporter(
r := newRequest(ctx, req)
sErr := be.send(r)
if errors.Is(sErr, errSendingQueueIsFull) {
be.obsrep.recordLogsEnqueueFailure(r.Context(), int64(r.Count()))
be.obsrep.recordEnqueueFailure(r.Context(), component.DataTypeLogs, int64(r.Count()))
}
return sErr
}, be.consumerOptions...)
Expand All @@ -164,10 +164,10 @@ func NewLogsRequestExporter(

type logsExporterWithObservability struct {
baseRequestSender
obsrep *obsExporter
obsrep *ObsReport
}

func newLogsExporterWithObservability(obsrep *obsExporter) requestSender {
func newLogsExporterWithObservability(obsrep *ObsReport) requestSender {
return &logsExporterWithObservability{obsrep: obsrep}
}

Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
}

// 2 batched must be in queue, and 5 batches (15 log records) rejected due to queue overflow
checkExporterEnqueueFailedLogsStats(t, globalInstruments, fakeLogsExporterName, int64(15))
require.NoError(t, tt.CheckExporterEnqueueFailedLogs(int64(15)))
}

func TestLogsExporter_WithSpan(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewMetricsExporter(
req := newMetricsRequest(ctx, md, pusher)
serr := be.send(req)
if errors.Is(serr, errSendingQueueIsFull) {
be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.Count()))
be.obsrep.recordEnqueueFailure(req.Context(), component.DataTypeMetrics, int64(req.Count()))
}
return serr
}, be.consumerOptions...)
Expand Down Expand Up @@ -151,7 +151,7 @@ func NewMetricsRequestExporter(
r := newRequest(ctx, req)
sErr := be.send(r)
if errors.Is(sErr, errSendingQueueIsFull) {
be.obsrep.recordMetricsEnqueueFailure(r.Context(), int64(r.Count()))
be.obsrep.recordEnqueueFailure(r.Context(), component.DataTypeMetrics, int64(r.Count()))
}
return sErr
}, be.consumerOptions...)
Expand All @@ -164,10 +164,10 @@ func NewMetricsRequestExporter(

type metricsSenderWithObservability struct {
baseRequestSender
obsrep *obsExporter
obsrep *ObsReport
}

func newMetricsSenderWithObservability(obsrep *obsExporter) requestSender {
func newMetricsSenderWithObservability(obsrep *ObsReport) requestSender {
return &metricsSenderWithObservability{obsrep: obsrep}
}

Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
}

// 2 batched must be in queue, and 10 metric points rejected due to queue overflow
checkExporterEnqueueFailedMetricsStats(t, globalInstruments, fakeMetricsExporterName, int64(10))
require.NoError(t, tt.CheckExporterEnqueueFailedMetrics(int64(10)))
}

func TestMetricsExporter_WithSpan(t *testing.T) {
Expand Down
77 changes: 69 additions & 8 deletions exporter/exporterhelper/obsexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ type ObsReport struct {
tracer trace.Tracer
logger *zap.Logger

useOtelForMetrics bool
otelAttrs []attribute.KeyValue
sentSpans metric.Int64Counter
failedToSendSpans metric.Int64Counter
sentMetricPoints metric.Int64Counter
failedToSendMetricPoints metric.Int64Counter
sentLogRecords metric.Int64Counter
failedToSendLogRecords metric.Int64Counter
useOtelForMetrics bool
otelAttrs []attribute.KeyValue
sentSpans metric.Int64Counter
failedToSendSpans metric.Int64Counter
failedToEnqueueSpans metric.Int64Counter
sentMetricPoints metric.Int64Counter
failedToSendMetricPoints metric.Int64Counter
failedToEnqueueMetricPoints metric.Int64Counter
sentLogRecords metric.Int64Counter
failedToSendLogRecords metric.Int64Counter
failedToEnqueueLogRecords metric.Int64Counter
}

// ObsReportSettings are settings for creating an ObsReport.
Expand Down Expand Up @@ -96,6 +99,12 @@ func (or *ObsReport) createOtelMetrics(cfg ObsReportSettings) error {
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

or.failedToEnqueueSpans, err = meter.Int64Counter(
obsmetrics.ExporterPrefix+obsmetrics.FailedToEnqueueSpansKey,
metric.WithDescription("Number of spans failed to be added to the sending queue."),
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

or.sentMetricPoints, err = meter.Int64Counter(
obsmetrics.ExporterPrefix+obsmetrics.SentMetricPointsKey,
metric.WithDescription("Number of metric points successfully sent to destination."),
Expand All @@ -108,6 +117,12 @@ func (or *ObsReport) createOtelMetrics(cfg ObsReportSettings) error {
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

or.failedToEnqueueMetricPoints, err = meter.Int64Counter(
obsmetrics.ExporterPrefix+obsmetrics.FailedToEnqueueMetricPointsKey,
metric.WithDescription("Number of metric points failed to be added to the sending queue."),
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

or.sentLogRecords, err = meter.Int64Counter(
obsmetrics.ExporterPrefix+obsmetrics.SentLogRecordsKey,
metric.WithDescription("Number of log record successfully sent to destination."),
Expand All @@ -120,6 +135,12 @@ func (or *ObsReport) createOtelMetrics(cfg ObsReportSettings) error {
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

or.failedToEnqueueLogRecords, err = meter.Int64Counter(
obsmetrics.ExporterPrefix+obsmetrics.FailedToEnqueueLogRecordsKey,
metric.WithDescription("Number of log records failed to be added to the sending queue."),
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

return errors
}

Expand Down Expand Up @@ -252,3 +273,43 @@ func toNumItems(numExportedItems int, err error) (int64, int64) {
}
return int64(numExportedItems), 0
}

func (or *ObsReport) recordEnqueueFailure(ctx context.Context, dataType component.DataType, failed int64) {
if or.useOtelForMetrics {
or.recordEnqueueFailureWithOtel(ctx, dataType, failed)
} else {
or.recordEnqueueFailureWithOC(ctx, dataType, failed)
}
}

func (or *ObsReport) recordEnqueueFailureWithOC(ctx context.Context, dataType component.DataType, failed int64) {
var failedMeasure *stats.Int64Measure
switch dataType {
case component.DataTypeTraces:
failedMeasure = obsmetrics.ExporterFailedToEnqueueSpans
case component.DataTypeMetrics:
failedMeasure = obsmetrics.ExporterFailedToEnqueueMetricPoints
case component.DataTypeLogs:
failedMeasure = obsmetrics.ExporterFailedToEnqueueLogRecords
}
if failed > 0 {
_ = stats.RecordWithTags(
ctx,
or.mutators,
failedMeasure.M(failed))
}
}

func (or *ObsReport) recordEnqueueFailureWithOtel(ctx context.Context, dataType component.DataType, failed int64) {
var enqueueFailedMeasure metric.Int64Counter
switch dataType {
case component.DataTypeTraces:
enqueueFailedMeasure = or.failedToEnqueueSpans
case component.DataTypeMetrics:
enqueueFailedMeasure = or.failedToEnqueueMetricPoints
case component.DataTypeLogs:
enqueueFailedMeasure = or.failedToEnqueueLogRecords
}

enqueueFailedMeasure.Add(ctx, failed, metric.WithAttributes(or.otelAttrs...))
}
73 changes: 3 additions & 70 deletions exporter/exporterhelper/obsreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"

import (
"context"

"go.opencensus.io/metric"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
Expand All @@ -26,12 +24,9 @@ func init() {
}

type instruments struct {
registry *metric.Registry
queueSize *metric.Int64DerivedGauge
queueCapacity *metric.Int64DerivedGauge
failedToEnqueueTraceSpans *metric.Int64Cumulative
failedToEnqueueMetricPoints *metric.Int64Cumulative
failedToEnqueueLogRecords *metric.Int64Cumulative
registry *metric.Registry
queueSize *metric.Int64DerivedGauge
queueCapacity *metric.Int64DerivedGauge
}

func newInstruments(registry *metric.Registry) *instruments {
Expand All @@ -49,67 +44,5 @@ func newInstruments(registry *metric.Registry) *instruments {
metric.WithDescription("Fixed capacity of the retry queue (in batches)"),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

insts.failedToEnqueueTraceSpans, _ = registry.AddInt64Cumulative(
obsmetrics.ExporterKey+"/enqueue_failed_spans",
metric.WithDescription("Number of spans failed to be added to the sending queue."),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

insts.failedToEnqueueMetricPoints, _ = registry.AddInt64Cumulative(
obsmetrics.ExporterKey+"/enqueue_failed_metric_points",
metric.WithDescription("Number of metric points failed to be added to the sending queue."),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

insts.failedToEnqueueLogRecords, _ = registry.AddInt64Cumulative(
obsmetrics.ExporterKey+"/enqueue_failed_log_records",
metric.WithDescription("Number of log records failed to be added to the sending queue."),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

return insts
}

// obsExporter is a helper to add observability to an exporter.
type obsExporter struct {
*ObsReport
failedToEnqueueTraceSpansEntry *metric.Int64CumulativeEntry
failedToEnqueueMetricPointsEntry *metric.Int64CumulativeEntry
failedToEnqueueLogRecordsEntry *metric.Int64CumulativeEntry
}

// newObsExporter creates a new observability exporter.
func newObsExporter(cfg ObsReportSettings, insts *instruments) (*obsExporter, error) {
labelValue := metricdata.NewLabelValue(cfg.ExporterID.String())
failedToEnqueueTraceSpansEntry, _ := insts.failedToEnqueueTraceSpans.GetEntry(labelValue)
failedToEnqueueMetricPointsEntry, _ := insts.failedToEnqueueMetricPoints.GetEntry(labelValue)
failedToEnqueueLogRecordsEntry, _ := insts.failedToEnqueueLogRecords.GetEntry(labelValue)

exp, err := NewObsReport(cfg)
if err != nil {
return nil, err
}

return &obsExporter{
ObsReport: exp,
failedToEnqueueTraceSpansEntry: failedToEnqueueTraceSpansEntry,
failedToEnqueueMetricPointsEntry: failedToEnqueueMetricPointsEntry,
failedToEnqueueLogRecordsEntry: failedToEnqueueLogRecordsEntry,
}, nil
}

// recordTracesEnqueueFailure records number of spans that failed to be added to the sending queue.
func (eor *obsExporter) recordTracesEnqueueFailure(_ context.Context, numSpans int64) {
eor.failedToEnqueueTraceSpansEntry.Inc(numSpans)
}

// recordMetricsEnqueueFailure records number of metric points that failed to be added to the sending queue.
func (eor *obsExporter) recordMetricsEnqueueFailure(_ context.Context, numMetricPoints int64) {
eor.failedToEnqueueMetricPointsEntry.Inc(numMetricPoints)
}

// recordLogsEnqueueFailure records number of log records that failed to be added to the sending queue.
func (eor *obsExporter) recordLogsEnqueueFailure(_ context.Context, numLogRecords int64) {
eor.failedToEnqueueLogRecordsEntry.Inc(numLogRecords)
}
Loading

0 comments on commit 844b628

Please sign in to comment.