Skip to content

Commit

Permalink
Cherry-pick elastic#17653 to 7.x: Reduce dependencies in Crawler (ela…
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering authored Apr 23, 2020
1 parent 682445b commit 7731ffa
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 41 deletions.
57 changes: 33 additions & 24 deletions filebeat/beater/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,19 @@ import (
"fmt"
"sync"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/filebeat/registrar"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

type crawler struct {
inputs map[uint64]*input.Runner
log *logp.Logger
inputs map[uint64]cfgfile.Runner
inputConfigs []*common.Config
out channel.Factory
wg sync.WaitGroup
inputsFactory cfgfile.RunnerFactory
modulesFactory cfgfile.RunnerFactory
Expand All @@ -46,14 +45,13 @@ type crawler struct {

func newCrawler(
inputFactory, module cfgfile.RunnerFactory,
out channel.Factory,
inputConfigs []*common.Config,
beatDone chan struct{},
once bool,
) (*crawler, error) {
return &crawler{
out: out,
inputs: map[uint64]*input.Runner{},
log: logp.NewLogger("crawler"),
inputs: map[uint64]cfgfile.Runner{},
inputsFactory: inputFactory,
modulesFactory: module,
inputConfigs: inputConfigs,
Expand All @@ -65,16 +63,16 @@ func newCrawler(
// Start starts the crawler with all inputs
func (c *crawler) Start(
pipeline beat.Pipeline,
r *registrar.Registrar,
configInputs *common.Config,
configModules *common.Config,
) error {
log := c.log

logp.Info("Loading Inputs: %v", len(c.inputConfigs))
log.Infof("Loading Inputs: %v", len(c.inputConfigs))

// Prospect the globs/paths given on the command line and launch harvesters
for _, inputConfig := range c.inputConfigs {
err := c.startInput(pipeline, inputConfig, r.GetStates())
err := c.startInput(pipeline, inputConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -102,34 +100,41 @@ func (c *crawler) Start(
}()
}

logp.Info("Loading and starting Inputs completed. Enabled inputs: %v", len(c.inputs))
log.Infof("Loading and starting Inputs completed. Enabled inputs: %v", len(c.inputs))

return nil
}

func (c *crawler) startInput(
pipeline beat.Pipeline,
config *common.Config,
states []file.State,
) error {
if !config.Enabled() {
return nil
}

connector := c.out(pipeline)
p, err := input.New(config, connector, c.beatDone, states, nil)
var h map[string]interface{}
config.Unpack(&h)
id, err := hashstructure.Hash(h, nil)
if err != nil {
return fmt.Errorf("Error while initializing input: %s", err)
return fmt.Errorf("can not compute id from configuration: %v", err)
}
if _, ok := c.inputs[id]; ok {
return fmt.Errorf("input with same ID already exists: %v", id)
}
p.Once = c.once

if _, ok := c.inputs[p.ID]; ok {
return fmt.Errorf("Input with same ID already exists: %d", p.ID)
runner, err := c.inputsFactory.Create(pipeline, config, nil)
if err != nil {
return fmt.Errorf("Error while initializing input: %+v", err)
}
if inputRunner, ok := runner.(*input.Runner); ok {
inputRunner.Once = c.once
}

c.inputs[p.ID] = p
c.inputs[id] = runner

p.Start()
// Infof (not Info): the message carries a %d verb for the input ID.
c.log.Infof("Starting input (ID: %d)", id)
runner.Start()

return nil
}
Expand All @@ -146,9 +151,13 @@ func (c *crawler) Stop() {
}

logp.Info("Stopping %v inputs", len(c.inputs))
for _, p := range c.inputs {
// Stop inputs in parallel
asyncWaitStop(p.Stop)
// Stop inputs in parallel
for id, p := range c.inputs {
id, p := id, p
asyncWaitStop(func() {
c.log.Infof("Stopping input: %d", id)
p.Stop()
})
}

if c.inputReloader != nil {
Expand Down
4 changes: 2 additions & 2 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
fb.done,
)

crawler, err := newCrawler(inputLoader, moduleLoader, pipelineConnector, config.Inputs, fb.done, *once)
crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once)
if err != nil {
logp.Err("Could not init crawler: %v", err)
return err
Expand Down Expand Up @@ -378,7 +378,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
logp.Debug("modules", "Existing Ingest pipelines will be updated")
}

err = crawler.Start(b.Publisher, registrar, config.ConfigInput, config.ConfigModules)
err = crawler.Start(b.Publisher, config.ConfigInput, config.ConfigModules)
if err != nil {
crawler.Stop()
return err
Expand Down
15 changes: 1 addition & 14 deletions filebeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"sync"
"time"

"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -52,7 +50,6 @@ type Runner struct {
input Input
done chan struct{}
wg *sync.WaitGroup
ID uint64
Once bool
beatDone chan struct{}
}
Expand All @@ -78,13 +75,6 @@ func New(
return nil, err
}

var h map[string]interface{}
conf.Unpack(&h)
input.ID, err = hashstructure.Hash(h, nil)
if err != nil {
return nil, err
}

var f Factory
f, err = GetFactory(input.config.Type)
if err != nil {
Expand All @@ -111,7 +101,6 @@ func New(
// Start starts the input
func (p *Runner) Start() {
p.wg.Add(1)
logp.Info("Starting input of type: %v; ID: %d ", p.config.Type, p.ID)

onceWg := sync.WaitGroup{}
if p.Once {
Expand Down Expand Up @@ -164,8 +153,6 @@ func (p *Runner) Stop() {
}

func (p *Runner) stop() {
logp.Info("Stopping Input: %d", p.ID)

// In case of once, it will be waited until harvesters close itself
if p.Once {
p.input.Wait()
Expand All @@ -175,5 +162,5 @@ func (p *Runner) stop() {
}

func (p *Runner) String() string {
return fmt.Sprintf("input [type=%s, ID=%d]", p.config.Type, p.ID)
return fmt.Sprintf("input [type=%s]", p.config.Type)
}
2 changes: 1 addition & 1 deletion filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ def test_restart_state_reset(self):
# Wait until inputs are started
self.wait_until(
lambda: self.log_contains_count(
"Starting input of type: log", logfile="filebeat2.log") >= 1,
"Starting input", logfile="filebeat2.log") >= 1,
max_timeout=10)

filebeat.check_kill_and_wait()
Expand Down

0 comments on commit 7731ffa

Please sign in to comment.