Skip to content

Commit

Permalink
Implement the BatchingProcessor configuration (#5088)
Browse files Browse the repository at this point in the history
* Implement the batching config

* Unify on setting type

* Add setting_test.go

* Test NewBatchingProcessor

* Comment setting

* Fix lint

* Check invalid after envar

---------

Co-authored-by: Sam Xie <sam@samxie.me>
  • Loading branch information
MrAlias and XSAM committed Mar 25, 2024
1 parent 8b14747 commit 32e3a3d
Show file tree
Hide file tree
Showing 6 changed files with 424 additions and 79 deletions.
111 changes: 89 additions & 22 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,49 @@ import (
"time"
)

const (
dfltMaxQSize = 2048
dfltExpInterval = time.Second
dfltExpTimeout = 30 * time.Second
dfltExpMaxBatchSize = 512

envarMaxQSize = "OTEL_BLRP_MAX_QUEUE_SIZE"
envarExpInterval = "OTEL_BLRP_SCHEDULE_DELAY"
envarExpTimeout = "OTEL_BLRP_EXPORT_TIMEOUT"
envarExpMaxBatchSize = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
)

// Compile-time check BatchingProcessor implements Processor.
var _ Processor = (*BatchingProcessor)(nil)

// BatchingProcessor is an processor that asynchronously exports batches of log records.
type BatchingProcessor struct{}
// BatchingProcessor is a processor that exports batches of log records.
type BatchingProcessor struct {
exporter Exporter

type batcherConfig struct{}
maxQueueSize int
exportInterval time.Duration
exportTimeout time.Duration
exportMaxBatchSize int
}

// NewBatchingProcessor decorates the provided exporter
// so that the log records are batched before exporting.
//
// All of the exporter's methods are called from a single dedicated
// background goroutine. Therefore, the exporter does not need to
// be concurrent safe.
// All of the exporter's methods are called synchronously.
func NewBatchingProcessor(exporter Exporter, opts ...BatchingOption) *BatchingProcessor {
// TODO (#5063): Implement.
return nil
if exporter == nil {
// Do not panic on nil export.
exporter = defaultNoopExporter
}
cfg := newBatchingConfig(opts)
return &BatchingProcessor{
exporter: exporter,

maxQueueSize: cfg.maxQSize.Value,
exportInterval: cfg.expInterval.Value,
exportTimeout: cfg.expTimeout.Value,
exportMaxBatchSize: cfg.expMaxBatchSize.Value,
}
}

// OnEmit batches provided log record.
Expand All @@ -50,14 +76,55 @@ func (b *BatchingProcessor) ForceFlush(ctx context.Context) error {
return nil
}

// BatchingOption applies a configuration to a BatchingProcessor.
type batchingConfig struct {
maxQSize setting[int]
expInterval setting[time.Duration]
expTimeout setting[time.Duration]
expMaxBatchSize setting[int]
}

func newBatchingConfig(options []BatchingOption) batchingConfig {
var c batchingConfig
for _, o := range options {
c = o.apply(c)
}

c.maxQSize = c.maxQSize.Resolve(
clearLessThanOne[int](),
getenv[int](envarMaxQSize),
clearLessThanOne[int](),
fallback[int](dfltMaxQSize),
)
c.expInterval = c.expInterval.Resolve(
clearLessThanOne[time.Duration](),
getenv[time.Duration](envarExpInterval),
clearLessThanOne[time.Duration](),
fallback[time.Duration](dfltExpInterval),
)
c.expTimeout = c.expTimeout.Resolve(
clearLessThanOne[time.Duration](),
getenv[time.Duration](envarExpTimeout),
clearLessThanOne[time.Duration](),
fallback[time.Duration](dfltExpTimeout),
)
c.expMaxBatchSize = c.expMaxBatchSize.Resolve(
clearLessThanOne[int](),
getenv[int](envarExpMaxBatchSize),
clearLessThanOne[int](),
fallback[int](dfltExpMaxBatchSize),
)

return c
}

// BatchingOption applies a configuration to a [BatchingProcessor].
type BatchingOption interface {
apply(batcherConfig) batcherConfig
apply(batchingConfig) batchingConfig
}

type batchingOptionFunc func(batcherConfig) batcherConfig
type batchingOptionFunc func(batchingConfig) batchingConfig

func (fn batchingOptionFunc) apply(c batcherConfig) batcherConfig {
func (fn batchingOptionFunc) apply(c batchingConfig) batchingConfig {
return fn(c)
}

Expand All @@ -70,9 +137,9 @@ func (fn batchingOptionFunc) apply(c batcherConfig) batcherConfig {
// By default, if an environment variable is not set, and this option is not
// passed, 2048 will be used.
// The default value is also used when the provided value is less than one.
func WithMaxQueueSize(max int) BatchingOption {
return batchingOptionFunc(func(cfg batcherConfig) batcherConfig {
// TODO (#5063): Implement.
func WithMaxQueueSize(size int) BatchingOption {
return batchingOptionFunc(func(cfg batchingConfig) batchingConfig {
cfg.maxQSize = newSetting(size)
return cfg
})
}
Expand All @@ -86,8 +153,8 @@ func WithMaxQueueSize(max int) BatchingOption {
// passed, 1s will be used.
// The default value is also used when the provided value is less than one.
func WithExportInterval(d time.Duration) BatchingOption {
return batchingOptionFunc(func(cfg batcherConfig) batcherConfig {
// TODO (#5063): Implement.
return batchingOptionFunc(func(cfg batchingConfig) batchingConfig {
cfg.expInterval = newSetting(d)
return cfg
})
}
Expand All @@ -101,8 +168,8 @@ func WithExportInterval(d time.Duration) BatchingOption {
// passed, 30s will be used.
// The default value is also used when the provided value is less than one.
func WithExportTimeout(d time.Duration) BatchingOption {
return batchingOptionFunc(func(cfg batcherConfig) batcherConfig {
// TODO (#5063): Implement.
return batchingOptionFunc(func(cfg batchingConfig) batchingConfig {
cfg.expTimeout = newSetting(d)
return cfg
})
}
Expand All @@ -116,9 +183,9 @@ func WithExportTimeout(d time.Duration) BatchingOption {
// By default, if an environment variable is not set, and this option is not
// passed, 512 will be used.
// The default value is also used when the provided value is less than one.
func WithExportMaxBatchSize(max int) BatchingOption {
return batchingOptionFunc(func(cfg batcherConfig) batcherConfig {
// TODO (#5063): Implement.
func WithExportMaxBatchSize(size int) BatchingOption {
return batchingOptionFunc(func(cfg batchingConfig) batchingConfig {
cfg.expMaxBatchSize = newSetting(size)
return cfg
})
}
134 changes: 134 additions & 0 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel"
)

func TestNewBatchingProcessorConfiguration(t *testing.T) {
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
t.Log(err)
}))

testcases := []struct {
name string
envars map[string]string
options []BatchingOption
want *BatchingProcessor
}{
{
name: "Defaults",
want: &BatchingProcessor{
exporter: defaultNoopExporter,
maxQueueSize: dfltMaxQSize,
exportInterval: dfltExpInterval,
exportTimeout: dfltExpTimeout,
exportMaxBatchSize: dfltExpMaxBatchSize,
},
},
{
name: "Options",
options: []BatchingOption{
WithMaxQueueSize(1),
WithExportInterval(time.Microsecond),
WithExportTimeout(time.Hour),
WithExportMaxBatchSize(2),
},
want: &BatchingProcessor{
exporter: defaultNoopExporter,
maxQueueSize: 1,
exportInterval: time.Microsecond,
exportTimeout: time.Hour,
exportMaxBatchSize: 2,
},
},
{
name: "Environment",
envars: map[string]string{
envarMaxQSize: strconv.Itoa(1),
envarExpInterval: strconv.Itoa(100),
envarExpTimeout: strconv.Itoa(1000),
envarExpMaxBatchSize: strconv.Itoa(10),
},
want: &BatchingProcessor{
exporter: defaultNoopExporter,
maxQueueSize: 1,
exportInterval: 100 * time.Millisecond,
exportTimeout: 1000 * time.Millisecond,
exportMaxBatchSize: 10,
},
},
{
name: "InvalidOptions",
options: []BatchingOption{
WithMaxQueueSize(-11),
WithExportInterval(-1 * time.Microsecond),
WithExportTimeout(-1 * time.Hour),
WithExportMaxBatchSize(-2),
},
want: &BatchingProcessor{
exporter: defaultNoopExporter,
maxQueueSize: dfltMaxQSize,
exportInterval: dfltExpInterval,
exportTimeout: dfltExpTimeout,
exportMaxBatchSize: dfltExpMaxBatchSize,
},
},
{
name: "InvalidEnvironment",
envars: map[string]string{
envarMaxQSize: "-1",
envarExpInterval: "-1",
envarExpTimeout: "-1",
envarExpMaxBatchSize: "-1",
},
want: &BatchingProcessor{
exporter: defaultNoopExporter,
maxQueueSize: dfltMaxQSize,
exportInterval: dfltExpInterval,
exportTimeout: dfltExpTimeout,
exportMaxBatchSize: dfltExpMaxBatchSize,
},
},
{
name: "Precedence",
envars: map[string]string{
envarMaxQSize: strconv.Itoa(1),
envarExpInterval: strconv.Itoa(100),
envarExpTimeout: strconv.Itoa(1000),
envarExpMaxBatchSize: strconv.Itoa(10),
},
options: []BatchingOption{
// These override the environment variables.
WithMaxQueueSize(3),
WithExportInterval(time.Microsecond),
WithExportTimeout(time.Hour),
WithExportMaxBatchSize(2),
},
want: &BatchingProcessor{
exporter: defaultNoopExporter,
maxQueueSize: 3,
exportInterval: time.Microsecond,
exportTimeout: time.Hour,
exportMaxBatchSize: 2,
},
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
for key, value := range tc.envars {
t.Setenv(key, value)
}
assert.Equal(t, tc.want, NewBatchingProcessor(nil, tc.options...))
})
}
}
Loading

0 comments on commit 32e3a3d

Please sign in to comment.