Skip to content

Load configuration from modules.d when running setup subcommand #12340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Skipping unparsable log entries from docker json reader {pull}12268[12268]
- Require client_auth by default when ssl is enabled for tcp input {pull}12333[12333]
- Require certificate authorities, certificate file, and key when SSL is enabled for the TCP input. {pull}12355[12355]
- Load correct pipelines when system module is configured in modules.d. {pull}12340[12340]

*Heartbeat*

Expand Down
30 changes: 20 additions & 10 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,20 +146,30 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
return fb, nil
}

// setupPipelineLoaderCallback sets the callback function for loading pipelines during setup.
func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
if !fb.moduleRegistry.Empty() {
overwritePipelines := fb.config.OverwritePipelines
if b.InSetupCmd {
overwritePipelines = true
if b.Config.Output.Name() != "elasticsearch" {
logp.Warn(pipelinesWarning)
return nil
}

overwritePipelines := true
b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
esClient, err := elasticsearch.NewConnectedClient(esConfig)
if err != nil {
return err
}

b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
esClient, err := elasticsearch.NewConnectedClient(esConfig)
if err != nil {
return err
}
return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines)
// When running the subcommand setup, configuration from modules.d directories
// have to be loaded using cfg.Reloader. Otherwise those configurations are skipped.
pipelineLoaderFactory := newPipelineLoaderFactory(b.Config.Output.Config())
modulesFactory := fileset.NewSetupFactory(b.Info.Version, pipelineLoaderFactory)
if fb.config.ConfigModules.Enabled() {
modulesLoader := cfgfile.NewReloader(b.Publisher, fb.config.ConfigModules)
modulesLoader.Load(modulesFactory)
}

return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines)
}
return nil
}
Expand Down
85 changes: 85 additions & 0 deletions filebeat/fileset/setup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package fileset

import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

// SetupFactory is for loading module assets when running setup subcommand.
type SetupFactory struct {
beatVersion string
pipelineLoaderFactory PipelineLoaderFactory
overwritePipelines bool
}

// NewSetupFactory creates a SetupFactory
func NewSetupFactory(beatVersion string, pipelineLoaderFactory PipelineLoaderFactory) *SetupFactory {
return &SetupFactory{
beatVersion: beatVersion,
pipelineLoaderFactory: pipelineLoaderFactory,
overwritePipelines: true,
}
}

// Create creates a new SetupCfgRunner to setup module configuration.
func (sf *SetupFactory) Create(_ beat.Pipeline, c *common.Config, _ *common.MapStrPointer) (cfgfile.Runner, error) {
m, err := NewModuleRegistry([]*common.Config{c}, sf.beatVersion, false)
if err != nil {
return nil, err
}

return &SetupCfgRunner{
moduleRegistry: m,
pipelineLoaderFactory: sf.pipelineLoaderFactory,
overwritePipelines: sf.overwritePipelines,
}, nil
}

// SetupCfgRunner is for loading assets of modules.
type SetupCfgRunner struct {
moduleRegistry *ModuleRegistry
pipelineLoaderFactory PipelineLoaderFactory
overwritePipelines bool
}

// Start loads module pipelines for configured modules.
func (sr *SetupCfgRunner) Start() {
logp.Debug("fileset", "Loading ingest pipelines for modules from modules.d")
pipelineLoader, err := sr.pipelineLoaderFactory()
if err != nil {
logp.Err("Error loading pipeline: %+v", err)
return
}

err = sr.moduleRegistry.LoadPipelines(pipelineLoader, sr.overwritePipelines)
if err != nil {
logp.Err("Error loading pipeline: %s", err)
}
}

// Stopp of SetupCfgRunner.
func (sr *SetupCfgRunner) Stop() {}

// String returns information on the Runner
func (sr *SetupCfgRunner) String() string {
return sr.moduleRegistry.InfoString()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
- module: template-test-module
# All logs
test:
enabled: true
var.parse_time: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
:modulename: template-test-module
:has-dashboards: true

== template-test-module module

This is the template-test-module module.

include::../include/what-happens.asciidoc[]

[float]
=== Compatibility

TODO: document with what versions of the software is this tested


include::../include/running-modules.asciidoc[]

[float]
=== Example dashboard

This module comes with a sample dashboard. For example:

TODO: include an image of a sample dashboard. If you do not include a dashboard,
remove this section and set `:has-dashboards: false` at the top of this file.

include::../include/configuring-intro.asciidoc[]

TODO: provide an example configuration

:fileset_ex: {fileset}

include::../include/config-option-intro.asciidoc[]

TODO: document the variables from each fileset. If you're describing a variable
that's common to other modules, you can reuse shared descriptions by including
the relevant file. For example:

[float]
==== `{fileset}` log fileset settings

include::../include/var-paths.asciidoc[]

:has-dashboards!:

:fileset_ex!:

:modulename!:
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
- key: template-test-module
title: "template-test-module"
description: >
template-test-module Module
fields:
- name: template-test-module
type: group
description: >
fields:
3 changes: 3 additions & 0 deletions filebeat/tests/system/input/template-test-module/module.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
dashboards:
- id: Filebeat-template-test-module-test-Dashboard
file: Filebeat-template-test-module-test.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type: log
paths:
{{ range $i, $path := .paths }}
- {{$path}}
{{ end }}
exclude_files: [".gz$"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"description": "Pipeline for testing template directives in pipelines",
"processors": [
{
{<if .parse_time >}
"date": {
"field": "field_to_parse",
"target_field": "@timestamp",
"formats": ["EEE MMM dd H:m:s yyyy", "EEE MMM dd H:m:s.SSSSSS yyyy"],
"ignore_failure": true
}
},
{< end >}
{
"remove": {
"field": "field_to_remove",
"ignore_failure": true
}
}
]
}
13 changes: 13 additions & 0 deletions filebeat/tests/system/input/template-test-module/test/manifest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module_version: 1.0

var:
- name: paths
default:
- /example/test.log*
os.darwin:
- /usr/local/example/test.log*
os.windows:
- c:/programdata/example/logs/test.log*

ingest_pipeline: ingest/pipeline.json
input: config/test.yml
63 changes: 63 additions & 0 deletions filebeat/tests/system/test_setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import unittest
import os
import yaml
from shutil import copytree, copyfile

from elasticsearch import Elasticsearch

from filebeat import BaseTest

INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False)


class Test(BaseTest):

def init(self):
self.elasticsearch_url = self.get_elasticsearch_url()
print("Using elasticsearch: {}".format(self.elasticsearch_url))
self.es = Elasticsearch([self.elasticsearch_url])

@unittest.skipIf(not INTEGRATION_TESTS,
"integration tests are disabled, run with INTEGRATION_TESTS=1 to enable them.")
@unittest.skipIf(os.getenv("TESTING_ENVIRONMENT") == "2x",
"integration test not available on 2.x")
def test_setup_modules_d_config(self):
"""
Check if template settings are applied to Ingest pipelines when configured from modules.d.
"""
self.init()
self.render_config_template(
modules=True,
elasticsearch={
"host": self.get_elasticsearch_url(),
},
)

self._setup_dummy_module()

beat_setup_modules_pipelines = self.start_beat(
extra_args=[
"setup",
"--pipelines",
"-E", "filebeat.config.modules.path=" + self.working_dir + "/modules.d/*.yml",
],
)
beat_setup_modules_pipelines.check_wait(exit_code=0)

version = self.get_beat_version()
pipeline_name = "filebeat-" + version + "-template-test-module-test-pipeline"
pipeline = self.es.transport.perform_request("GET", "/_ingest/pipeline/" + pipeline_name)

assert "date" in pipeline[pipeline_name]["processors"][0]
assert "remove" in pipeline[pipeline_name]["processors"][1]

def _setup_dummy_module(self):
modules_d_path = self.working_dir + "/modules.d"
modules_path = self.working_dir + "/module"

for directory in [modules_d_path, modules_path]:
if not os.path.isdir(directory):
os.mkdir(directory)

copytree(self.beat_path + "/tests/system/input/template-test-module", modules_path + "/template-test-module")
copyfile(self.beat_path + "/tests/system/input/template-test-module/_meta/config.yml", modules_d_path + "/test.yml")
31 changes: 31 additions & 0 deletions libbeat/cfgfile/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,37 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
}
}

// Load loads configuration files once.
func (rl *Reloader) Load(runnerFactory RunnerFactory) {
list := NewRunnerList("load", runnerFactory, rl.pipeline)

rl.wg.Add(1)
defer rl.wg.Done()

// Stop all running modules when method finishes
defer list.Stop()

gw := NewGlobWatcher(rl.path)

debugf("Scan for config files")
files, _, err := gw.Scan()
if err != nil {
logp.Err("Error fetching new config files: %v", err)
}

// Load all config objects
configs, _ := rl.loadConfigs(files)

debugf("Number of module configs found: %v", len(configs))

if err := list.Reload(configs); err != nil {
logp.Err("Error loading configuration files: %+v", err)
return
}

logp.Info("Loading of config files completed.")
}

func (rl *Reloader) loadConfigs(files []string) ([]*reload.ConfigWithMeta, error) {
// Load all config objects
result := []*reload.ConfigWithMeta{}
Expand Down
16 changes: 14 additions & 2 deletions libbeat/tests/system/beat/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ def start_beat(self,
output=None,
logging_args=["-e", "-v", "-d", "*"],
extra_args=[],
env={}):
env={},
home=""):
"""
Starts beat and returns the process handle. The
caller is responsible for stopping / waiting for the
Expand All @@ -203,8 +204,13 @@ def start_beat(self,
"-test.coverprofile",
os.path.join(self.working_dir, "coverage.cov"),
]

path_home = os.path.normpath(self.working_dir)
if home:
path_home = home

args += [
"-path.home", os.path.normpath(self.working_dir),
"-path.home", path_home,
"-c", os.path.join(self.working_dir, config),
]

Expand Down Expand Up @@ -681,3 +687,9 @@ def is_documented(key, docs):
raise Exception("Key '{}' found in event is not documented!".format(key))
if is_documented(key, aliases):
raise Exception("Key '{}' found in event is documented as an alias!".format(key))

def get_beat_version(self):
proc = self.start_beat(extra_args=["version"], output="version")
proc.wait()

return self.get_log_lines(logfile="version")[0].split()[2]