Skip to content

Commit

Permalink
Add filebeat modules config reloading (#4566)
Browse files Browse the repository at this point in the history
* Add filebeat modules reloading config

* Pass beat version to module factory

* Mark reloading as beta

* Avoid repeating message when modules/prospectors reload is enabled

* Load pipelines on new module instances
  • Loading branch information
exekias authored and tsg committed Jul 5, 2017
1 parent a0f55eb commit 63c6784
Show file tree
Hide file tree
Showing 14 changed files with 388 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d
- Add support for loading Xpack Machine Learning configurations from the modules, and added sample configurations for the Nginx module. {pull}4506[4506]
- Add udp prospector type. {pull}4452[4452]
- Enabled Cgo which means libc is dynamically compiled. {pull}4546[4546]
- Add beta module config reloading mechanism. {pull}4566[4566]

*Heartbeat*

Expand Down
16 changes: 11 additions & 5 deletions filebeat/_meta/common.reference.p2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,14 @@ filebeat.prospectors:
#filebeat.shutdown_timeout: 0

# Enable filebeat config reloading
#filebeat.config.prospectors:
#enabled: false
#path: configs/*.yml
#reload.enabled: true
#reload.period: 10s
#filebeat.config:
#prospectors:
#enabled: false
#path: prospectors.d/*.yml
#reload.enabled: true
#reload.period: 10s
#modules:
#enabled: false
#path: modules.d/*.yml
#reload.enabled: true
#reload.period: 10s
30 changes: 22 additions & 8 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
_ "github.com/elastic/beats/filebeat/processor/add_kubernetes_metadata"
)

// pipelinesWarning is logged whenever module Ingest Node pipelines cannot be
// loaded because the Elasticsearch output is not configured/enabled; pipelines
// can only be installed into Elasticsearch, so with any other output this is
// expected and harmless.
const pipelinesWarning = "Filebeat is unable to load the Ingest Node pipelines for the configured" +
	" modules because the Elasticsearch output is not configured/enabled. If you have" +
	" already loaded the Ingest Node pipelines or are using Logstash pipelines, you" +
	" can ignore this warning."

var (
	// once is set by the -once CLI flag: run all harvesters to EOF a single
	// time and then exit, instead of watching files continuously.
	once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF")
)
Expand Down Expand Up @@ -67,7 +72,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
}
}

if !config.ConfigProspector.Enabled() && !haveEnabledProspectors {
if !config.ConfigProspector.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledProspectors {
if !b.InSetupCmd {
return nil, errors.New("No modules or prospectors enabled and configuration reloading disabled. What files do you want me to watch?")
} else {
Expand All @@ -76,7 +81,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
}
}

if *once && config.ConfigProspector.Enabled() {
if *once && config.ConfigProspector.Enabled() && config.ConfigModules.Enabled() {
return nil, errors.New("prospector configs and -once cannot be used together")
}

Expand All @@ -99,10 +104,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// setup.
func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
if b.Config.Output.Name() != "elasticsearch" {
logp.Warn("Filebeat is unable to load the Ingest Node pipelines for the configured" +
" modules because the Elasticsearch output is not configured/enabled. If you have" +
" already loaded the Ingest Node pipelines or are using Logstash pipelines, you" +
" can ignore this warning.")
logp.Warn(pipelinesWarning)
return nil
}

Expand Down Expand Up @@ -176,7 +178,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

crawler, err := crawler.New(channel.NewOutlet(fb.done, spooler.Channel, wgEvents), config.Prospectors, fb.done, *once)
outlet := channel.NewOutlet(fb.done, spooler.Channel, wgEvents)
crawler, err := crawler.New(outlet, config.Prospectors, b.Info.Version, fb.done, *once)
if err != nil {
logp.Err("Could not init crawler: %v", err)
return err
Expand Down Expand Up @@ -218,7 +221,18 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
spooler.Stop()
}()

err = crawler.Start(registrar, config.ConfigProspector)
var esClient fileset.PipelineLoader
if b.Config.Output.Name() == "elasticsearch" {
esConfig := b.Config.Output.Config()
esClient, err = elasticsearch.NewConnectedClient(esConfig)
if err != nil {
return errors.Wrap(err, "Error creating Elasticsearch client")
}
} else {
logp.Warn(pipelinesWarning)
}

err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules, esClient)
if err != nil {
crawler.Stop()
return err
Expand Down
1 change: 1 addition & 0 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
Modules []*common.Config `config:"modules"`
ConfigProspector *common.Config `config:"config.prospectors"`
ConfigModules *common.Config `config:"config.modules"`
}

var (
Expand Down
17 changes: 15 additions & 2 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/fileset"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/registrar"
Expand All @@ -20,22 +21,24 @@ type Crawler struct {
wg sync.WaitGroup
reloader *cfgfile.Reloader
once bool
beatVersion string
beatDone chan struct{}
}

func New(out channel.Outleter, prospectorConfigs []*common.Config, beatDone chan struct{}, once bool) (*Crawler, error) {
// New creates a Crawler managing the given prospector configurations.
// beatVersion is forwarded to module factories created during config
// reloading, beatDone signals beat shutdown, and once restricts each
// prospector to a single scan.
func New(out channel.Outleter, prospectorConfigs []*common.Config, beatVersion string, beatDone chan struct{}, once bool) (*Crawler, error) {
	c := &Crawler{
		out:               out,
		prospectors:       make(map[uint64]*prospector.Prospector),
		prospectorConfigs: prospectorConfigs,
		beatVersion:       beatVersion,
		beatDone:          beatDone,
		once:              once,
	}
	return c, nil
}

// Start starts the crawler with all prospectors
func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config) error {
func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config, configModules *common.Config, pipelineLoader fileset.PipelineLoader) error {

logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs))

Expand All @@ -57,6 +60,16 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config
}()
}

if configModules.Enabled() {
logp.Beta("Loading separate modules is enabled.")

c.reloader = cfgfile.NewReloader(configModules)
factory := fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoader, c.beatDone)
go func() {
c.reloader.Run(factory)
}()
}

logp.Info("Loading and starting Prospectors completed. Enabled prospectors: %v", len(c.prospectors))

return nil
Expand Down
16 changes: 11 additions & 5 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,17 @@ filebeat.prospectors:
#filebeat.shutdown_timeout: 0

# Enable filebeat config reloading
#filebeat.config.prospectors:
#enabled: false
#path: configs/*.yml
#reload.enabled: true
#reload.period: 10s
#filebeat.config:
#prospectors:
#enabled: false
#path: prospectors.d/*.yml
#reload.enabled: true
#reload.period: 10s
#modules:
#enabled: false
#path: modules.d/*.yml
#reload.enabled: true
#reload.period: 10s

#================================ General ======================================

Expand Down
108 changes: 108 additions & 0 deletions filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package fileset

import (
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"
"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/registrar"
)

// Factory builds cfgfile.Runner instances from module configurations; it is
// handed to the config reloader so that module configs discovered at runtime
// can be turned into running prospectors.
type Factory struct {
	outlet         channel.Outleter     // destination for events produced by created prospectors
	registrar      *registrar.Registrar // supplies persisted file states to new prospectors
	beatVersion    string               // beat version, forwarded to NewModuleRegistry
	pipelineLoader PipelineLoader       // nil when Ingest Node pipelines should not be loaded
	beatDone       chan struct{}        // closed when the beat shuts down
}

// prospectorsRunner wraps the prospectors created from a single module config
// and implements the cfgfile.Runner interface, letting the reloader start and
// stop them as one unit.
type prospectorsRunner struct {
	id             uint64                   // hash of the originating config (see Factory.Create)
	moduleRegistry *ModuleRegistry          // registry holding just this module
	prospectors    []*prospector.Prospector // prospectors built from the module's prospector configs
	pipelineLoader PipelineLoader           // nil when pipelines are not to be loaded
}

// NewFactory returns a Factory that builds module runners on behalf of the
// config reloader. pipelineLoader may be nil when Ingest Node pipelines
// should not be loaded.
func NewFactory(outlet channel.Outleter, registrar *registrar.Registrar, beatVersion string, pipelineLoader PipelineLoader, beatDone chan struct{}) *Factory {
	f := Factory{
		outlet:         outlet,
		registrar:      registrar,
		beatVersion:    beatVersion,
		pipelineLoader: pipelineLoader,
		beatDone:       beatDone,
	}
	return &f
}

// Create builds a cfgfile.Runner from a single module configuration: it
// instantiates a one-module registry, derives the prospector configs from the
// module's filesets, and wraps the resulting prospectors in a
// prospectorsRunner identified by a hash of the raw config.
func (f *Factory) Create(c *common.Config) (cfgfile.Runner, error) {
	// Start a registry of one module:
	m, err := NewModuleRegistry([]*common.Config{c}, f.beatVersion)
	if err != nil {
		return nil, err
	}

	pConfigs, err := m.GetProspectorConfigs()
	if err != nil {
		return nil, err
	}

	// Hash the raw config to obtain a stable runner ID, so the reloader can
	// detect whether a module config changed between scans.
	var h map[string]interface{}
	// BUGFIX: the Unpack error was previously ignored; a failed unpack would
	// have silently hashed a nil/partial map.
	if err := c.Unpack(&h); err != nil {
		return nil, err
	}
	id, err := hashstructure.Hash(h, nil)
	if err != nil {
		return nil, err
	}

	prospectors := make([]*prospector.Prospector, len(pConfigs))
	for i, pConfig := range pConfigs {
		prospectors[i], err = prospector.NewProspector(pConfig, f.outlet, f.beatDone, f.registrar.GetStates())
		if err != nil {
			logp.Err("Error creating prospector: %s", err)
			return nil, err
		}
	}

	return &prospectorsRunner{
		id:             id,
		moduleRegistry: m,
		prospectors:    prospectors,
		pipelineLoader: f.pipelineLoader,
	}, nil
}

// Start loads the module's Ingest Node pipelines (when a pipeline loader is
// available) and then starts every prospector of this runner. Pipeline load
// errors are logged but do not prevent the prospectors from running.
func (p *prospectorsRunner) Start() {
	if p.pipelineLoader != nil {
		// Re-load the pipelines whenever a new Elasticsearch connection is
		// established...
		elasticsearch.RegisterConnectCallback(func(esClient *elasticsearch.Client) error {
			return p.moduleRegistry.LoadPipelines(p.pipelineLoader)
		})
		// ...and load them right away too, as we are already connected.
		if err := p.moduleRegistry.LoadPipelines(p.pipelineLoader); err != nil {
			// Log and continue: the connect callback may succeed later.
			logp.Err("Error loading pipeline: %s", err)
		}
	}

	for _, prosp := range p.prospectors {
		prosp.Start()
	}
}
// Stop halts all prospectors managed by this runner.
func (p *prospectorsRunner) Stop() {
	for i := range p.prospectors {
		p.prospectors[i].Stop()
	}
}
// ID returns the runner's unique identifier: the hash of the module
// configuration this runner was created from.
func (p *prospectorsRunner) ID() uint64 {
	return p.id
}
2 changes: 1 addition & 1 deletion filebeat/prospector/log/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (p *Prospector) loadStates(states []file.State) error {
}
}

logp.Info("Prospector with previous states loaded: %v", p.states.Count())
logp.Debug("prospector", "Prospector with previous states loaded: %v", p.states.Count())
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("reg
filebeat.publish_async: {{publish_async}}

{% if reload or reload_path -%}
filebeat.config.prospectors:
filebeat.config.{{ reload_type|default("prospectors") }}:
enabled: true
path: {{ reload_path }}
{% if reload -%}
Expand Down Expand Up @@ -164,8 +164,12 @@ processors:
#------------------------------- Elasticsearch output ----------------------------
output.elasticsearch:
hosts: ["{{ elasticsearch.host }}"]
{% if elasticsearch.pipeline %}
pipeline: {{elasticsearch.pipeline}}
{% endif %}
{% if elasticsearch.index %}
index: {{elasticsearch.index}}
{% endif %}
{%- elif logstash %}
#------------------------------- Logstash output ---------------------------------
output.logstash:
Expand Down
5 changes: 5 additions & 0 deletions filebeat/tests/system/module/test/test/config/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: log
paths:
{{ range $i, $path := .paths }}
- {{$path}}
{{ end }}
8 changes: 8 additions & 0 deletions filebeat/tests/system/module/test/test/ingest/default.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"description": "Test pipeline.",
"processors": [{
"remove":{
"field": "message"
}
}]
}
9 changes: 9 additions & 0 deletions filebeat/tests/system/module/test/test/manifest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module_version: "1.0"

var:
- name: paths
default:
- test.log

ingest_pipeline: ingest/default.json
prospector: config/test.yml
Loading

0 comments on commit 63c6784

Please sign in to comment.