Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the BatchingProcessor configuration #5088

Merged
merged 13 commits into from
Mar 25, 2024
111 changes: 89 additions & 22 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,49 @@ import (
"time"
)

const (
dfltMaxQSize = 2048
dfltExpInterval = time.Second
dfltExpTimeout = 30 * time.Second
dfltExpMaxBatchSize = 512

envarMaxQSize = "OTEL_BLRP_MAX_QUEUE_SIZE"
envarExpInterval = "OTEL_BLRP_SCHEDULE_DELAY"
envarExpTimeout = "OTEL_BLRP_EXPORT_TIMEOUT"
envarExpMaxBatchSize = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
)

// Compile-time check BatchingProcessor implements Processor.
var _ Processor = (*BatchingProcessor)(nil)

// BatchingProcessor is an processor that asynchronously exports batches of log records.
type BatchingProcessor struct{}
// BatchingProcessor is a processor that exports batches of log records.
type BatchingProcessor struct {
exporter Exporter

type batcherConfig struct{}
maxQueueSize int
exportInterval time.Duration
exportTimeout time.Duration
exportMaxBatchSize int
}

// NewBatchingProcessor decorates the provided exporter
// so that the log records are batched before exporting.
//
// All of the exporter's methods are called from a single dedicated
// background goroutine. Therefore, the exporter does not need to
// be concurrent safe.
// All of the exporter's methods are called synchronously.
func NewBatchingProcessor(exporter Exporter, opts ...BatchingOption) *BatchingProcessor {
// TODO (#5063): Implement.
return nil
if exporter == nil {
// Do not panic on nil export.
exporter = defaultNoopExporter
}
cfg := newBatchingConfig(opts)
return &BatchingProcessor{
exporter: exporter,

maxQueueSize: cfg.maxQSize.Value,
exportInterval: cfg.expInterval.Value,
exportTimeout: cfg.expTimeout.Value,
exportMaxBatchSize: cfg.expMaxBatchSize.Value,
}
}

// OnEmit batches provided log record.
Expand All @@ -50,14 +76,55 @@ func (b *BatchingProcessor) ForceFlush(ctx context.Context) error {
return nil
}

// BatchingOption applies a configuration to a BatchingProcessor.
type batchingConfig struct {
maxQSize setting[int]
expInterval setting[time.Duration]
expTimeout setting[time.Duration]
expMaxBatchSize setting[int]
}

func newBatchingConfig(options []BatchingOption) batchingConfig {
var c batchingConfig
for _, o := range options {
c = o.apply(c)
}

c.maxQSize = c.maxQSize.Resolve(
clearLessThanOne[int](),
getenv[int](envarMaxQSize),
clearLessThanOne[int](),
fallback[int](dfltMaxQSize),
)
c.expInterval = c.expInterval.Resolve(
clearLessThanOne[time.Duration](),
getenv[time.Duration](envarExpInterval),
clearLessThanOne[time.Duration](),
fallback[time.Duration](dfltExpInterval),
)
c.expTimeout = c.expTimeout.Resolve(
clearLessThanOne[time.Duration](),
getenv[time.Duration](envarExpTimeout),
clearLessThanOne[time.Duration](),
fallback[time.Duration](dfltExpTimeout),
)
c.expMaxBatchSize = c.expMaxBatchSize.Resolve(
clearLessThanOne[int](),
getenv[int](envarExpMaxBatchSize),
clearLessThanOne[int](),
fallback[int](dfltExpMaxBatchSize),
)

return c
}

// BatchingOption applies a configuration to a [BatchingProcessor].
type BatchingOption interface {
apply(batcherConfig) batcherConfig
apply(batchingConfig) batchingConfig
}

type batchingOptionFunc func(batcherConfig) batcherConfig
type batchingOptionFunc func(batchingConfig) batchingConfig

func (fn batchingOptionFunc) apply(c batcherConfig) batcherConfig {
func (fn batchingOptionFunc) apply(c batchingConfig) batchingConfig {
return fn(c)
}

Expand All @@ -70,9 +137,9 @@ func (fn batchingOptionFunc) apply(c batcherConfig) batcherConfig {
// By default, if an environment variable is not set, and this option is not
// passed, 2048 will be used.
// The default value is also used when the provided value is less than one.
func WithMaxQueueSize(max int) BatchingOption {
return batchingOptionFunc(func(cfg batcherConfig) batcherConfig {
// TODO (#5063): Implement.
func WithMaxQueueSize(size int) BatchingOption {
return batchingOptionFunc(func(cfg batchingConfig) batchingConfig {
cfg.maxQSize = newSetting(size)
return cfg
})
}
Expand All @@ -86,8 +153,8 @@ func WithMaxQueueSize(max int) BatchingOption {
// passed, 1s will be used.
// The default value is also used when the provided value is less than one.
func WithExportInterval(d time.Duration) BatchingOption {
return batchingOptionFunc(func(cfg batcherConfig) batcherConfig {
// TODO (#5063): Implement.
return batchingOptionFunc(func(cfg batchingConfig) batchingConfig {
cfg.expInterval = newSetting(d)
return cfg
})
}
Expand All @@ -101,8 +168,8 @@ func WithExportInterval(d time.Duration) BatchingOption {
// passed, 30s will be used.
// The default value is also used when the provided value is less than one.
func WithExportTimeout(d time.Duration) BatchingOption {
return batchingOptionFunc(func(cfg batcherConfig) batcherConfig {
// TODO (#5063): Implement.
return batchingOptionFunc(func(cfg batchingConfig) batchingConfig {
cfg.expTimeout = newSetting(d)
return cfg
})
}
Expand All @@ -116,9 +183,9 @@ func WithExportTimeout(d time.Duration) BatchingOption {
// By default, if an environment variable is not set, and this option is not
// passed, 512 will be used.
// The default value is also used when the provided value is less than one.
func WithExportMaxBatchSize(max int) BatchingOption {
return batchingOptionFunc(func(cfg batcherConfig) batcherConfig {
// TODO (#5063): Implement.
func WithExportMaxBatchSize(size int) BatchingOption {
return batchingOptionFunc(func(cfg batchingConfig) batchingConfig {
cfg.expMaxBatchSize = newSetting(size)
return cfg
})
}
134 changes: 134 additions & 0 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel"
)

func TestNewBatchingProcessorConfiguration(t *testing.T) {
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
t.Log(err)
}))

testcases := []struct {
name string
envars map[string]string
options []BatchingOption
want *BatchingProcessor
}{
{
name: "Defaults",
want: &BatchingProcessor{
exporter: defaultNoopExporter,
maxQueueSize: dfltMaxQSize,
exportInterval: dfltExpInterval,
exportTimeout: dfltExpTimeout,
exportMaxBatchSize: dfltExpMaxBatchSize,
},
},
{
name: "Options",
options: []BatchingOption{
WithMaxQueueSize(1),
WithExportInterval(time.Microsecond),
WithExportTimeout(time.Hour),
WithExportMaxBatchSize(2),
},
want: &BatchingProcessor{
exporter: defaultNoopExporter,
maxQueueSize: 1,
exportInterval: time.Microsecond,
exportTimeout: time.Hour,
exportMaxBatchSize: 2,
},
},
{
name: "Environment",
envars: map[string]string{
envarMaxQSize: strconv.Itoa(1),
envarExpInterval: strconv.Itoa(100),
envarExpTimeout: strconv.Itoa(1000),
envarExpMaxBatchSize: strconv.Itoa(10),
},
want: &BatchingProcessor{
exporter: defaultNoopExporter,
maxQueueSize: 1,
exportInterval: 100 * time.Millisecond,
exportTimeout: 1000 * time.Millisecond,
exportMaxBatchSize: 10,
},
},
{
name: "InvalidOptions",
options: []BatchingOption{
WithMaxQueueSize(-11),
WithExportInterval(-1 * time.Microsecond),
WithExportTimeout(-1 * time.Hour),
WithExportMaxBatchSize(-2),
},
want: &BatchingProcessor{
exporter: defaultNoopExporter,
maxQueueSize: dfltMaxQSize,
exportInterval: dfltExpInterval,
exportTimeout: dfltExpTimeout,
exportMaxBatchSize: dfltExpMaxBatchSize,
},
},
{
name: "InvalidEnvironment",
envars: map[string]string{
envarMaxQSize: "-1",
envarExpInterval: "-1",
envarExpTimeout: "-1",
envarExpMaxBatchSize: "-1",
},
want: &BatchingProcessor{
exporter: defaultNoopExporter,
maxQueueSize: dfltMaxQSize,
exportInterval: dfltExpInterval,
exportTimeout: dfltExpTimeout,
exportMaxBatchSize: dfltExpMaxBatchSize,
},
},
{
name: "Precedence",
envars: map[string]string{
envarMaxQSize: strconv.Itoa(1),
envarExpInterval: strconv.Itoa(100),
envarExpTimeout: strconv.Itoa(1000),
envarExpMaxBatchSize: strconv.Itoa(10),
},
options: []BatchingOption{
// These override the environment variables.
WithMaxQueueSize(3),
WithExportInterval(time.Microsecond),
WithExportTimeout(time.Hour),
WithExportMaxBatchSize(2),
},
want: &BatchingProcessor{
exporter: defaultNoopExporter,
maxQueueSize: 3,
exportInterval: time.Microsecond,
exportTimeout: time.Hour,
exportMaxBatchSize: 2,
},
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
for key, value := range tc.envars {
t.Setenv(key, value)
}
assert.Equal(t, tc.want, NewBatchingProcessor(nil, tc.options...))
})
}
}
Loading