diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 082a91efe51..a17f92f5ed8 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -8,23 +8,49 @@ import ( "time" ) +const ( + dfltMaxQSize = 2048 + dfltExpInterval = time.Second + dfltExpTimeout = 30 * time.Second + dfltExpMaxBatchSize = 512 + + envarMaxQSize = "OTEL_BLRP_MAX_QUEUE_SIZE" + envarExpInterval = "OTEL_BLRP_SCHEDULE_DELAY" + envarExpTimeout = "OTEL_BLRP_EXPORT_TIMEOUT" + envarExpMaxBatchSize = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE" +) + // Compile-time check BatchingProcessor implements Processor. var _ Processor = (*BatchingProcessor)(nil) -// BatchingProcessor is an processor that asynchronously exports batches of log records. -type BatchingProcessor struct{} +// BatchingProcessor is a processor that exports batches of log records. +type BatchingProcessor struct { + exporter Exporter -type batcherConfig struct{} + maxQueueSize int + exportInterval time.Duration + exportTimeout time.Duration + exportMaxBatchSize int +} // NewBatchingProcessor decorates the provided exporter // so that the log records are batched before exporting. // -// All of the exporter's methods are called from a single dedicated -// background goroutine. Therefore, the exporter does not need to -// be concurrent safe. +// All of the exporter's methods are called synchronously. func NewBatchingProcessor(exporter Exporter, opts ...BatchingOption) *BatchingProcessor { - // TODO (#5063): Implement. - return nil + if exporter == nil { + // Do not panic on nil export. + exporter = defaultNoopExporter + } + cfg := newBatchingConfig(opts) + return &BatchingProcessor{ + exporter: exporter, + + maxQueueSize: cfg.maxQSize.Value, + exportInterval: cfg.expInterval.Value, + exportTimeout: cfg.expTimeout.Value, + exportMaxBatchSize: cfg.expMaxBatchSize.Value, + } } // OnEmit batches provided log record. @@ -50,14 +76,55 @@ func (b *BatchingProcessor) ForceFlush(ctx context.Context) error { return nil } -// BatchingOption applies a configuration to a BatchingProcessor. +type batchingConfig struct { + maxQSize setting[int] + expInterval setting[time.Duration] + expTimeout setting[time.Duration] + expMaxBatchSize setting[int] +} + +func newBatchingConfig(options []BatchingOption) batchingConfig { + var c batchingConfig + for _, o := range options { + c = o.apply(c) + } + + c.maxQSize = c.maxQSize.Resolve( + clearLessThanOne[int](), + getenv[int](envarMaxQSize), + clearLessThanOne[int](), + fallback[int](dfltMaxQSize), + ) + c.expInterval = c.expInterval.Resolve( + clearLessThanOne[time.Duration](), + getenv[time.Duration](envarExpInterval), + clearLessThanOne[time.Duration](), + fallback[time.Duration](dfltExpInterval), + ) + c.expTimeout = c.expTimeout.Resolve( + clearLessThanOne[time.Duration](), + getenv[time.Duration](envarExpTimeout), + clearLessThanOne[time.Duration](), + fallback[time.Duration](dfltExpTimeout), + ) + c.expMaxBatchSize = c.expMaxBatchSize.Resolve( + clearLessThanOne[int](), + getenv[int](envarExpMaxBatchSize), + clearLessThanOne[int](), + fallback[int](dfltExpMaxBatchSize), + ) + + return c +} + +// BatchingOption applies a configuration to a [BatchingProcessor]. type BatchingOption interface { - apply(batcherConfig) batcherConfig + apply(batchingConfig) batchingConfig } -type batchingOptionFunc func(batcherConfig) batcherConfig +type batchingOptionFunc func(batchingConfig) batchingConfig -func (fn batchingOptionFunc) apply(c batcherConfig) batcherConfig { +func (fn batchingOptionFunc) apply(c batchingConfig) batchingConfig { return fn(c) } @@ -70,9 +137,9 @@ func (fn batchingOptionFunc) apply(c batcherConfig) batcherConfig { // By default, if an environment variable is not set, and this option is not // passed, 2048 will be used. // The default value is also used when the provided value is less than one. -func WithMaxQueueSize(max int) BatchingOption { - return batchingOptionFunc(func(cfg batcherConfig) batcherConfig { - // TODO (#5063): Implement. +func WithMaxQueueSize(size int) BatchingOption { + return batchingOptionFunc(func(cfg batchingConfig) batchingConfig { + cfg.maxQSize = newSetting(size) return cfg }) } @@ -86,8 +153,8 @@ func WithMaxQueueSize(max int) BatchingOption { // passed, 1s will be used. // The default value is also used when the provided value is less than one. func WithExportInterval(d time.Duration) BatchingOption { - return batchingOptionFunc(func(cfg batcherConfig) batcherConfig { - // TODO (#5063): Implement. + return batchingOptionFunc(func(cfg batchingConfig) batchingConfig { + cfg.expInterval = newSetting(d) return cfg }) } @@ -101,8 +168,8 @@ func WithExportInterval(d time.Duration) BatchingOption { // passed, 30s will be used. // The default value is also used when the provided value is less than one. func WithExportTimeout(d time.Duration) BatchingOption { - return batchingOptionFunc(func(cfg batcherConfig) batcherConfig { - // TODO (#5063): Implement. + return batchingOptionFunc(func(cfg batchingConfig) batchingConfig { + cfg.expTimeout = newSetting(d) return cfg }) } @@ -116,9 +183,9 @@ func WithExportTimeout(d time.Duration) BatchingOption { // By default, if an environment variable is not set, and this option is not // passed, 512 will be used. // The default value is also used when the provided value is less than one. -func WithExportMaxBatchSize(max int) BatchingOption { - return batchingOptionFunc(func(cfg batcherConfig) batcherConfig { - // TODO (#5063): Implement. +func WithExportMaxBatchSize(size int) BatchingOption { + return batchingOptionFunc(func(cfg batchingConfig) batchingConfig { + cfg.expMaxBatchSize = newSetting(size) return cfg }) } diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go new file mode 100644 index 00000000000..a23397577c2 --- /dev/null +++ b/sdk/log/batch_test.go @@ -0,0 +1,134 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package log // import "go.opentelemetry.io/otel/sdk/log" + +import ( + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel" +) + +func TestNewBatchingProcessorConfiguration(t *testing.T) { + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { + t.Log(err) + })) + + testcases := []struct { + name string + envars map[string]string + options []BatchingOption + want *BatchingProcessor + }{ + { + name: "Defaults", + want: &BatchingProcessor{ + exporter: defaultNoopExporter, + maxQueueSize: dfltMaxQSize, + exportInterval: dfltExpInterval, + exportTimeout: dfltExpTimeout, + exportMaxBatchSize: dfltExpMaxBatchSize, + }, + }, + { + name: "Options", + options: []BatchingOption{ + WithMaxQueueSize(1), + WithExportInterval(time.Microsecond), + WithExportTimeout(time.Hour), + WithExportMaxBatchSize(2), + }, + want: &BatchingProcessor{ + exporter: defaultNoopExporter, + maxQueueSize: 1, + exportInterval: time.Microsecond, + exportTimeout: time.Hour, + exportMaxBatchSize: 2, + }, + }, + { + name: "Environment", + envars: map[string]string{ + envarMaxQSize: strconv.Itoa(1), + envarExpInterval: strconv.Itoa(100), + envarExpTimeout: strconv.Itoa(1000), + envarExpMaxBatchSize: strconv.Itoa(10), + }, + want: &BatchingProcessor{ + exporter: defaultNoopExporter, + maxQueueSize: 1, + exportInterval: 100 * time.Millisecond, + exportTimeout: 1000 * time.Millisecond, + exportMaxBatchSize: 10, + }, + }, + { + name: "InvalidOptions", + options: []BatchingOption{ + WithMaxQueueSize(-11), + WithExportInterval(-1 * time.Microsecond), + WithExportTimeout(-1 * time.Hour), + WithExportMaxBatchSize(-2), + }, + want: &BatchingProcessor{ + exporter: defaultNoopExporter, + maxQueueSize: dfltMaxQSize, + exportInterval: dfltExpInterval, + exportTimeout: dfltExpTimeout, + exportMaxBatchSize: dfltExpMaxBatchSize, + }, + }, + { + name: "InvalidEnvironment", + envars: map[string]string{ + envarMaxQSize: "-1", + envarExpInterval: "-1", + envarExpTimeout: "-1", + envarExpMaxBatchSize: "-1", + }, + want: &BatchingProcessor{ + exporter: defaultNoopExporter, + maxQueueSize: dfltMaxQSize, + exportInterval: dfltExpInterval, + exportTimeout: dfltExpTimeout, + exportMaxBatchSize: dfltExpMaxBatchSize, + }, + }, + { + name: "Precedence", + envars: map[string]string{ + envarMaxQSize: strconv.Itoa(1), + envarExpInterval: strconv.Itoa(100), + envarExpTimeout: strconv.Itoa(1000), + envarExpMaxBatchSize: strconv.Itoa(10), + }, + options: []BatchingOption{ + // These override the environment variables. + WithMaxQueueSize(3), + WithExportInterval(time.Microsecond), + WithExportTimeout(time.Hour), + WithExportMaxBatchSize(2), + }, + want: &BatchingProcessor{ + exporter: defaultNoopExporter, + maxQueueSize: 3, + exportInterval: time.Microsecond, + exportTimeout: time.Hour, + exportMaxBatchSize: 2, + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + for key, value := range tc.envars { + t.Setenv(key, value) + } + assert.Equal(t, tc.want, NewBatchingProcessor(nil, tc.options...)) + }) + } +} diff --git a/sdk/log/provider.go b/sdk/log/provider.go index 704c505e968..a51994d5a76 100644 --- a/sdk/log/provider.go +++ b/sdk/log/provider.go @@ -6,13 +6,9 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "context" "errors" - "fmt" - "os" - "strconv" "sync" "sync/atomic" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/log/embedded" @@ -32,8 +28,8 @@ const ( type providerConfig struct { resource *resource.Resource processors []Processor - attrCntLim limit - attrValLenLim limit + attrCntLim setting[int] + attrValLenLim setting[int] } func newProviderConfig(opts []LoggerProviderOption) providerConfig { @@ -47,56 +43,18 @@ func newProviderConfig(opts []LoggerProviderOption) providerConfig { } c.attrCntLim = c.attrCntLim.Resolve( - envarAttrCntLim, - defaultAttrCntLim, + getenv[int](envarAttrCntLim), + fallback[int](defaultAttrCntLim), ) c.attrValLenLim = c.attrValLenLim.Resolve( - envarAttrValLenLim, - defaultAttrValLenLim, + getenv[int](envarAttrValLenLim), + fallback[int](defaultAttrValLenLim), ) return c } -type limit struct { - value int - set bool -} - -func newLimit(value int) limit { - return limit{value: value, set: true} -} - -// Resolve returns the resolved form of the limit l. If l's value is set, it -// will return l. If the l's value is not set, a new limit based on the -// environment variable envar will be returned if that environment variable is -// set. Otherwise, fallback is used to construct a new limit that is returned. -func (l limit) Resolve(envar string, fallback int) limit { - if l.set { - return l - } - - if v := os.Getenv(envar); v != "" { - n, err := strconv.Atoi(v) - if err == nil { - return newLimit(n) - } - otel.Handle(fmt.Errorf("invalid %s value %s: %w", envar, v, err)) - } - - return newLimit(fallback) -} - -// Value returns the limit value if set. Otherwise, it returns -1. -func (l limit) Value() int { - if l.set { - return l.value - } - // Fail open, not closed (-1 == unlimited). - return -1 -} - // LoggerProvider handles the creation and coordination of Loggers. All Loggers // created by a LoggerProvider will be associated with the same Resource. type LoggerProvider struct { @@ -127,8 +85,8 @@ func NewLoggerProvider(opts ...LoggerProviderOption) *LoggerProvider { return &LoggerProvider{ resource: cfg.resource, processors: cfg.processors, - attributeCountLimit: cfg.attrCntLim.Value(), - attributeValueLengthLimit: cfg.attrValLenLim.Value(), + attributeCountLimit: cfg.attrCntLim.Value, + attributeValueLengthLimit: cfg.attrValLenLim.Value, } } @@ -257,7 +215,7 @@ func WithProcessor(processor Processor) LoggerProviderOption { // passed, 128 will be used. func WithAttributeCountLimit(limit int) LoggerProviderOption { return loggerProviderOptionFunc(func(cfg providerConfig) providerConfig { - cfg.attrCntLim = newLimit(limit) + cfg.attrCntLim = newSetting(limit) return cfg }) } @@ -276,7 +234,7 @@ func WithAttributeCountLimit(limit int) LoggerProviderOption { // passed, no limit (-1) will be used. func WithAttributeValueLengthLimit(limit int) LoggerProviderOption { return loggerProviderOptionFunc(func(cfg providerConfig) providerConfig { - cfg.attrValLenLim = newLimit(limit) + cfg.attrValLenLim = newSetting(limit) return cfg }) } diff --git a/sdk/log/provider_test.go b/sdk/log/provider_test.go index 258ef4615f2..bfa8afcda1d 100644 --- a/sdk/log/provider_test.go +++ b/sdk/log/provider_test.go @@ -155,11 +155,6 @@ func TestNewLoggerProviderConfiguration(t *testing.T) { } } -func TestLimitValueFailsOpen(t *testing.T) { - var l limit - assert.Equal(t, -1, l.Value(), "limit value should default to unlimited") -} - func TestLoggerProviderConcurrentSafe(t *testing.T) { const goRoutineN = 10 diff --git a/sdk/log/setting.go b/sdk/log/setting.go new file mode 100644 index 00000000000..1172ab3a952 --- /dev/null +++ b/sdk/log/setting.go @@ -0,0 +1,108 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package log // import "go.opentelemetry.io/otel/sdk/log" + +import ( + "fmt" + "os" + "strconv" + "time" + + "go.opentelemetry.io/otel" +) + +// setting is a configuration setting value. +type setting[T any] struct { + Value T + Set bool +} + +// newSetting returns a new [setting] with the value set. +func newSetting[T any](value T) setting[T] { + return setting[T]{Value: value, Set: true} +} + +// resolver returns an updated setting after applying an resolution operation. +type resolver[T any] func(setting[T]) setting[T] + +// Resolve returns a resolved version of s. +// +// It will apply all the passed fn in the order provided, chaining together the +// return setting to the next input. The setting s is used as the initial +// argument to the first fn. +// +// Each fn needs to validate if it should apply given the Set state of the +// setting. This will not perform any checks on the set state when chaining +// function. +func (s setting[T]) Resolve(fn ...resolver[T]) setting[T] { + for _, f := range fn { + s = f(s) + } + return s +} + +// clearLessThanOne returns a resolver that will clear a setting value and +// change its set state to false if its value is less than 1. +func clearLessThanOne[T ~int | ~int64]() resolver[T] { + return func(s setting[T]) setting[T] { + if s.Value < 1 { + s.Value = 0 + s.Set = false + } + return s + } +} + +// getenv returns a resolver that will apply an integer environment variable +// value associated with key to a setting value. +// +// If the input setting to the resolver is set, the environment variable will +// not be applied. +// +// If the environment variable value associated with key is not an integer, an +// error will be sent to the OTel error handler and the setting will not be +// updated. +// +// If the setting value is a [time.Duration] type, the environment variable +// will be interpreted as a duration of milliseconds. +func getenv[T ~int | ~int64](key string) resolver[T] { + return func(s setting[T]) setting[T] { + if s.Set { + // Passed, valid, options have precedence. + return s + } + + if v := os.Getenv(key); v != "" { + n, err := strconv.Atoi(v) + if err != nil { + otel.Handle(fmt.Errorf("invalid %s value %s: %w", key, v, err)) + } else { + switch any(s.Value).(type) { + case time.Duration: + // OTel duration envar are in millisecond. + s.Value = T(time.Duration(n) * time.Millisecond) + default: + s.Value = T(n) + } + s.Set = true + } + } + return s + } +} + +// fallback returns a resolve that will set a setting value to val if it is not +// already set. +// +// This is usually passed at the end of a resolver chain to ensure a default is +// applied if the setting has not already been set. +func fallback[T any](val T) resolver[T] { + return func(s setting[T]) setting[T] { + if !s.Set { + s.Value = val + s.Set = true + } + return s + } +} diff --git a/sdk/log/setting_test.go b/sdk/log/setting_test.go new file mode 100644 index 00000000000..e3cbcc996dd --- /dev/null +++ b/sdk/log/setting_test.go @@ -0,0 +1,83 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package log // import "go.opentelemetry.io/otel/sdk/log" + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewSetting(t *testing.T) { + const val int = 1 + s := newSetting(val) + assert.True(t, s.Set, "returned unset value") + assert.Equal(t, val, s.Value, "value not set") +} + +func TestSettingResolve(t *testing.T) { + t.Run("clearLessThanOne", func(t *testing.T) { + var s setting[int] + s.Value = -10 + s = s.Resolve(clearLessThanOne[int]()) + assert.False(t, s.Set) + assert.Equal(t, 0, s.Value) + + s = newSetting[int](1) + s = s.Resolve(clearLessThanOne[int]()) + assert.True(t, s.Set) + assert.Equal(t, 1, s.Value) + }) + + t.Run("getenv", func(t *testing.T) { + const key = "key" + t.Setenv(key, "10") + + var s setting[int] + s = s.Resolve(getenv[int](key)) + assert.True(t, s.Set) + assert.Equal(t, 10, s.Value) + + t.Setenv(key, "20") + s = s.Resolve(getenv[int](key)) + assert.Equal(t, 10, s.Value, "set setting overridden") + }) + + t.Run("fallback", func(t *testing.T) { + var s setting[int] + s = s.Resolve(fallback[int](10)) + assert.True(t, s.Set) + assert.Equal(t, 10, s.Value) + }) + + t.Run("Precedence", func(t *testing.T) { + const key = "key" + + var s setting[int] + s = s.Resolve( + clearLessThanOne[int](), + getenv[int](key), // Unset. + fallback[int](10), + ) + assert.True(t, s.Set) + assert.Equal(t, 10, s.Value) + + t.Setenv(key, "20") + s = s.Resolve( + clearLessThanOne[int](), + getenv[int](key), // Should not apply, already set. + fallback[int](15), // Should not apply, already set. + ) + assert.True(t, s.Set) + assert.Equal(t, 10, s.Value) + + s = setting[int]{} + s = s.Resolve( + getenv[int](key), + fallback[int](15), // Should not apply, already set. + ) + assert.True(t, s.Set) + assert.Equal(t, 20, s.Value) + }) +}