Skip to content

Commit

Permalink
[chore] move logstransform processor to generated lifecycle tests (op…
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme authored and cparkins committed Jan 10, 2024
1 parent 34c95dd commit b0cd650
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 9 deletions.
108 changes: 108 additions & 0 deletions processor/logstransformprocessor/generated_component_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion processor/logstransformprocessor/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,10 @@ status:
development: [logs]
distributions: [observiq, splunk, sumo]
codeowners:
active: [djaglowski, dehaansa]
active: [djaglowski, dehaansa]

tests:
config:
operators:
- type: regex_parser
regex: '^(?P<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$'
28 changes: 20 additions & 8 deletions processor/logstransformprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type logsTransformProcessor struct {
converter *adapter.Converter
fromConverter *adapter.FromPdataConverter
wg sync.WaitGroup
shutdownFns []component.ShutdownFunc
}

func newProcessor(config *Config, nextConsumer consumer.Logs, logger *zap.Logger) (*logsTransformProcessor, error) {
Expand Down Expand Up @@ -62,14 +63,15 @@ func (ltp *logsTransformProcessor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
}

func (ltp *logsTransformProcessor) Shutdown(_ context.Context) error {
ltp.logger.Info("Stopping logs transform processor")
pipelineErr := ltp.pipe.Stop()
ltp.converter.Stop()
ltp.fromConverter.Stop()
func (ltp *logsTransformProcessor) Shutdown(ctx context.Context) error {
for _, fn := range ltp.shutdownFns {
if err := fn(ctx); err != nil {
return err
}
}
ltp.wg.Wait()

return pipelineErr
return nil
}

func (ltp *logsTransformProcessor) Start(ctx context.Context, _ component.Host) error {
Expand All @@ -79,6 +81,10 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, _ component.Host)
if err != nil {
return err
}
ltp.shutdownFns = append(ltp.shutdownFns, func(ctx context.Context) error {
ltp.logger.Info("Stopping logs transform processor")
return ltp.pipe.Stop()
})

pipelineOperators := ltp.pipe.Operators()
if len(pipelineOperators) == 0 {
Expand All @@ -90,10 +96,17 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, _ component.Host)

ltp.converter = adapter.NewConverter(ltp.logger)
ltp.converter.Start()
ltp.shutdownFns = append(ltp.shutdownFns, func(ctx context.Context) error {
ltp.converter.Stop()
return nil
})

ltp.fromConverter = adapter.NewFromPdataConverter(wkrCount, ltp.logger)
ltp.fromConverter.Start()

ltp.shutdownFns = append(ltp.shutdownFns, func(ctx context.Context) error {
ltp.fromConverter.Stop()
return nil
})
// Below we're starting 3 loops:
// * first which reads all the logs translated by the fromConverter and then forwards
// them to pipeline
Expand All @@ -112,7 +125,6 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, _ component.Host)
// (aggregated by Resource) and then places them on the next consumer
ltp.wg.Add(1)
go ltp.consumerLoop(ctx)

return nil
}

Expand Down

0 comments on commit b0cd650

Please sign in to comment.