Skip to content

Commit

Permalink
obsreport: wire up ReceiverCreateSettings for receivers using obsrepo…
Browse files Browse the repository at this point in the history
…rt (#4149)

* obsreport: wire up ReceiverCreateSettings for receivers using obsreport

Signed-off-by: Anthony J Mirabella <a9@aneurysm9.com>

* obsreport: wire up ReceiverCreateSettings for scrapers using obsreport

Signed-off-by: Anthony J Mirabella <a9@aneurysm9.com>

* pass ReceiverCreateSettings through scraper controller

Signed-off-by: Anthony J Mirabella <a9@aneurysm9.com>

* Fix lint issue

Signed-off-by: Anthony J Mirabella <a9@aneurysm9.com>

* Ensure TracerProvider is available to scraper receiver for now

Signed-off-by: Anthony J Mirabella <a9@aneurysm9.com>

* Fix test failures

Signed-off-by: Anthony J Mirabella <a9@aneurysm9.com>
  • Loading branch information
Aneurysm9 authored Oct 11, 2021
1 parent 0761e04 commit 2209f2a
Show file tree
Hide file tree
Showing 14 changed files with 108 additions and 67 deletions.
7 changes: 4 additions & 3 deletions obsreport/obsreport_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (

"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/internal/obsreportconfig"
Expand All @@ -47,7 +47,8 @@ type ReceiverSettings struct {
// Typically the long lived context is associated to a connection,
// eg.: a gRPC stream, for which many batches of data are received in individual
// operations without a corresponding new context per operation.
LongLivedCtx bool
LongLivedCtx bool
ReceiverCreateSettings component.ReceiverCreateSettings
}

// NewReceiver creates a new Receiver.
Expand All @@ -60,7 +61,7 @@ func NewReceiver(cfg ReceiverSettings) *Receiver {
tag.Upsert(obsmetrics.TagKeyReceiver, cfg.ReceiverID.String(), tag.WithTTL(tag.TTLNoPropagation)),
tag.Upsert(obsmetrics.TagKeyTransport, cfg.Transport, tag.WithTTL(tag.TTLNoPropagation)),
},
tracer: otel.GetTracerProvider().Tracer(cfg.ReceiverID.String()),
tracer: cfg.ReceiverCreateSettings.TracerProvider.Tracer(cfg.ReceiverID.String()),
}
}

Expand Down
9 changes: 5 additions & 4 deletions obsreport/obsreport_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (

"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/internal/obsreportconfig"
Expand All @@ -40,8 +40,9 @@ type Scraper struct {

// ScraperSettings are settings for creating a Scraper.
type ScraperSettings struct {
ReceiverID config.ComponentID
Scraper config.ComponentID
ReceiverID config.ComponentID
Scraper config.ComponentID
ReceiverCreateSettings component.ReceiverCreateSettings
}

// NewScraper creates a new Scraper.
Expand All @@ -52,7 +53,7 @@ func NewScraper(cfg ScraperSettings) *Scraper {
mutators: []tag.Mutator{
tag.Upsert(obsmetrics.TagKeyReceiver, cfg.ReceiverID.String(), tag.WithTTL(tag.TTLNoPropagation)),
tag.Upsert(obsmetrics.TagKeyScraper, cfg.Scraper.String(), tag.WithTTL(tag.TTLNoPropagation))},
tracer: otel.GetTracerProvider().Tracer(cfg.Scraper.String()),
tracer: cfg.ReceiverCreateSettings.TracerProvider.Tracer(cfg.Scraper.String()),
}
}

Expand Down
50 changes: 27 additions & 23 deletions obsreport/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
Expand All @@ -39,7 +37,7 @@ const (
)

var (
receiver = config.NewComponentID("fakeReicever")
receiver = config.NewComponentID("fakeReceiver")
scraper = config.NewComponentID("fakeScraper")
processor = config.NewComponentID("fakeProcessor")
exporter = config.NewComponentID("fakeExporter")
Expand All @@ -58,9 +56,6 @@ func TestReceiveTraceDataOp(t *testing.T) {
require.NoError(t, err)
defer set.Shutdown(context.Background())

otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

parentCtx, parentSpan := set.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand All @@ -69,7 +64,11 @@ func TestReceiveTraceDataOp(t *testing.T) {
{items: 42, err: nil},
}
for i, param := range params {
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: transport})
rec := NewReceiver(ReceiverSettings{
ReceiverID: receiver,
Transport: transport,
ReceiverCreateSettings: set.ToReceiverCreateSettings(),
})
ctx := rec.StartTracesOp(parentCtx)
assert.NotNil(t, ctx)
rec.EndTracesOp(ctx, format, params[i].items, param.err)
Expand Down Expand Up @@ -105,9 +104,6 @@ func TestReceiveLogsOp(t *testing.T) {
require.NoError(t, err)
defer set.Shutdown(context.Background())

otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

parentCtx, parentSpan := set.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand All @@ -116,7 +112,11 @@ func TestReceiveLogsOp(t *testing.T) {
{items: 42, err: nil},
}
for i, param := range params {
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: transport})
rec := NewReceiver(ReceiverSettings{
ReceiverID: receiver,
Transport: transport,
ReceiverCreateSettings: set.ToReceiverCreateSettings(),
})
ctx := rec.StartLogsOp(parentCtx)
assert.NotNil(t, ctx)
rec.EndLogsOp(ctx, format, params[i].items, param.err)
Expand Down Expand Up @@ -152,9 +152,6 @@ func TestReceiveMetricsOp(t *testing.T) {
require.NoError(t, err)
defer set.Shutdown(context.Background())

otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

parentCtx, parentSpan := set.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand All @@ -163,7 +160,11 @@ func TestReceiveMetricsOp(t *testing.T) {
{items: 29, err: nil},
}
for i, param := range params {
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: transport})
rec := NewReceiver(ReceiverSettings{
ReceiverID: receiver,
Transport: transport,
ReceiverCreateSettings: set.ToReceiverCreateSettings(),
})
ctx := rec.StartMetricsOp(parentCtx)
assert.NotNil(t, ctx)
rec.EndMetricsOp(ctx, format, params[i].items, param.err)
Expand Down Expand Up @@ -200,9 +201,6 @@ func TestScrapeMetricsDataOp(t *testing.T) {
require.NoError(t, err)
defer set.Shutdown(context.Background())

otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

parentCtx, parentSpan := set.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand All @@ -212,7 +210,11 @@ func TestScrapeMetricsDataOp(t *testing.T) {
{items: 15, err: nil},
}
for i := range params {
scrp := NewScraper(ScraperSettings{ReceiverID: receiver, Scraper: scraper})
scrp := NewScraper(ScraperSettings{
ReceiverID: receiver,
Scraper: scraper,
ReceiverCreateSettings: set.ToReceiverCreateSettings(),
})
ctx := scrp.StartMetricsOp(parentCtx)
assert.NotNil(t, ctx)
scrp.EndMetricsOp(ctx, params[i].items, params[i].err)
Expand Down Expand Up @@ -409,9 +411,6 @@ func TestReceiveWithLongLivedCtx(t *testing.T) {
require.NoError(t, err)
defer set.Shutdown(context.Background())

otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

longLivedCtx, parentSpan := set.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()

Expand All @@ -422,7 +421,12 @@ func TestReceiveWithLongLivedCtx(t *testing.T) {
for i := range params {
// Use a new context on each operation to simulate distinct operations
// under the same long lived context.
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: transport, LongLivedCtx: true})
rec := NewReceiver(ReceiverSettings{
ReceiverID: receiver,
Transport: transport,
LongLivedCtx: true,
ReceiverCreateSettings: set.ToReceiverCreateSettings(),
})
ctx := rec.StartTracesOp(longLivedCtx)
assert.NotNil(t, ctx)
rec.EndTracesOp(ctx, format, params[i].items, params[i].err)
Expand Down
18 changes: 15 additions & 3 deletions obsreport/obsreporttest/obsreporttest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ func TestCheckReceiverTracesViews(t *testing.T) {
require.NoError(t, err)
defer set.Shutdown(context.Background())

rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: receiver,
Transport: transport,
ReceiverCreateSettings: set.ToReceiverCreateSettings(),
})
ctx := rec.StartTracesOp(context.Background())
assert.NotNil(t, ctx)
rec.EndTracesOp(
Expand All @@ -59,7 +63,11 @@ func TestCheckReceiverMetricsViews(t *testing.T) {
require.NoError(t, err)
defer set.Shutdown(context.Background())

rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: receiver,
Transport: transport,
ReceiverCreateSettings: set.ToReceiverCreateSettings(),
})
ctx := rec.StartMetricsOp(context.Background())
assert.NotNil(t, ctx)
rec.EndMetricsOp(ctx, format, 7, nil)
Expand All @@ -72,7 +80,11 @@ func TestCheckReceiverLogsViews(t *testing.T) {
require.NoError(t, err)
defer set.Shutdown(context.Background())

rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: receiver,
Transport: transport,
ReceiverCreateSettings: set.ToReceiverCreateSettings(),
})
ctx := rec.StartLogsOp(context.Background())
assert.NotNil(t, ctx)
rec.EndLogsOp(ctx, format, 7, nil)
Expand Down
9 changes: 7 additions & 2 deletions receiver/otlpreceiver/internal/logs/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/otlpgrpc"
Expand All @@ -36,10 +37,14 @@ type Receiver struct {
}

// New creates a new Receiver reference.
func New(id config.ComponentID, nextConsumer consumer.Logs) *Receiver {
func New(id config.ComponentID, nextConsumer consumer.Logs, set component.ReceiverCreateSettings) *Receiver {
return &Receiver{
nextConsumer: nextConsumer,
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: id, Transport: receiverTransport}),
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: id,
Transport: receiverTransport,
ReceiverCreateSettings: set,
}),
}
}

Expand Down
3 changes: 2 additions & 1 deletion receiver/otlpreceiver/internal/logs/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
Expand Down Expand Up @@ -112,7 +113,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Logs) (net.Addr, func())
}
}

r := New(config.NewComponentIDWithName("otlp", "log"), tc)
r := New(config.NewComponentIDWithName("otlp", "log"), tc, componenttest.NewNopReceiverCreateSettings())
require.NoError(t, err)

// Now run it as a gRPC server
Expand Down
9 changes: 7 additions & 2 deletions receiver/otlpreceiver/internal/metrics/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/otlpgrpc"
Expand All @@ -36,10 +37,14 @@ type Receiver struct {
}

// New creates a new Receiver reference.
func New(id config.ComponentID, nextConsumer consumer.Metrics) *Receiver {
func New(id config.ComponentID, nextConsumer consumer.Metrics, set component.ReceiverCreateSettings) *Receiver {
return &Receiver{
nextConsumer: nextConsumer,
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: id, Transport: receiverTransport}),
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: id,
Transport: receiverTransport,
ReceiverCreateSettings: set,
}),
}
}

Expand Down
3 changes: 2 additions & 1 deletion receiver/otlpreceiver/internal/metrics/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
Expand Down Expand Up @@ -114,7 +115,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) (net.Addr, func
}
}

r := New(config.NewComponentIDWithName("otlp", "metrics"), mc)
r := New(config.NewComponentIDWithName("otlp", "metrics"), mc, componenttest.NewNopReceiverCreateSettings())
// Now run it as a gRPC server
srv := grpc.NewServer()
otlpgrpc.RegisterMetricsServer(srv, r)
Expand Down
9 changes: 7 additions & 2 deletions receiver/otlpreceiver/internal/trace/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/otlpgrpc"
Expand All @@ -36,10 +37,14 @@ type Receiver struct {
}

// New creates a new Receiver reference.
func New(id config.ComponentID, nextConsumer consumer.Traces) *Receiver {
func New(id config.ComponentID, nextConsumer consumer.Traces, set component.ReceiverCreateSettings) *Receiver {
return &Receiver{
nextConsumer: nextConsumer,
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: id, Transport: receiverTransport}),
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: id,
Transport: receiverTransport,
ReceiverCreateSettings: set,
}),
}
}

Expand Down
3 changes: 2 additions & 1 deletion receiver/otlpreceiver/internal/trace/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
Expand Down Expand Up @@ -111,7 +112,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) (net.Addr, func(
}
}

r := New(config.NewComponentIDWithName("otlp", "trace"), tc)
r := New(config.NewComponentIDWithName("otlp", "trace"), tc, componenttest.NewNopReceiverCreateSettings())
require.NoError(t, err)

// Now run it as a gRPC server
Expand Down
6 changes: 3 additions & 3 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (r *otlpReceiver) registerTraceConsumer(tc consumer.Traces) error {
if tc == nil {
return componenterror.ErrNilNextConsumer
}
r.traceReceiver = trace.New(r.cfg.ID(), tc)
r.traceReceiver = trace.New(r.cfg.ID(), tc, r.settings)
if r.httpMux != nil {
r.httpMux.HandleFunc("/v1/traces", func(resp http.ResponseWriter, req *http.Request) {
handleTraces(resp, req, r.traceReceiver, pbEncoder)
Expand All @@ -204,7 +204,7 @@ func (r *otlpReceiver) registerMetricsConsumer(mc consumer.Metrics) error {
if mc == nil {
return componenterror.ErrNilNextConsumer
}
r.metricsReceiver = metrics.New(r.cfg.ID(), mc)
r.metricsReceiver = metrics.New(r.cfg.ID(), mc, r.settings)
if r.httpMux != nil {
r.httpMux.HandleFunc("/v1/metrics", func(resp http.ResponseWriter, req *http.Request) {
handleMetrics(resp, req, r.metricsReceiver, pbEncoder)
Expand All @@ -220,7 +220,7 @@ func (r *otlpReceiver) registerLogsConsumer(lc consumer.Logs) error {
if lc == nil {
return componenterror.ErrNilNextConsumer
}
r.logReceiver = logs.New(r.cfg.ID(), lc)
r.logReceiver = logs.New(r.cfg.ID(), lc, r.settings)
if r.httpMux != nil {
r.httpMux.HandleFunc("/v1/logs", func(w http.ResponseWriter, req *http.Request) {
handleLogs(w, req, r.logReceiver, pbEncoder)
Expand Down
Loading

0 comments on commit 2209f2a

Please sign in to comment.