Skip to content

Commit

Permalink
[exporter] Feature gate for queue batcher (open-telemetry#11721)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

This PR proceeds
open-telemetry#11637. It
* Introduces a noop feature gate that will be used for queue batcher.
* Updates exporter tests to run with both the feature gate on and off.

<!-- Issue number if applicable -->
#### Link to tracking issue
open-telemetry#10368
open-telemetry#8122


<!--Describe what testing was performed and which tests were added.-->
#### Testing

<!--Describe the documentation added.-->
#### Documentation

<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
sfc-gh-sili authored and HongChenTW committed Dec 19, 2024
1 parent 237f200 commit f6758ea
Show file tree
Hide file tree
Showing 21 changed files with 1,118 additions and 749 deletions.
4 changes: 4 additions & 0 deletions exporter/debugexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
Expand All @@ -45,6 +46,7 @@ require (
go.opentelemetry.io/collector/consumer/consumertest v0.114.0 // indirect
go.opentelemetry.io/collector/extension v0.114.0 // indirect
go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 // indirect
go.opentelemetry.io/collector/featuregate v1.20.0 // indirect
go.opentelemetry.io/collector/pipeline v0.114.0 // indirect
go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.114.0 // indirect
go.opentelemetry.io/collector/receiver v0.114.0 // indirect
Expand Down Expand Up @@ -115,3 +117,5 @@ replace go.opentelemetry.io/collector/consumer/consumererror => ../../consumer/c
replace go.opentelemetry.io/collector/extension/extensiontest => ../../extension/extensiontest

replace go.opentelemetry.io/collector/scraper => ../../scraper

replace go.opentelemetry.io/collector/featuregate => ../../featuregate
2 changes: 2 additions & 0 deletions exporter/debugexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions exporter/exporterhelper/exporterhelperprofiles/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.114.0 // indirect
go.opentelemetry.io/collector/extension v0.114.0 // indirect
go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 // indirect
go.opentelemetry.io/collector/featuregate v1.20.0 // indirect
go.opentelemetry.io/collector/pdata v1.20.0 // indirect
go.opentelemetry.io/collector/pipeline v0.114.0 // indirect
go.opentelemetry.io/collector/receiver v0.114.0 // indirect
Expand Down Expand Up @@ -102,3 +104,5 @@ replace go.opentelemetry.io/collector/consumer/consumererror => ../../../consume
replace go.opentelemetry.io/collector/extension/extensiontest => ../../../extension/extensiontest

replace go.opentelemetry.io/collector/scraper => ../../../scraper

replace go.opentelemetry.io/collector/featuregate => ../../../featuregate
2 changes: 2 additions & 0 deletions exporter/exporterhelper/exporterhelperprofiles/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,17 @@ import (
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterqueue" // BaseExporter contains common fields between different exporter types.
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pipeline"
)

var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegister(
"telemetry.UsePullingBasedExporterQueueBatcher",
featuregate.StageBeta,
featuregate.WithRegisterFromVersion("v0.114.0"),
featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue bathcer"),
)

type ObsrepSenderFactory = func(obsrep *ObsReport) RequestSender

// Option apply changes to BaseExporter.
Expand Down
102 changes: 65 additions & 37 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,52 +38,80 @@ func newNoopObsrepSender(*ObsReport) RequestSender {
}

func TestBaseExporter(t *testing.T) {
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
})
}
runTest("enable_queue_batcher", true)
runTest("disable_queue_batcher", false)
}

func TestBaseExporterWithOptions(t *testing.T) {
want := errors.New("my error")
be, err := NewBaseExporter(
defaultSettings, defaultSignal, newNoopObsrepSender,
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutConfig()),
)
require.NoError(t, err)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
require.Equal(t, want, be.Shutdown(context.Background()))
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
want := errors.New("my error")
be, err := NewBaseExporter(
defaultSettings, defaultSignal, newNoopObsrepSender,
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutConfig()),
)
require.NoError(t, err)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
require.Equal(t, want, be.Shutdown(context.Background()))
})
}
runTest("enable_queue_batcher", true)
runTest("disable_queue_batcher", false)
}

func TestQueueOptionsWithRequestExporter(t *testing.T) {
bs, err := NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()))
require.NoError(t, err)
require.Nil(t, bs.Marshaler)
require.Nil(t, bs.Unmarshaler)
_, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
require.Error(t, err)
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
bs, err := NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()))
require.NoError(t, err)
require.Nil(t, bs.Marshaler)
require.Nil(t, bs.Unmarshaler)
_, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
require.Error(t, err)

_, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
WithRetry(configretry.NewDefaultBackOffConfig()),
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]()))
require.Error(t, err)
_, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
WithRetry(configretry.NewDefaultBackOffConfig()),
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]()))
require.Error(t, err)
})
}
runTest("enable_queue_batcher", true)
runTest("disable_queue_batcher", false)
}

func TestBaseExporterLogging(t *testing.T) {
set := exportertest.NewNopSettings()
logger, observed := observer.New(zap.DebugLevel)
set.Logger = zap.New(logger)
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.Enabled = false
bs, err := NewBaseExporter(set, defaultSignal, newNoopObsrepSender, WithRetry(rCfg))
require.NoError(t, err)
sendErr := bs.Send(context.Background(), newErrorRequest())
require.Error(t, sendErr)
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
set := exportertest.NewNopSettings()
logger, observed := observer.New(zap.DebugLevel)
set.Logger = zap.New(logger)
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.Enabled = false
bs, err := NewBaseExporter(set, defaultSignal, newNoopObsrepSender, WithRetry(rCfg))
require.NoError(t, err)
sendErr := bs.Send(context.Background(), newErrorRequest())
require.Error(t, sendErr)

require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 1)
require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 1)
})
}
runTest("enable_queue_batcher", true)
runTest("disable_queue_batcher", false)
}
Loading

0 comments on commit f6758ea

Please sign in to comment.