diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go index 24d05e023b2..dce35d7b16a 100644 --- a/exporter/internal/queue/batcher.go +++ b/exporter/internal/queue/batcher.go @@ -28,16 +28,21 @@ type BaseBatcher struct { queue Queue[internal.Request] maxWorkers int workerPool chan bool + exportFunc func(ctx context.Context, req internal.Request) error stopWG sync.WaitGroup } -func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) (Batcher, error) { +func NewBatcher(batchCfg exporterbatcher.Config, + queue Queue[internal.Request], + exportFunc func(ctx context.Context, req internal.Request) error, + maxWorkers int) (Batcher, error) { if !batchCfg.Enabled { return &DisabledBatcher{ BaseBatcher{ batchCfg: batchCfg, queue: queue, maxWorkers: maxWorkers, + exportFunc: exportFunc, stopWG: sync.WaitGroup{}, }, }, nil @@ -48,6 +53,7 @@ func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], batchCfg: batchCfg, queue: queue, maxWorkers: maxWorkers, + exportFunc: exportFunc, stopWG: sync.WaitGroup{}, }, }, nil @@ -65,7 +71,7 @@ func (qb *BaseBatcher) startWorkerPool() { // flush exports the incoming batch synchronously. func (qb *BaseBatcher) flush(batchToFlush batch) { - err := batchToFlush.req.Export(batchToFlush.ctx) + err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req) for _, idx := range batchToFlush.idxList { qb.queue.OnProcessingFinished(idx, err) } diff --git a/exporter/internal/queue/default_batcher_test.go b/exporter/internal/queue/default_batcher_test.go index 80e4e9f4be6..3909ae02070 100644 --- a/exporter/internal/queue/default_batcher_test.go +++ b/exporter/internal/queue/default_batcher_test.go @@ -50,7 +50,9 @@ func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) { Capacity: 10, }) - ba, err := NewBatcher(cfg, q, tt.maxWorkers) + ba, err := NewBatcher(cfg, q, + func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, + tt.maxWorkers) require.NoError(t, err) require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) @@ -108,7 +110,9 @@ func TestDefaultBatcher_NoSplit_TimeoutDisabled(t *testing.T) { Capacity: 10, }) - ba, err := NewBatcher(cfg, q, tt.maxWorkers) + ba, err := NewBatcher(cfg, q, + func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, + tt.maxWorkers) require.NoError(t, err) require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) @@ -172,7 +176,9 @@ func TestDefaultBatcher_NoSplit_WithTimeout(t *testing.T) { Capacity: 10, }) - ba, err := NewBatcher(cfg, q, tt.maxWorkers) + ba, err := NewBatcher(cfg, q, + func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, + tt.maxWorkers) require.NoError(t, err) require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) @@ -236,7 +242,9 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) { Capacity: 10, }) - ba, err := NewBatcher(cfg, q, tt.maxWorkers) + ba, err := NewBatcher(cfg, q, + func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, + tt.maxWorkers) require.NoError(t, err) require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) diff --git a/exporter/internal/queue/disabled_batcher_test.go b/exporter/internal/queue/disabled_batcher_test.go index 036fa961720..ca612d82de5 100644 --- a/exporter/internal/queue/disabled_batcher_test.go +++ b/exporter/internal/queue/disabled_batcher_test.go @@ -46,7 +46,9 @@ func TestDisabledBatcher_Basic(t *testing.T) { Capacity: 10, }) - ba, err := NewBatcher(cfg, q, tt.maxWorkers) + ba, err := NewBatcher(cfg, q, + func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, + tt.maxWorkers) require.NoError(t, err) require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))