Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - Update pkg/stanza operators to use component package types #31664

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions pkg/stanza/adapter/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
Expand Down Expand Up @@ -124,33 +123,32 @@ func (f BenchReceiverType) Type() component.Type {
}

func (f BenchReceiverType) CreateDefaultConfig() component.Config {
bCfg, _ := newDefaultConfig("").(*BenchOpConfig)
return &BenchConfig{
BaseConfig: BaseConfig{
Operators: []operator.Config{},
Operators: []operator.Identifiable{},
},
BenchOpConfig: *NewBenchOpConfig(),
BenchOpConfig: *bCfg,
}
}

func (f BenchReceiverType) BaseConfig(cfg component.Config) BaseConfig {
return cfg.(*BenchConfig).BaseConfig
}

func (f BenchReceiverType) InputConfig(cfg component.Config) operator.Config {
return operator.NewConfig(cfg.(*BenchConfig))
func (f BenchReceiverType) InputConfig(cfg component.Config) operator.Identifiable {
return cfg.(*BenchConfig)
}

func init() {
operator.Register(benchTypeStr, func() operator.Builder { return NewBenchOpConfig() })
operator.RegisterFactory(newBenchOpFactory())
}

// NewBenchOpConfig creates a new benchmarking operator config with default values
func NewBenchOpConfig() *BenchOpConfig {
return NewBenchOpConfigWithID(benchTypeStr)
func newBenchOpFactory() operator.Factory {
return operator.NewFactory(benchType, newDefaultConfig, createOperator)
}

// NewBenchOpConfigWithID creates a new noop operator config with default values
func NewBenchOpConfigWithID(operatorID string) *BenchOpConfig {
func newDefaultConfig(operatorID string) component.Config {
return &BenchOpConfig{
InputConfig: helper.NewInputConfig(operatorID, benchTypeStr),
}
Expand All @@ -163,9 +161,9 @@ type BenchOpConfig struct {
NumHosts int `mapstructure:"num_hosts"`
}

// Build will build a noop operator.
func (c BenchOpConfig) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
inputOperator, err := c.InputConfig.Build(logger)
func createOperator(set component.TelemetrySettings, cfg component.Config) (operator.Operator, error) {
c := cfg.(*BenchConfig).BenchOpConfig
inputOperator, err := c.InputConfig.Build(set.Logger.Sugar())
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/stanza/adapter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (

// BaseConfig is the common configuration of a stanza-based receiver
type BaseConfig struct {
Operators []operator.Config `mapstructure:"operators"`
StorageID *component.ID `mapstructure:"storage"`
RetryOnFailure consumerretry.Config `mapstructure:"retry_on_failure"`
Operators []operator.Identifiable `mapstructure:"operators"`
StorageID *component.ID `mapstructure:"storage"`
RetryOnFailure consumerretry.Config `mapstructure:"retry_on_failure"`

// currently not configurable by users, but available for benchmarking
numWorkers int
Expand Down
34 changes: 20 additions & 14 deletions pkg/stanza/adapter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
Expand Down Expand Up @@ -61,25 +62,30 @@ func (o flushIntervalOption) apply(e *LogEmitter) {
e.flushInterval = o.flushInterval
}

// NewLogEmitter creates a new receiver output
func NewLogEmitter(logger *zap.SugaredLogger, opts ...emitterOption) *LogEmitter {
func NewEmitter(set component.TelemetrySettings, opts ...emitterOption) (*LogEmitter, error) {
cfg := helper.NewOutputConfig("log_emitter", "log_emitter")
op, err := helper.NewOutputOperator(set, cfg)
if err != nil {
return nil, err
}
e := &LogEmitter{
OutputOperator: helper.OutputOperator{
BasicOperator: helper.BasicOperator{
OperatorID: "log_emitter",
OperatorType: "log_emitter",
SugaredLogger: logger,
},
},
logChan: make(chan []*entry.Entry),
maxBatchSize: defaultMaxBatchSize,
batch: make([]*entry.Entry, 0, defaultMaxBatchSize),
flushInterval: defaultFlushInterval,
cancel: func() {},
OutputOperator: op,
logChan: make(chan []*entry.Entry),
maxBatchSize: defaultMaxBatchSize,
batch: make([]*entry.Entry, 0, defaultMaxBatchSize),
flushInterval: defaultFlushInterval,
cancel: func() {},
}
for _, opt := range opts {
opt.apply(e)
}
return e, nil
}

// Deprecated [v0.98.0] Use NewEmitter instead.
func NewLogEmitter(logger *zap.SugaredLogger, opts ...emitterOption) *LogEmitter {
set := component.TelemetrySettings{Logger: logger.Desugar()}
e, _ := NewEmitter(set, opts...) // nolint: errcheck
return e
}

Expand Down
30 changes: 16 additions & 14 deletions pkg/stanza/adapter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type LogReceiverType interface {
Type() component.Type
CreateDefaultConfig() component.Config
BaseConfig(component.Config) BaseConfig
InputConfig(component.Config) operator.Config
InputConfig(component.Config) operator.Identifiable
}

// NewFactory creates a factory for a Stanza-based receiver
Expand All @@ -36,14 +36,14 @@ func NewFactory(logReceiverType LogReceiverType, sl component.StabilityLevel) rc
func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
return func(
ctx context.Context,
params rcvr.CreateSettings,
set rcvr.CreateSettings,
cfg component.Config,
nextConsumer consumer.Logs,
) (rcvr.Logs, error) {
inputCfg := logReceiverType.InputConfig(cfg)
baseCfg := logReceiverType.BaseConfig(cfg)

operators := append([]operator.Config{inputCfg}, baseCfg.Operators...)
operators := append([]operator.Identifiable{inputCfg}, baseCfg.Operators...)

emitterOpts := []emitterOption{}
if baseCfg.maxBatchSize > 0 {
Expand All @@ -52,11 +52,13 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
if baseCfg.flushInterval > 0 {
emitterOpts = append(emitterOpts, withFlushInterval(baseCfg.flushInterval))
}
emitter := NewLogEmitter(params.Logger.Sugar(), emitterOpts...)
pipe, err := pipeline.Config{
Operators: operators,
DefaultOutput: emitter,
}.Build(params.Logger.Sugar())

emitter, err := NewEmitter(set.TelemetrySettings, emitterOpts...)
if err != nil {
return nil, err
}

pipe, err := pipeline.New(set.TelemetrySettings, operators, pipeline.WithDefaultOutput(emitter))
if err != nil {
return nil, err
}
Expand All @@ -65,20 +67,20 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
if baseCfg.numWorkers > 0 {
converterOpts = append(converterOpts, withWorkerCount(baseCfg.numWorkers))
}
converter := NewConverter(params.Logger, converterOpts...)
converter := NewConverter(set.Logger, converterOpts...)
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: params.ID,
ReceiverCreateSettings: params,
ReceiverID: set.ID,
ReceiverCreateSettings: set,
})
if err != nil {
return nil, err
}
return &receiver{
id: params.ID,
set: set.TelemetrySettings,
id: set.ID,
pipe: pipe,
emitter: emitter,
consumer: consumerretry.NewLogs(baseCfg.RetryOnFailure, params.Logger, nextConsumer),
logger: params.Logger,
consumer: consumerretry.NewLogs(baseCfg.RetryOnFailure, set.Logger, nextConsumer),
converter: converter,
obsrecv: obsrecv,
storageID: baseCfg.StorageID,
Expand Down
12 changes: 2 additions & 10 deletions pkg/stanza/adapter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ func TestCreateReceiver(t *testing.T) {
t.Run("Success", func(t *testing.T) {
factory := NewFactory(TestReceiverType{}, component.StabilityLevelDevelopment)
cfg := factory.CreateDefaultConfig().(*TestConfig)
cfg.Operators = []operator.Config{
{
Builder: json.NewConfig(),
},
}
cfg.Operators = []operator.Identifiable{json.NewConfig()}
receiver, err := factory.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err, "receiver creation failed")
require.NotNil(t, receiver, "receiver creation failed")
Expand All @@ -34,11 +30,7 @@ func TestCreateReceiver(t *testing.T) {
t.Run("DecodeOperatorConfigsFailureMissingFields", func(t *testing.T) {
factory := NewFactory(TestReceiverType{}, component.StabilityLevelDevelopment)
badCfg := factory.CreateDefaultConfig().(*TestConfig)
badCfg.Operators = []operator.Config{
{
Builder: regex.NewConfig(),
},
}
badCfg.Operators = []operator.Identifiable{regex.NewConfig()}
receiver, err := factory.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), badCfg, consumertest.NewNop())
require.Error(t, err, "receiver creation should fail if parser configs aren't valid")
require.Nil(t, receiver, "receiver creation should fail if parser configs aren't valid")
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/adapter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
}

return &receiver{
set: componenttest.NewNopTelemetrySettings(),
id: component.MustNewID("testReceiver"),
pipe: pipe,
emitter: emitter,
consumer: nextConsumer,
logger: zap.NewNop(),
converter: NewConverter(zap.NewNop()),
obsrecv: obsrecv,
}, nil
Expand Down
32 changes: 17 additions & 15 deletions pkg/stanza/adapter/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@ import (
"errors"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/noop"
)

var unstartableType = component.MustNewType("unstartable_operator")

// This file implements some useful testing components
func init() {
operator.Register("unstartable_operator", func() operator.Builder { return NewUnstartableConfig() })
operator.RegisterFactory(operator.NewFactory(unstartableType, newUnstartableConfig, createUnstartableOperator))
}

// UnstartableConfig is the configuration of an unstartable mock operator
Expand All @@ -33,17 +34,18 @@ type UnstartableOperator struct {
helper.OutputOperator
}

// newUnstartableConfig creates new output config
func NewUnstartableConfig() operator.Config {
return operator.NewConfig(&UnstartableConfig{
OutputConfig: helper.NewOutputConfig("unstartable_operator", "unstartable_operator"),
})
func newUnstartableConfig(operatorID string) component.Config {
return &UnstartableConfig{
OutputConfig: helper.NewOutputConfig(operatorID, "unstartable_operator"),
}
}

// Build will build an unstartable operator
func (c *UnstartableConfig) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
o, _ := c.OutputConfig.Build(logger)
return &UnstartableOperator{OutputOperator: o}, nil
func createUnstartableOperator(set component.TelemetrySettings, cfg component.Config) (operator.Operator, error) {
op, err := helper.NewOutputOperator(set, cfg.(*UnstartableConfig).OutputConfig)
if err != nil {
return nil, err
}
return &UnstartableOperator{OutputOperator: op}, nil
}

// Start will return an error
Expand All @@ -62,7 +64,7 @@ var testType = component.MustNewType(testTypeStr)

type TestConfig struct {
BaseConfig `mapstructure:",squash"`
Input operator.Config `mapstructure:",squash"`
Input operator.Identifiable `mapstructure:",squash"`
}
type TestReceiverType struct{}

Expand All @@ -73,16 +75,16 @@ func (f TestReceiverType) Type() component.Type {
func (f TestReceiverType) CreateDefaultConfig() component.Config {
return &TestConfig{
BaseConfig: BaseConfig{
Operators: []operator.Config{},
Operators: []operator.Identifiable{},
},
Input: operator.NewConfig(noop.NewConfig()),
Input: noop.NewFactory().NewDefaultConfig("").(operator.Identifiable),
}
}

func (f TestReceiverType) BaseConfig(cfg component.Config) BaseConfig {
return cfg.(*TestConfig).BaseConfig
}

func (f TestReceiverType) InputConfig(cfg component.Config) operator.Config {
func (f TestReceiverType) InputConfig(cfg component.Config) operator.Identifiable {
return cfg.(*TestConfig).Input
}
16 changes: 8 additions & 8 deletions pkg/stanza/adapter/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

type receiver struct {
set component.TelemetrySettings
id component.ID
wg sync.WaitGroup
cancel context.CancelFunc
Expand All @@ -28,7 +29,6 @@ type receiver struct {
emitter *LogEmitter
consumer consumer.Logs
converter *Converter
logger *zap.Logger
obsrecv *receiverhelper.ObsReport

storageID *component.ID
Expand All @@ -42,7 +42,7 @@ var _ rcvr.Logs = (*receiver)(nil)
func (r *receiver) Start(ctx context.Context, host component.Host) error {
rctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.logger.Info("Starting stanza receiver")
r.set.Logger.Info("Starting stanza receiver")

if err := r.setStorageClient(ctx, host); err != nil {
return fmt.Errorf("storage client: %w", err)
Expand Down Expand Up @@ -87,7 +87,7 @@ func (r *receiver) emitterLoop(ctx context.Context) {
for {
select {
case <-doneChan:
r.logger.Debug("Receive loop stopped")
r.set.Logger.Debug("Receive loop stopped")
return

case e, ok := <-r.emitter.logChan:
Expand All @@ -96,7 +96,7 @@ func (r *receiver) emitterLoop(ctx context.Context) {
}

if err := r.converter.Batch(e); err != nil {
r.logger.Error("Could not add entry to batch", zap.Error(err))
r.set.Logger.Error("Could not add entry to batch", zap.Error(err))
}
}
}
Expand All @@ -112,19 +112,19 @@ func (r *receiver) consumerLoop(ctx context.Context) {
for {
select {
case <-doneChan:
r.logger.Debug("Consumer loop stopped")
r.set.Logger.Debug("Consumer loop stopped")
return

case pLogs, ok := <-pLogsChan:
if !ok {
r.logger.Debug("Converter channel got closed")
r.set.Logger.Debug("Converter channel got closed")
continue
}
obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
logRecordCount := pLogs.LogRecordCount()
cErr := r.consumer.ConsumeLogs(ctx, pLogs)
if cErr != nil {
r.logger.Error("ConsumeLogs() failed", zap.Error(cErr))
r.set.Logger.Error("ConsumeLogs() failed", zap.Error(cErr))
}
r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
}
Expand All @@ -137,7 +137,7 @@ func (r *receiver) Shutdown(ctx context.Context) error {
return nil
}

r.logger.Info("Stopping stanza receiver")
r.set.Logger.Info("Stopping stanza receiver")
pipelineErr := r.pipe.Stop()
r.converter.Stop()
r.cancel()
Expand Down
Loading