Skip to content

Commit

Permalink
[exporter] Disable the API to pass in configurations using a callback…
Browse files Browse the repository at this point in the history
… that operates on batch sender (open-telemetry#11448)

#### Description

As part of the effort to solve
open-telemetry#10368,
we no longer guarantee to initialize a `batchSender` when `batcher` is
enabled. Therefore, we would like to remove the interface to set
`mergeFunc` and `mergeSplitFunc` as a callback that operates on
`batchSender`. Instead, users should use the alternative
`WithBatchFuncs` that is a callback that operates `baseExporter`.

Context:
open-telemetry#11414

#### Link to tracking issue

open-telemetry#8122
open-telemetry#10368

---------

Co-authored-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
2 people authored and HongChenTW committed Dec 19, 2024
1 parent e0a33b5 commit 148275f
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 69 deletions.
28 changes: 28 additions & 0 deletions .chloggen/disable-batch-option.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Disables setting batch option to batch sender directly.

# One or more tracking issues or pull requests related to the change
issues: [10368]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
Removed WithRequestBatchFuncs(BatcherOption) in favor of WithBatchFuncs(Option), where |
BatcherOption is a function that operates on batch sender and Option is one that operates |
on BaseExporter

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
20 changes: 10 additions & 10 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,19 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
return internal.WithCapabilities(capabilities)
}

// BatcherOption apply changes to batcher sender.
type BatcherOption = internal.BatcherOption

// WithRequestBatchFuncs sets the functions for merging and splitting batches for an exporter built for custom request types.
func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) BatcherOption {
return internal.WithRequestBatchFuncs(mf, msf)
}

// WithBatcher enables batching for an exporter based on custom request types.
// For now, it can be used only with the New[Traces|Metrics|Logs]RequestExporter exporter helpers and
// WithRequestBatchFuncs provided.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithBatcher(cfg exporterbatcher.Config, opts ...BatcherOption) Option {
return internal.WithBatcher(cfg, opts...)
func WithBatcher(cfg exporterbatcher.Config) Option {
return internal.WithBatcher(cfg)
}

// WithBatchFuncs enables setting custom batch merge functions.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request],
msf exporterbatcher.BatchMergeSplitFunc[Request]) Option {
return internal.WithBatchFuncs(mf, msf)
}
25 changes: 1 addition & 24 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ type ObsrepSenderFactory = func(obsrep *ObsReport) RequestSender
// Option apply changes to BaseExporter.
type Option func(*BaseExporter) error

// BatcherOption apply changes to batcher sender.
type BatcherOption func(*BatchSender) error

type BaseExporter struct {
component.StartFunc
component.ShutdownFunc
Expand Down Expand Up @@ -64,7 +61,6 @@ type BaseExporter struct {
queueCfg exporterqueue.Config
queueFactory exporterqueue.Factory[internal.Request]
BatcherCfg exporterbatcher.Config
BatcherOpts []BatcherOption
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) {
Expand Down Expand Up @@ -109,9 +105,6 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe

if be.BatcherCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set, be.BatchMergeFunc, be.BatchMergeSplitfunc)
for _, opt := range be.BatcherOpts {
err = multierr.Append(err, opt(bs))
}
if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters"))
}
Expand Down Expand Up @@ -275,30 +268,14 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
}
}

// WithRequestBatchFuncs sets the functions for merging and splitting batches for an exporter built for custom request types.
func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[internal.Request], msf exporterbatcher.BatchMergeSplitFunc[internal.Request]) BatcherOption {
return func(bs *BatchSender) error {
if mf == nil || msf == nil {
return fmt.Errorf("WithRequestBatchFuncs must be provided with non-nil functions")
}
if bs.mergeFunc != nil || bs.mergeSplitFunc != nil {
return fmt.Errorf("WithRequestBatchFuncs can only be used once with request-based exporters")
}
bs.mergeFunc = mf
bs.mergeSplitFunc = msf
return nil
}
}

// WithBatcher enables batching for an exporter based on custom request types.
// For now, it can be used only with the New[Traces|Metrics|Logs]RequestExporter exporter helpers and
// WithRequestBatchFuncs provided.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithBatcher(cfg exporterbatcher.Config, opts ...BatcherOption) Option {
func WithBatcher(cfg exporterbatcher.Config) Option {
return func(o *BaseExporter) error {
o.BatcherCfg = cfg
o.BatcherOpts = opts
return nil
}
}
Expand Down
71 changes: 36 additions & 35 deletions exporter/exporterhelper/internal/batch_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,20 @@ func TestBatchSender_Merge(t *testing.T) {
}{
{
name: "split_disabled",
batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)),
batcherOption: WithBatcher(cfg),
},
{
name: "split_high_limit",
batcherOption: func() Option {
c := cfg
c.MaxSizeItems = 1000
return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))
return WithBatcher(c)
}(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
be := queueBatchExporter(t, tt.batcherOption)
be := queueBatchExporter(t, tt.batcherOption, WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))

require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
Expand Down Expand Up @@ -94,30 +94,30 @@ func TestBatchSender_BatchExportError(t *testing.T) {
}{
{
name: "merge_only",
batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)),
batcherOption: WithBatcher(cfg),
},
{
name: "merge_without_split_triggered",
batcherOption: func() Option {
c := cfg
c.MaxSizeItems = 200
return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))
return WithBatcher(c)
}(),
},
{
name: "merge_with_split_triggered",
batcherOption: func() Option {
c := cfg
c.MaxSizeItems = 20
return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))
return WithBatcher(c)
}(),
expectedRequests: 1,
expectedItems: 20,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
be := queueBatchExporter(t, tt.batcherOption)
be := queueBatchExporter(t, tt.batcherOption, WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))

require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
cfg.MinSizeItems = 5
cfg.MaxSizeItems = 10
cfg.FlushTimeout = 100 * time.Millisecond
be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
be := queueBatchExporter(t, WithBatcher(cfg), WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))

require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
func TestBatchSender_Shutdown(t *testing.T) {
batchCfg := exporterbatcher.NewDefaultConfig()
batchCfg.MinSizeItems = 10
be := queueBatchExporter(t, WithBatcher(batchCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
be := queueBatchExporter(t, WithBatcher(batchCfg), WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))

require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

Expand All @@ -212,7 +212,8 @@ func TestBatchSender_Disabled(t *testing.T) {
cfg.Enabled = false
cfg.MaxSizeItems = 5
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(cfg))
require.NotNil(t, be)
require.NoError(t, err)

Expand Down Expand Up @@ -241,7 +242,7 @@ func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) {
cfg := exporterbatcher.NewDefaultConfig()
cfg.FlushTimeout = 50 * time.Millisecond
cfg.MaxSizeItems = 20
be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc)))
be := queueBatchExporter(t, WithBatcher(cfg), WithBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc))

require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
Expand All @@ -260,8 +261,8 @@ func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) {

func TestBatchSender_PostShutdown(t *testing.T) {
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc,
fakeBatchMergeSplitFunc)))
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(exporterbatcher.NewDefaultConfig()))
require.NotNil(t, be)
require.NoError(t, err)
assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -322,7 +323,8 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
qCfg := exporterqueue.NewDefaultConfig()
qCfg.NumConsumers = 2
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(tt.batcherCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)),
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(tt.batcherCfg),
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]()))
require.NotNil(t, be)
require.NoError(t, err)
Expand Down Expand Up @@ -377,7 +379,8 @@ func TestBatchSender_BatchBlocking(t *testing.T) {
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.MinSizeItems = 3
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(bCfg))
require.NotNil(t, be)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -407,7 +410,8 @@ func TestBatchSender_BatchCancelled(t *testing.T) {
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.MinSizeItems = 2
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(bCfg))
require.NotNil(t, be)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -442,7 +446,8 @@ func TestBatchSender_DrainActiveRequests(t *testing.T) {
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.MinSizeItems = 2
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(bCfg))
require.NotNil(t, be)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -487,18 +492,9 @@ func TestBatchSender_WithBatcherOption(t *testing.T) {
opts: []Option{WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())},
expectedErr: false,
},
{
name: "funcs_set_twice",
opts: []Option{
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc,
fakeBatchMergeSplitFunc)),
},
expectedErr: true,
},
{
name: "nil_funcs",
opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(nil, nil))},
opts: []Option{WithBatchFuncs(nil, nil), WithBatcher(exporterbatcher.NewDefaultConfig())},
expectedErr: true,
},
}
Expand All @@ -518,7 +514,8 @@ func TestBatchSender_WithBatcherOption(t *testing.T) {

func TestBatchSender_UnstartedShutdown(t *testing.T) {
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(exporterbatcher.NewDefaultConfig()))
require.NoError(t, err)

err = be.Shutdown(context.Background())
Expand All @@ -542,7 +539,8 @@ func TestBatchSender_ShutdownDeadlock(t *testing.T) {
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc)))
WithBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(bCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -578,7 +576,8 @@ func TestBatchSenderWithTimeout(t *testing.T) {
tCfg := NewDefaultTimeoutConfig()
tCfg.Timeout = 50 * time.Millisecond
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)),
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(bCfg),
WithTimeout(tCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -637,7 +636,8 @@ func TestBatchSenderTimerResetNoConflict(t *testing.T) {
bCfg.MinSizeItems = 8
bCfg.FlushTimeout = 50 * time.Millisecond
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc)))
WithBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(bCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
sink := newFakeRequestSink()
Expand Down Expand Up @@ -668,7 +668,8 @@ func TestBatchSenderTimerFlush(t *testing.T) {
bCfg.MinSizeItems = 8
bCfg.FlushTimeout = 100 * time.Millisecond
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc),
WithBatcher(bCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
sink := newFakeRequestSink()
Expand Down Expand Up @@ -703,9 +704,9 @@ func TestBatchSenderTimerFlush(t *testing.T) {
require.NoError(t, be.Shutdown(context.Background()))
}

func queueBatchExporter(t *testing.T, batchOption Option) *BaseExporter {
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, batchOption,
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]()))
func queueBatchExporter(t *testing.T, opts ...Option) *BaseExporter {
opts = append(opts, WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]()))
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, opts...)
require.NotNil(t, be)
require.NoError(t, err)
return be
Expand Down

0 comments on commit 148275f

Please sign in to comment.