Skip to content

Commit

Permalink
[exporter] [chore] Initialize batchSender and queueSender after confi…
Browse files Browse the repository at this point in the history
…guration (#11041)

#### Description

This PR changes initialization of `batchSender` and `queueSender` to
AFTER configuration. That way we get to access `queueConfig` and
`batcherConfig` in the same place.

Context: This is some pre-work for changing queue->batch from a pushing
model to a pulling model. We will be initialization a
`queueBatchSender(queueConfig, batcherConfig)` if both queue and batcher
are enabled and initialize `batchSender(batchConfig)` if only batcher is
enabled. This change enables us to achieve the goal without changing
config API.

#### Link to tracking issue

#10368

#### Testing

Ran `opentelemetry-collector$ make` to make sure all tests still pass.

Co-authored-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>
  • Loading branch information
sfc-gh-sili and mx-psi authored Sep 6, 2024
1 parent d19ebe4 commit 7966035
Showing 1 changed file with 35 additions and 19 deletions.
54 changes: 35 additions & 19 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,8 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
set := exporterqueue.Settings{
DataType: o.signal,
ExporterSettings: o.set,
}
o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage, o.obsrep)
o.queueCfg = cfg
o.queueFactory = queueFactory
return nil
}
}
Expand Down Expand Up @@ -172,20 +169,8 @@ func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf expor
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithBatcher(cfg exporterbatcher.Config, opts ...BatcherOption) Option {
return func(o *baseExporter) error {
if !cfg.Enabled {
return nil
}

bs := newBatchSender(cfg, o.set, o.batchMergeFunc, o.batchMergeSplitfunc)
for _, opt := range opts {
if err := opt(bs); err != nil {
return err
}
}
if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
return fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters")
}
o.batchSender = bs
o.batcherCfg = cfg
o.batcherOpts = opts
return nil
}
}
Expand Down Expand Up @@ -247,6 +232,11 @@ type baseExporter struct {
timeoutSender *timeoutSender // timeoutSender is always initialized.

consumerOptions []consumer.Option

queueCfg exporterqueue.Config
queueFactory exporterqueue.Factory[Request]
batcherCfg exporterbatcher.Config
batcherOpts []BatcherOption
}

func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
Expand Down Expand Up @@ -275,6 +265,32 @@ func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsre
return nil, err
}

if be.batcherCfg.Enabled {
bs := newBatchSender(be.batcherCfg, be.set, be.batchMergeFunc, be.batchMergeSplitfunc)
for _, opt := range be.batcherOpts {
err = multierr.Append(err, opt(bs))
}
if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters"))
}
be.batchSender = bs
}

if be.queueCfg.Enabled {
set := exporterqueue.Settings{
DataType: be.signal,
ExporterSettings: be.set,
}
be.queueSender = newQueueSender(be.queueFactory(context.Background(), set, be.queueCfg), be.set, be.queueCfg.NumConsumers, be.exportFailureMessage, be.obsrep)
for _, op := range options {
err = multierr.Append(err, op(be))
}
}

if err != nil {
return nil, err
}

be.connectSenders()

if bs, ok := be.batchSender.(*batchSender); ok {
Expand Down

0 comments on commit 7966035

Please sign in to comment.