-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add filebeat modules config reloading #4566
Changes from 1 commit
5d9cc6c
094e6bf
acba788
9ba3d40
798f41b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ import ( | |
"sync" | ||
|
||
"github.com/elastic/beats/filebeat/channel" | ||
"github.com/elastic/beats/filebeat/fileset" | ||
"github.com/elastic/beats/filebeat/input/file" | ||
"github.com/elastic/beats/filebeat/prospector" | ||
"github.com/elastic/beats/filebeat/registrar" | ||
|
@@ -20,22 +21,24 @@ type Crawler struct { | |
wg sync.WaitGroup | ||
reloader *cfgfile.Reloader | ||
once bool | ||
beatVersion string | ||
beatDone chan struct{} | ||
} | ||
|
||
func New(out channel.Outleter, prospectorConfigs []*common.Config, beatDone chan struct{}, once bool) (*Crawler, error) { | ||
func New(out channel.Outleter, prospectorConfigs []*common.Config, beatVersion string, beatDone chan struct{}, once bool) (*Crawler, error) { | ||
|
||
return &Crawler{ | ||
out: out, | ||
prospectors: map[uint64]*prospector.Prospector{}, | ||
prospectorConfigs: prospectorConfigs, | ||
once: once, | ||
beatVersion: beatVersion, | ||
beatDone: beatDone, | ||
}, nil | ||
} | ||
|
||
// Start starts the crawler with all prospectors | ||
func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config) error { | ||
func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config, configModules *common.Config) error { | ||
|
||
logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs)) | ||
|
||
|
@@ -57,6 +60,17 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config | |
}() | ||
} | ||
|
||
if configModules.Enabled() { | ||
logp.Beta("Loading separate prospectors is enabled.") | ||
|
||
c.reloader = cfgfile.NewReloader(configModules) | ||
// TODO add beatVersion here | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see you pass it above, so you could add it now? |
||
factory := fileset.NewFactory(c.out, r, "", c.beatDone) | ||
go func() { | ||
c.reloader.Run(factory) | ||
}() | ||
} | ||
|
||
logp.Info("Loading and starting Prospectors completed. Enabled prospectors: %v", len(c.prospectors)) | ||
|
||
return nil | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
package fileset | ||
|
||
import ( | ||
"github.com/elastic/beats/libbeat/cfgfile" | ||
"github.com/elastic/beats/libbeat/common" | ||
"github.com/elastic/beats/libbeat/logp" | ||
"github.com/mitchellh/hashstructure" | ||
|
||
"github.com/elastic/beats/filebeat/channel" | ||
"github.com/elastic/beats/filebeat/prospector" | ||
"github.com/elastic/beats/filebeat/registrar" | ||
) | ||
|
||
// Factory is a factory for registrars | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this really a factory of registrars? Or did you mean a factory of modules or prospectors? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Uhm, this was copied from here: https://github.com/elastic/beats/blob/master/filebeat/prospector/factory.go#L11, I'll better put factory of modules |
||
type Factory struct { | ||
outlet channel.Outleter | ||
registrar *registrar.Registrar | ||
beatVersion string | ||
beatDone chan struct{} | ||
} | ||
|
||
// Wrap an array of prospectors and implements cfgfile.Runner interface | ||
type prospectorsRunner struct { | ||
id uint64 | ||
prospectors []*prospector.Prospector | ||
} | ||
|
||
// NewFactory instantiates a new Factory | ||
func NewFactory(outlet channel.Outleter, registrar *registrar.Registrar, beatVersion string, beatDone chan struct{}) *Factory { | ||
return &Factory{ | ||
outlet: outlet, | ||
registrar: registrar, | ||
beatVersion: beatVersion, | ||
beatDone: beatDone, | ||
} | ||
} | ||
|
||
// Create creates a module based on a config | ||
func (f *Factory) Create(c *common.Config) (cfgfile.Runner, error) { | ||
// Start a registry of one module: | ||
m, err := NewModuleRegistry([]*common.Config{c}, f.beatVersion) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
pConfigs, err := m.GetProspectorConfigs() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// Hash module ID | ||
var h map[string]interface{} | ||
c.Unpack(&h) | ||
id, err := hashstructure.Hash(h, nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
prospectors := make([]*prospector.Prospector, len(pConfigs)) | ||
for i, pConfig := range pConfigs { | ||
prospectors[i], err = prospector.NewProspector(pConfig, f.outlet, f.beatDone, f.registrar.GetStates()) | ||
if err != nil { | ||
logp.Err("Error creating prospector: %s", err) | ||
return nil, err | ||
} | ||
} | ||
|
||
return &prospectorsRunner{ | ||
id: id, | ||
prospectors: prospectors, | ||
}, nil | ||
} | ||
|
||
func (p *prospectorsRunner) Start() { | ||
for _, prospector := range p.prospectors { | ||
prospector.Start() | ||
} | ||
} | ||
func (p *prospectorsRunner) Stop() { | ||
for _, prospector := range p.prospectors { | ||
prospector.Stop() | ||
} | ||
} | ||
func (p *prospectorsRunner) ID() uint64 { | ||
return p.id | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
type: log | ||
paths: | ||
{{ range $i, $path := .paths }} | ||
- {{$path}} | ||
{{ end }} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
{} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
module_version: "1.0" | ||
|
||
var: | ||
- name: paths | ||
default: | ||
- test.log | ||
|
||
ingest_pipeline: ingest/pipeline.json | ||
prospector: config/test.yml |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably mark this as
beta
, since the config reloading in metricbeat is also beta.