Skip to content

Commit

Permalink
2649 - Change receiver obsreport helpers pattern to match the Process…
Browse files Browse the repository at this point in the history
…or/Exporter (open-telemetry#3227)

* Change Receiver to match pattern with structs and helpers

* Added support for ReceiverSettings

* Add receiver to end functions

* Add comments for new receiver

* Add entry to changelog

* Cache the recevier in structs

* Cache the receiver in other files

* Fix name for var and fix caching

* Fix caching so there is no race

* Fix unit tests

* Add deprecated old funcs back

* Fix comments
  • Loading branch information
humivo authored May 20, 2021
1 parent 7d3d9ca commit f7674b2
Show file tree
Hide file tree
Showing 16 changed files with 218 additions and 68 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
- Add an internal sharedcomponent to be shared by receivers with shared resources (#3198)
- Allow users to configure the Prometheus remote write queue (#3046)
- Mark internaldata traces translation as deprecated for external usage (#3176)
- Change receiver obsreport helpers pattern to match the Processor/Exporter (#3227)

## 🧰 Bug fixes 🧰

Expand Down
146 changes: 127 additions & 19 deletions obsreport/obsreport_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,32 +143,82 @@ func WithLongLivedCtx() StartReceiveOption {
}
}

// Receiver is a helper to add obersvability to a component.Receiver.
type Receiver struct {
receiverID config.ComponentID
transport string
}

// ReceiverSettings are settings for creating an Receiver.
type ReceiverSettings struct {
ReceiverID config.ComponentID
Transport string
}

// NewReceiver creates a new Receiver.
func NewReceiver(cfg ReceiverSettings) *Receiver {
return &Receiver{
receiverID: cfg.ReceiverID,
transport: cfg.Transport,
}
}

// StartTraceDataReceiveOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func (rec *Receiver) StartTraceDataReceiveOp(
operationCtx context.Context,
opt ...StartReceiveOption,
) context.Context {
return rec.traceReceiveOp(
operationCtx,
receiveTraceDataOperationSuffix,
opt...)
}

// StartTraceDataReceiveOp is deprecated but is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func StartTraceDataReceiveOp(
operationCtx context.Context,
receiverID config.ComponentID,
transport string,
opt ...StartReceiveOption,
) context.Context {
return traceReceiveOp(
rec := NewReceiver(ReceiverSettings{ReceiverID: receiverID, Transport: transport})
return rec.traceReceiveOp(
operationCtx,
receiverID,
transport,
receiveTraceDataOperationSuffix,
opt...)
}

// EndTraceDataReceiveOp completes the receive operation that was started with
// StartTraceDataReceiveOp.
func (rec *Receiver) EndTraceDataReceiveOp(
receiverCtx context.Context,
format string,
numReceivedSpans int,
err error,
) {
rec.endReceiveOp(
receiverCtx,
format,
numReceivedSpans,
err,
config.TracesDataType,
)
}

// EndTraceDataReceiveOp is deprecated but completes the receive operation that was started with
// StartTraceDataReceiveOp.
func EndTraceDataReceiveOp(
receiverCtx context.Context,
format string,
numReceivedSpans int,
err error,
) {
endReceiveOp(
rec := NewReceiver(ReceiverSettings{})
rec.endReceiveOp(
receiverCtx,
format,
numReceivedSpans,
Expand All @@ -180,29 +230,59 @@ func EndTraceDataReceiveOp(
// StartLogsReceiveOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func (rec *Receiver) StartLogsReceiveOp(
operationCtx context.Context,
opt ...StartReceiveOption,
) context.Context {
return rec.traceReceiveOp(
operationCtx,
receiverLogsOperationSuffix,
opt...)
}

// StartLogsReceiveOp is deprecated but is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func StartLogsReceiveOp(
operationCtx context.Context,
receiverID config.ComponentID,
transport string,
opt ...StartReceiveOption,
) context.Context {
return traceReceiveOp(
rec := NewReceiver(ReceiverSettings{ReceiverID: receiverID, Transport: transport})
return rec.traceReceiveOp(
operationCtx,
receiverID,
transport,
receiverLogsOperationSuffix,
opt...)
}

// EndLogsReceiveOp completes the receive operation that was started with
// StartLogsReceiveOp.
func (rec *Receiver) EndLogsReceiveOp(
receiverCtx context.Context,
format string,
numReceivedLogRecords int,
err error,
) {
rec.endReceiveOp(
receiverCtx,
format,
numReceivedLogRecords,
err,
config.LogsDataType,
)
}

// EndLogsReceiveOp is deprecated but completes the receive operation that was started with
// StartLogsReceiveOp.
func EndLogsReceiveOp(
receiverCtx context.Context,
format string,
numReceivedLogRecords int,
err error,
) {
endReceiveOp(
rec := NewReceiver(ReceiverSettings{})
rec.endReceiveOp(
receiverCtx,
format,
numReceivedLogRecords,
Expand All @@ -214,29 +294,59 @@ func EndLogsReceiveOp(
// StartMetricsReceiveOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func (rec *Receiver) StartMetricsReceiveOp(
operationCtx context.Context,
opt ...StartReceiveOption,
) context.Context {
return rec.traceReceiveOp(
operationCtx,
receiverMetricsOperationSuffix,
opt...)
}

// StartMetricsReceiveOp is deprecated but is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func StartMetricsReceiveOp(
operationCtx context.Context,
receiverID config.ComponentID,
transport string,
opt ...StartReceiveOption,
) context.Context {
return traceReceiveOp(
rec := NewReceiver(ReceiverSettings{ReceiverID: receiverID, Transport: transport})
return rec.traceReceiveOp(
operationCtx,
receiverID,
transport,
receiverMetricsOperationSuffix,
opt...)
}

// EndMetricsReceiveOp completes the receive operation that was started with
// StartMetricsReceiveOp.
func (rec *Receiver) EndMetricsReceiveOp(
receiverCtx context.Context,
format string,
numReceivedPoints int,
err error,
) {
rec.endReceiveOp(
receiverCtx,
format,
numReceivedPoints,
err,
config.MetricsDataType,
)
}

// EndMetricsReceiveOp is deprecated but completes the receive operation that was started with
// StartMetricsReceiveOp.
func EndMetricsReceiveOp(
receiverCtx context.Context,
format string,
numReceivedPoints int,
err error,
) {
endReceiveOp(
rec := NewReceiver(ReceiverSettings{})
rec.endReceiveOp(
receiverCtx,
format,
numReceivedPoints,
Expand All @@ -263,10 +373,8 @@ func ReceiverContext(

// traceReceiveOp creates the span used to trace the operation. Returning
// the updated context with the created span.
func traceReceiveOp(
func (rec *Receiver) traceReceiveOp(
receiverCtx context.Context,
receiverID config.ComponentID,
transport string,
operationSuffix string,
opt ...StartReceiveOption,
) context.Context {
Expand All @@ -277,7 +385,7 @@ func traceReceiveOp(

var ctx context.Context
var span *trace.Span
spanName := receiverPrefix + receiverID.String() + operationSuffix
spanName := receiverPrefix + rec.receiverID.String() + operationSuffix
if !opts.LongLivedCtx {
ctx, span = trace.StartSpan(receiverCtx, spanName)
} else {
Expand All @@ -292,14 +400,14 @@ func traceReceiveOp(
ctx = trace.NewContext(receiverCtx, span)
}

if transport != "" {
span.AddAttributes(trace.StringAttribute(TransportKey, transport))
if rec.transport != "" {
span.AddAttributes(trace.StringAttribute(TransportKey, rec.transport))
}
return ctx
}

// endReceiveOp records the observability signals at the end of an operation.
func endReceiveOp(
func (rec *Receiver) endReceiveOp(
receiverCtx context.Context,
format string,
numReceivedItems int,
Expand Down
22 changes: 12 additions & 10 deletions obsreport/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,11 @@ func TestReceiveTraceDataOp(t *testing.T) {
}
rcvdSpans := []int{13, 42}
for i, param := range params {
ctx := obsreport.StartTraceDataReceiveOp(receiverCtx, receiver, param.transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartTraceDataReceiveOp(receiverCtx)
assert.NotNil(t, ctx)

obsreport.EndTraceDataReceiveOp(
rec.EndTraceDataReceiveOp(
ctx,
format,
rcvdSpans[i],
Expand Down Expand Up @@ -169,10 +170,11 @@ func TestReceiveLogsOp(t *testing.T) {
}
rcvdLogRecords := []int{13, 42}
for i, param := range params {
ctx := obsreport.StartLogsReceiveOp(receiverCtx, receiver, param.transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartLogsReceiveOp(receiverCtx)
assert.NotNil(t, ctx)

obsreport.EndLogsReceiveOp(
rec.EndLogsReceiveOp(
ctx,
format,
rcvdLogRecords[i],
Expand Down Expand Up @@ -229,10 +231,11 @@ func TestReceiveMetricsOp(t *testing.T) {
}
rcvdMetricPts := []int{23, 29}
for i, param := range params {
ctx := obsreport.StartMetricsReceiveOp(receiverCtx, receiver, param.transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartMetricsReceiveOp(receiverCtx)
assert.NotNil(t, ctx)

obsreport.EndMetricsReceiveOp(
rec.EndMetricsReceiveOp(
ctx,
format,
rcvdMetricPts[i],
Expand Down Expand Up @@ -499,14 +502,13 @@ func TestReceiveWithLongLivedCtx(t *testing.T) {
for _, op := range ops {
// Use a new context on each operation to simulate distinct operations
// under the same long lived context.
ctx := obsreport.StartTraceDataReceiveOp(
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartTraceDataReceiveOp(
longLivedCtx,
receiver,
transport,
obsreport.WithLongLivedCtx())
assert.NotNil(t, ctx)

obsreport.EndTraceDataReceiveOp(
rec.EndTraceDataReceiveOp(
ctx,
format,
op.numSpans,
Expand Down
15 changes: 9 additions & 6 deletions obsreport/obsreporttest/obsreporttest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ func TestCheckReceiverTracesViews(t *testing.T) {
defer doneFn()

receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
ctx := obsreport.StartTraceDataReceiveOp(receiverCtx, receiver, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartTraceDataReceiveOp(receiverCtx)
assert.NotNil(t, ctx)
obsreport.EndTraceDataReceiveOp(
rec.EndTraceDataReceiveOp(
ctx,
format,
7,
Expand All @@ -60,9 +61,10 @@ func TestCheckReceiverMetricsViews(t *testing.T) {
defer doneFn()

receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
ctx := obsreport.StartMetricsReceiveOp(receiverCtx, receiver, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartMetricsReceiveOp(receiverCtx)
assert.NotNil(t, ctx)
obsreport.EndMetricsReceiveOp(ctx, format, 7, nil)
rec.EndMetricsReceiveOp(ctx, format, 7, nil)

obsreporttest.CheckReceiverMetrics(t, receiver, transport, 7, 0)
}
Expand All @@ -73,9 +75,10 @@ func TestCheckReceiverLogsViews(t *testing.T) {
defer doneFn()

receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
ctx := obsreport.StartLogsReceiveOp(receiverCtx, receiver, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartLogsReceiveOp(receiverCtx)
assert.NotNil(t, ctx)
obsreport.EndLogsReceiveOp(ctx, format, 7, nil)
rec.EndLogsReceiveOp(ctx, format, 7, nil)

obsreporttest.CheckReceiverLogs(t, receiver, transport, 7, 0)
}
Expand Down
Loading

0 comments on commit f7674b2

Please sign in to comment.