Resolve #33464 [processor/groupbytrace] migrate from opencensus library
This migrates the internal metrics from OpenCensus to mdatagen + OTel. Uses metadata.TelemetryBuilder and moves all metric definitions from the metrics file to metadata.yaml. Updates the tests to drop OpenCensus. documentation.md is generated from metadata.yaml.
honeychaudharyc authored and codeboten committed Jul 4, 2024
1 parent 475960c commit abf09cf
Showing 18 changed files with 448 additions and 286 deletions.
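Before the file diffs, here is a minimal sketch (not part of the commit) of the pattern this migration adopts: mdatagen generates a `TelemetryBuilder` from `metadata.yaml`, the builder is constructed from the component's `TelemetrySettings`, and each declared metric becomes a typed instrument to record on. The function name and the use of nop settings below are illustrative assumptions; the package placement assumes the snippet lives inside groupbytraceprocessor, since the generated metadata package is internal to it.

```go
package groupbytraceprocessor

import (
	"context"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata"
	"go.opentelemetry.io/collector/processor/processortest"
)

// exampleRecordQueueSize is a hypothetical helper showing the migration pattern.
func exampleRecordQueueSize(numEvents int) error {
	// The processor receives its TelemetrySettings via processor.Settings;
	// nop settings keep this sketch self-contained.
	set := processortest.NewNopSettings()

	// mdatagen generates NewTelemetryBuilder from metadata.yaml.
	telemetry, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
	if err != nil {
		return err
	}

	// Each metric declared in metadata.yaml becomes a typed field on the
	// builder, replacing the old OpenCensus stats.Record calls.
	telemetry.ProcessorGroupbytraceNumEventsInQueue.Record(context.Background(), int64(numEvents))
	return nil
}
```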
71 changes: 71 additions & 0 deletions processor/groupbytraceprocessor/documentation.md
@@ -0,0 +1,71 @@
[comment]: <> (Code generated by mdatagen. DO NOT EDIT.)

# groupbytrace

## Internal Telemetry

The following telemetry is emitted by this component.

### processor_groupbytrace_conf_num_traces

Maximum number of traces to hold in the internal storage

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Int |

### processor_groupbytrace_event_latency

How long the queue events are taking to be processed

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| ms | Histogram | Int |

### processor_groupbytrace_incomplete_releases

Releases that are suspected to have been incomplete

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| <nil> | Sum | Int | true |

### processor_groupbytrace_num_events_in_queue

Number of events currently in the queue

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Int |

### processor_groupbytrace_num_traces_in_memory

Number of traces currently in the in-memory storage

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Int |

### processor_groupbytrace_spans_released

Spans released to the next consumer

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |

### processor_groupbytrace_traces_evicted

Traces evicted from the internal buffer

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |

### processor_groupbytrace_traces_released

Traces released to the next consumer

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |
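The metrics documented above are recorded through the generated TelemetryBuilder. As a hedged illustration (the helper function is hypothetical), the sketch below shows how the event-latency histogram is recorded with its `event` attribute, mirroring the call this commit introduces in event.go.

```go
package groupbytraceprocessor

import (
	"context"
	"time"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

// recordEventLatency is an illustrative helper; the processor itself makes
// this call inline in handleEventWithObservability.
func recordEventLatency(telemetry *metadata.TelemetryBuilder, event string, start time.Time) {
	// processor_groupbytrace_event_latency is a histogram in milliseconds,
	// tagged with the type of event that was processed.
	telemetry.ProcessorGroupbytraceEventLatency.Record(
		context.Background(),
		time.Since(start).Milliseconds(),
		metric.WithAttributeSet(attribute.NewSet(attribute.String("event", event))),
	)
}
```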
19 changes: 9 additions & 10 deletions processor/groupbytraceprocessor/event.go
@@ -11,10 +11,11 @@ import (
"sync"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/tag"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

@@ -44,8 +45,6 @@ var (
return &hash
},
}

eventTagKey = tag.MustNewKey("event")
)

type eventType int
@@ -70,8 +69,8 @@ type eventMachine struct {
metricsCollectionInterval time.Duration
shutdownTimeout time.Duration

logger *zap.Logger

logger *zap.Logger
telemetry *metadata.TelemetryBuilder
onTraceReceived func(td tracesWithID, worker *eventMachineWorker) error
onTraceExpired func(traceID pcommon.TraceID, worker *eventMachineWorker) error
onTraceReleased func(rss []ptrace.ResourceSpans) error
@@ -84,9 +83,10 @@ type eventMachine struct {
closed bool
}

func newEventMachine(logger *zap.Logger, bufferSize int, numWorkers int, numTraces int) *eventMachine {
func newEventMachine(logger *zap.Logger, bufferSize int, numWorkers int, numTraces int, telemetry *metadata.TelemetryBuilder) *eventMachine {
em := &eventMachine{
logger: logger,
telemetry: telemetry,
workers: make([]*eventMachineWorker, numWorkers),
close: make(chan struct{}),
shutdownLock: &sync.RWMutex{},
@@ -119,7 +119,7 @@ func (em *eventMachine) numEvents() int {
func (em *eventMachine) periodicMetrics() {
numEvents := em.numEvents()
em.logger.Debug("recording current state of the queue", zap.Int("num-events", numEvents))
stats.Record(context.Background(), mNumEventsInQueue.M(int64(numEvents)))
em.telemetry.ProcessorGroupbytraceNumEventsInQueue.Record(context.Background(), int64(numEvents))

em.shutdownLock.RLock()
closed := em.closed
@@ -288,8 +288,7 @@ func (em *eventMachine) handleEventWithObservability(event string, do func() err
start := time.Now()
succeeded, err := doWithTimeout(time.Second, do)
duration := time.Since(start)

_ = stats.RecordWithTags(context.Background(), []tag.Mutator{tag.Upsert(eventTagKey, event)}, mEventLatency.M(duration.Milliseconds()))
em.telemetry.ProcessorGroupbytraceEventLatency.Record(context.Background(), duration.Milliseconds(), metric.WithAttributeSet(attribute.NewSet(attribute.String("event", event))))

if err != nil {
em.logger.Error("failed to process event", zap.Error(err), zap.String("event", event))
89 changes: 24 additions & 65 deletions processor/groupbytraceprocessor/event_test.go
@@ -11,16 +11,21 @@ import (
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor/processortest"
"go.uber.org/zap"
)

func TestEventCallback(t *testing.T) {
set := processortest.NewNopSettings()
tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings)

for _, tt := range []struct {
casename string
typ eventType
@@ -80,7 +85,7 @@ func TestEventCallback(t *testing.T) {
require.NoError(t, err)

wg := &sync.WaitGroup{}
em := newEventMachine(logger, 50, 1, 1_000)
em := newEventMachine(logger, 50, 1, 1_000, tel)
tt.registerCallback(em, wg)

em.startInBackground()
@@ -100,6 +105,8 @@ func TestEventCallback(t *testing.T) {
}

func TestEventCallbackNotSet(t *testing.T) {
set := processortest.NewNopSettings()
tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings)
for _, tt := range []struct {
casename string
typ eventType
@@ -127,7 +134,7 @@ func TestEventCallbackNotSet(t *testing.T) {
require.NoError(t, err)

wg := &sync.WaitGroup{}
em := newEventMachine(logger, 50, 1, 1_000)
em := newEventMachine(logger, 50, 1, 1_000, tel)
em.onError = func(_ event) {
wg.Done()
}
@@ -147,6 +154,8 @@ func TestEventCallbackNotSet(t *testing.T) {
}

func TestEventInvalidPayload(t *testing.T) {
set := processortest.NewNopSettings()
tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings)
for _, tt := range []struct {
casename string
typ eventType
@@ -195,7 +204,7 @@ func TestEventInvalidPayload(t *testing.T) {
require.NoError(t, err)

wg := &sync.WaitGroup{}
em := newEventMachine(logger, 50, 1, 1_000)
em := newEventMachine(logger, 50, 1, 1_000, tel)
em.onError = func(_ event) {
wg.Done()
}
@@ -216,12 +225,14 @@ func TestEventInvalidPayload(t *testing.T) {
}

func TestEventUnknownType(t *testing.T) {
set := processortest.NewNopSettings()
tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings)
// prepare
logger, err := zap.NewDevelopment()
require.NoError(t, err)

wg := &sync.WaitGroup{}
em := newEventMachine(logger, 50, 1, 1_000)
em := newEventMachine(logger, 50, 1, 1_000, tel)
em.onError = func(_ event) {
wg.Done()
}
@@ -239,6 +250,8 @@ func TestEventUnknownType(t *testing.T) {
}

func TestEventTracePerWorker(t *testing.T) {
set := processortest.NewNopSettings()
tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings)
for _, tt := range []struct {
casename string
traceID [16]byte
@@ -265,7 +278,7 @@ func TestEventTracePerWorker(t *testing.T) {
},
} {
t.Run(tt.casename, func(t *testing.T) {
em := newEventMachine(zap.NewNop(), 200, 100, 1_000)
em := newEventMachine(zap.NewNop(), 200, 100, 1_000, tel)

var wg sync.WaitGroup
var workerForTrace *eventMachineWorker
@@ -342,13 +355,15 @@ func TestEventConsumeConsistency(t *testing.T) {
}

func TestEventShutdown(t *testing.T) {
set := processortest.NewNopSettings()
tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings)
// prepare
wg := sync.WaitGroup{}
wg.Add(1)

traceReceivedFired := &atomic.Int64{}
traceExpiredFired := &atomic.Int64{}
em := newEventMachine(zap.NewNop(), 50, 1, 1_000)
em := newEventMachine(zap.NewNop(), 50, 1, 1_000, tel)
em.onTraceReceived = func(tracesWithID, *eventMachineWorker) error {
traceReceivedFired.Store(1)
return nil
@@ -411,67 +426,11 @@ func TestEventShutdown(t *testing.T) {
shutdownWg.Wait()
}

func TestPeriodicMetrics(t *testing.T) {
// prepare
views := metricViews()

// ensure that we are starting with a clean state
view.Unregister(views...)
assert.NoError(t, view.Register(views...))

// try to be nice with the next consumer (test)
defer view.Unregister(views...)

em := newEventMachine(zap.NewNop(), 50, 1, 1_000)
em.metricsCollectionInterval = time.Millisecond

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
expected := 2
calls := 0
for range em.workers[0].events {
// we expect two events, after which we just exit the loop
// if we return from here, we'd still have one item in the queue that is not going to be consumed
wg.Wait()
calls++

if calls == expected {
return
}
}
}()

// sanity check
assertGaugeNotCreated(t, mNumEventsInQueue)

// test
em.workers[0].fire(event{typ: traceReceived})
em.workers[0].fire(event{typ: traceReceived}) // the first is consumed right away, the second is in the queue
go em.periodicMetrics()

// ensure our gauge is showing 1 item in the queue
assert.Eventually(t, func() bool {
return getGaugeValue(t, mNumEventsInQueue) == 1
}, 1*time.Second, 10*time.Millisecond)

wg.Done() // release all events

// ensure our gauge is now showing no items in the queue
assert.Eventually(t, func() bool {
return getGaugeValue(t, mNumEventsInQueue) == 0
}, 1*time.Second, 10*time.Millisecond)

// signal and wait for the recursive call to finish
em.shutdownLock.Lock()
em.closed = true
em.shutdownLock.Unlock()
time.Sleep(5 * time.Millisecond)
}

func TestForceShutdown(t *testing.T) {
set := processortest.NewNopSettings()
tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings)
// prepare
em := newEventMachine(zap.NewNop(), 50, 1, 1_000)
em := newEventMachine(zap.NewNop(), 50, 1, 1_000, tel)
em.shutdownTimeout = 20 * time.Millisecond

// test
10 changes: 4 additions & 6 deletions processor/groupbytraceprocessor/factory.go
@@ -8,7 +8,6 @@ import (
"fmt"
"time"

"go.opencensus.io/stats/view"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
@@ -31,8 +30,6 @@ var (

// NewFactory returns a new factory for the Filter processor.
func NewFactory() processor.Factory {
// TODO: find a more appropriate way to get this done, as we are swallowing the error here
_ = view.Register(metricViews()...)

return processor.NewFactory(
metadata.Type,
@@ -70,8 +67,9 @@ func createTracesProcessor(
return nil, errDiscardOrphansNotSupported
}

processor := newGroupByTraceProcessor(params, nextConsumer, *oCfg)
// the only supported storage for now
st = newMemoryStorage()

return newGroupByTraceProcessor(params.Logger, st, nextConsumer, *oCfg), nil
st = newMemoryStorage(processor.telemetryBuilder)
processor.st = st
return processor, nil
}
8 changes: 3 additions & 5 deletions processor/groupbytraceprocessor/factory_test.go
@@ -8,6 +8,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/processor/processortest"
)

@@ -26,10 +27,8 @@ func TestDefaultConfiguration(t *testing.T) {
func TestCreateTestProcessor(t *testing.T) {
c := createDefaultConfig().(*Config)

next := &mockProcessor{}

// test
p, err := createTracesProcessor(context.Background(), processortest.NewNopSettings(), c, next)
p, err := createTracesProcessor(context.Background(), processortest.NewNopSettings(), c, consumertest.NewNop())

// verify
assert.NoError(t, err)
@@ -39,7 +38,6 @@ func TestCreateTestProcessorWithNotImplementedOptions(t *testing.T) {
func TestCreateTestProcessorWithNotImplementedOptions(t *testing.T) {
// prepare
f := NewFactory()
next := &mockProcessor{}

// test
for _, tt := range []struct {
@@ -59,7 +57,7 @@ func TestCreateTestProcessorWithNotImplementedOptions(t *testing.T) {
errDiskStorageNotSupported,
},
} {
p, err := f.CreateTracesProcessor(context.Background(), processortest.NewNopSettings(), tt.config, next)
p, err := f.CreateTracesProcessor(context.Background(), processortest.NewNopSettings(), tt.config, consumertest.NewNop())

// verify
assert.Error(t, tt.expectedErr, err)