Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add span processor configuration #8117

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .chloggen/codeboten_add-tracer-provider.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for span processor configuration for internal traces

# One or more tracking issues or pull requests related to the change
issues: [8106]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
These options are still experimental. To enable them, users must enable both
`telemetry.useOtelForInternalMetrics` and `telemetry.useOtelWithSDKConfigurationForInternalTelemetry`
feature gates.
1 change: 1 addition & 0 deletions cmd/otelcorecol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.39.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.39.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/sdk v1.16.0 // indirect
go.opentelemetry.io/otel/sdk/metric v0.39.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions cmd/otelcorecol/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,8 @@ go.opentelemetry.io/otel/exporters/prometheus v0.39.0 h1:whAaiHxOatgtKd+w0dOi//1
go.opentelemetry.io/otel/exporters/prometheus v0.39.0/go.mod h1:4jo5Q4CROlCpSPsXLhymi+LYrDXd2ObU5wbKayfZs7Y=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0 h1:fl2WmyenEf6LYYlfHAtCUEDyGcpwJNqD4dHGO7PVm4w=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0/go.mod h1:csyQxQ0UHHKVA8KApS7eUO/klMO5sd/av5CNZNU4O6w=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 h1:+XWJd3jf75RXJq29mxbuXhCXFDG3S3R4vBUeSI2P7tE=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0/go.mod h1:hqgzBPTf4yONMFgdZvL/bK42R/iinTyVQtiWihs3SZc=
go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4=
go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE=
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.39.0
go.opentelemetry.io/otel/exporters/prometheus v0.39.0
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0
go.opentelemetry.io/otel/metric v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/sdk/metric v0.39.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@ go.opentelemetry.io/otel/exporters/prometheus v0.39.0 h1:whAaiHxOatgtKd+w0dOi//1
go.opentelemetry.io/otel/exporters/prometheus v0.39.0/go.mod h1:4jo5Q4CROlCpSPsXLhymi+LYrDXd2ObU5wbKayfZs7Y=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0 h1:fl2WmyenEf6LYYlfHAtCUEDyGcpwJNqD4dHGO7PVm4w=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0/go.mod h1:csyQxQ0UHHKVA8KApS7eUO/klMO5sd/av5CNZNU4O6w=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 h1:+XWJd3jf75RXJq29mxbuXhCXFDG3S3R4vBUeSI2P7tE=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0/go.mod h1:hqgzBPTf4yONMFgdZvL/bK42R/iinTyVQtiWihs3SZc=
go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4=
go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE=
Expand Down
59 changes: 57 additions & 2 deletions service/internal/proctelemetry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"

"go.opentelemetry.io/collector/obsreport"
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
Expand Down Expand Up @@ -58,6 +60,9 @@
attribute.String(semconv.AttributeNetHostName, ""),
attribute.String(semconv.AttributeNetHostPort, ""),
}

errNoValidMetricExporter = errors.New("no valid metric exporter")
errNoValidSpanExporter = errors.New("no valid span exporter")
)

func InitMetricReader(ctx context.Context, reader telemetry.MetricReader, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) {
Expand All @@ -78,6 +83,56 @@
return nil, nil, fmt.Errorf("unsupported metric reader type %v", reader)
}

func InitSpanProcessor(_ context.Context, processor telemetry.SpanProcessor) (sdktrace.SpanProcessor, error) {
if processor.Batch != nil {
if processor.Batch.Exporter.Console != nil {
exp, err := stdouttrace.New(
stdouttrace.WithPrettyPrint(),
)
if err != nil {
return nil, err
}

Check warning on line 94 in service/internal/proctelemetry/config.go

View check run for this annotation

Codecov / codecov/patch

service/internal/proctelemetry/config.go#L93-L94

Added lines #L93 - L94 were not covered by tests
opts := []sdktrace.BatchSpanProcessorOption{}
if processor.Batch.ExportTimeout != nil {
if *processor.Batch.ExportTimeout < 0 {
return nil, fmt.Errorf("invalid export timeout %d", *processor.Batch.ExportTimeout)
}
opts = append(opts, sdktrace.WithExportTimeout(time.Millisecond*time.Duration(*processor.Batch.ExportTimeout)))
}
if processor.Batch.MaxExportBatchSize != nil {
if *processor.Batch.MaxExportBatchSize < 0 {
return nil, fmt.Errorf("invalid batch size %d", *processor.Batch.MaxExportBatchSize)
}
opts = append(opts, sdktrace.WithMaxExportBatchSize(*processor.Batch.MaxExportBatchSize))
}
if processor.Batch.MaxQueueSize != nil {
if *processor.Batch.MaxQueueSize < 0 {
return nil, fmt.Errorf("invalid queue size %d", *processor.Batch.MaxQueueSize)
}
opts = append(opts, sdktrace.WithMaxQueueSize(*processor.Batch.MaxQueueSize))
}
if processor.Batch.ScheduleDelay != nil {
if *processor.Batch.ScheduleDelay < 0 {
return nil, fmt.Errorf("invalid schedule delay %d", *processor.Batch.ScheduleDelay)
}
opts = append(opts, sdktrace.WithBatchTimeout(time.Millisecond*time.Duration(*processor.Batch.ScheduleDelay)))
}
return sdktrace.NewBatchSpanProcessor(exp, opts...), nil
}
return nil, errNoValidSpanExporter
}
return nil, fmt.Errorf("unsupported span processor type %v", processor)
}

func InitTracerProvider(res *resource.Resource, options []sdktrace.TracerProviderOption) (*sdktrace.TracerProvider, error) {
opts := []sdktrace.TracerProviderOption{
sdktrace.WithResource(res),
}

opts = append(opts, options...)
return sdktrace.NewTracerProvider(opts...), nil
}

func InitOpenTelemetry(res *resource.Resource, options []sdkmetric.Option, disableHighCardinality bool) (*sdkmetric.MeterProvider, error) {
opts := []sdkmetric.Option{
sdkmetric.WithResource(res),
Expand Down Expand Up @@ -175,7 +230,7 @@
if exporter.Prometheus != nil {
return initPrometheusExporter(exporter.Prometheus, asyncErrorChannel)
}
return nil, nil, fmt.Errorf("no valid exporter")
return nil, nil, errNoValidMetricExporter
}

func initPeriodicExporter(ctx context.Context, exporter telemetry.MetricExporter, opts ...sdkmetric.PeriodicReaderOption) (sdkmetric.Reader, *http.Server, error) {
Expand Down Expand Up @@ -207,7 +262,7 @@
}
return sdkmetric.NewPeriodicReader(exp, opts...), nil, nil
}
return nil, nil, fmt.Errorf("no valid exporter")
return nil, nil, errNoValidMetricExporter
}

func normalizeEndpoint(endpoint string) string {
Expand Down
97 changes: 94 additions & 3 deletions service/internal/proctelemetry/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestMetricReader(t *testing.T) {
},
},
},
err: errors.New("no valid exporter"),
err: errNoValidMetricExporter,
},
{
name: "pull/prometheus-invalid-config-no-host",
Expand Down Expand Up @@ -93,14 +93,14 @@ func TestMetricReader(t *testing.T) {
},
},
},
err: errors.New("no valid exporter"),
err: errNoValidMetricExporter,
},
{
name: "periodic/no-exporter",
reader: telemetry.MetricReader{
Periodic: &telemetry.PeriodicMetricReader{},
},
err: errors.New("no valid exporter"),
err: errNoValidMetricExporter,
},
{
name: "periodic/console-exporter",
Expand Down Expand Up @@ -344,3 +344,94 @@ func TestMetricReader(t *testing.T) {
})
}
}

func TestSpanProcessor(t *testing.T) {
testCases := []struct {
name string
processor telemetry.SpanProcessor
args any
err error
}{
{
name: "no processor",
err: errors.New("unsupported span processor type {<nil> <nil>}"),
},
{
name: "batch processor invalid exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
Exporter: telemetry.SpanExporter{},
},
},
err: errNoValidSpanExporter,
},
{
name: "batch processor invalid batch size console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
MaxExportBatchSize: intPtr(-1),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
err: errors.New("invalid batch size -1"),
},
{
name: "batch processor invalid export timeout console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
ExportTimeout: intPtr(-2),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
err: errors.New("invalid export timeout -2"),
},
{
name: "batch processor invalid queue size console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
MaxQueueSize: intPtr(-3),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
err: errors.New("invalid queue size -3"),
},
{
name: "batch processor invalid schedule delay console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
ScheduleDelay: intPtr(-4),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
err: errors.New("invalid schedule delay -4"),
},
{
name: "batch processor console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
MaxExportBatchSize: intPtr(0),
ExportTimeout: intPtr(0),
MaxQueueSize: intPtr(0),
ScheduleDelay: intPtr(0),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
_, err := InitSpanProcessor(context.Background(), tt.processor)
assert.Equal(t, tt.err, err)
})
}
}
1 change: 1 addition & 0 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
}
srv.telemetrySettings.MeterProvider = srv.telemetryInitializer.mp
srv.telemetrySettings.TracerProvider = srv.telemetryInitializer.tp

// process the configuration and initialize the pipeline
if err = srv.initExtensionsAndPipeline(ctx, set, cfg); err != nil {
Expand Down
26 changes: 24 additions & 2 deletions service/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
noopmetric "go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/propagation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

Expand Down Expand Up @@ -51,6 +53,7 @@
views []*view.View
ocRegistry *ocmetric.Registry
mp metric.MeterProvider
tp trace.TracerProvider
servers []*http.Server

useOtel bool
Expand All @@ -60,7 +63,8 @@

func newColTelemetry(useOtel bool, disableHighCardinality bool, extendedConfig bool) *telemetryInitializer {
return &telemetryInitializer{
mp: noop.NewMeterProvider(),
mp: noopmetric.NewMeterProvider(),
tp: trace.NewNoopTracerProvider(),
useOtel: useOtel,
disableHighCardinality: disableHighCardinality,
extendedConfig: extendedConfig,
Expand All @@ -79,6 +83,12 @@

settings.Logger.Info("Setting up own telemetry...")

if tp, err := tel.initTraces(res, cfg); err == nil {
tel.tp = tp
} else {
return err
}

Check warning on line 90 in service/telemetry.go

View check run for this annotation

Codecov / codecov/patch

service/telemetry.go#L89-L90

Added lines #L89 - L90 were not covered by tests

if tp, err := textMapPropagatorFromConfig(cfg.Traces.Propagators); err == nil {
otel.SetTextMapPropagator(tp)
} else {
Expand All @@ -88,6 +98,18 @@
return tel.initMetrics(res, settings.Logger, cfg, asyncErrorChannel)
}

func (tel *telemetryInitializer) initTraces(res *resource.Resource, cfg telemetry.Config) (trace.TracerProvider, error) {
opts := []sdktrace.TracerProviderOption{}
for _, processor := range cfg.Traces.Processors {
sp, err := proctelemetry.InitSpanProcessor(context.Background(), processor)
if err != nil {
return nil, err
}

Check warning on line 107 in service/telemetry.go

View check run for this annotation

Codecov / codecov/patch

service/telemetry.go#L106-L107

Added lines #L106 - L107 were not covered by tests
opts = append(opts, sdktrace.WithSpanProcessor(sp))
}
return proctelemetry.InitTracerProvider(res, opts)
}

func (tel *telemetryInitializer) initMetrics(res *resource.Resource, logger *zap.Logger, cfg telemetry.Config, asyncErrorChannel chan error) error {
// Initialize the ocRegistry, still used by the process metrics.
tel.ocRegistry = ocmetric.NewRegistry()
Expand Down
26 changes: 26 additions & 0 deletions service/telemetry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ type TracesConfig struct {
// tracecontext and b3 are supported. By default, the value is set to empty list and
// context propagation is disabled.
Propagators []string `mapstructure:"propagators"`
// Processors allow configuration of span processors to emit spans to
// any number of suported backends.
Processors []SpanProcessor `mapstructure:"processors"`
}

// Validate checks whether the current configuration is valid
Expand All @@ -132,6 +135,29 @@ func (c *Config) Validate() error {
return nil
}

func (sp *SpanProcessor) Unmarshal(conf *confmap.Conf) error {
if !obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled() {
// only unmarshal if feature gate is enabled
return nil
}

if conf == nil {
return nil
}

if err := conf.Unmarshal(sp); err != nil {
return fmt.Errorf("invalid span processor configuration: %w", err)
}

if sp.Batch != nil {
if sp.Batch.Exporter.Console == nil {
return fmt.Errorf("invalid exporter configuration")
}
return nil
}
return fmt.Errorf("unsupported span processor type %s", conf.AllKeys())
}

func (mr *MetricReader) Unmarshal(conf *confmap.Conf) error {
if !obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled() {
// only unmarshal if feature gate is enabled
Expand Down
Loading
Loading