diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 744f93a8b96c..16ef5cbadc0e 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -18,6 +18,7 @@ https://github.com/elastic/beats/compare/v1.1.2...master[Check the HEAD diff] - Run function to start a beat no returns an error instead of directly exiting. {pull}771[771] - Move event preprocessor applying GeoIP to packetbeat {pull}772[772] - Add include_fields and drop_fields as part of generic filtering {pull}1120[1120] +- The method signature of HandleFlags() was changed to allow returning an error {pull}1249[1249] *Packetbeat* - Rename output fields in the dns package. Former flag `recursion_allowed` becomes `recursion_available`. {pull}803[803] @@ -83,6 +84,7 @@ https://github.com/elastic/beats/compare/v1.1.2...master[Check the HEAD diff] - Ensure proper shutdown of libbeat. {pull}1075[1075] - Add `fields` and `fields_under_root` options under the `shipper` configuration {pull}1092[1092] - Add the ability to use a SOCKS5 proxy with the Logstash output {issue}823[823] +- The `-configtest` flag will now print "Config OK" to stdout on success {pull}1249[1249] *Packetbeat* - Change the DNS library used throughout the dns package to github.com/miekg/dns. {pull}803[803] diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 42f90122d1e3..a19827e92e5a 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/logp" cfg "github.com/elastic/beats/filebeat/config" @@ -32,7 +31,7 @@ func New() *Filebeat { func (fb *Filebeat) Config(b *beat.Beat) error { // Load Base config - err := cfgfile.Read(&fb.FbConfig, "") + err := b.RawConfig.Unpack(&fb.FbConfig) if err != nil { return fmt.Errorf("Error reading config file: %v", err) diff --git a/filebeat/tests/system/test_prospector.py b/filebeat/tests/system/test_prospector.py index 608e39bf6a75..c3f12a05b328 100644 --- a/filebeat/tests/system/test_prospector.py +++ b/filebeat/tests/system/test_prospector.py @@ -250,7 +250,7 @@ def test_shutdown_no_prospectors(self): self.wait_until( lambda: self.log_contains( - "shutting down"), + "Exiting"), max_timeout=10) filebeat.check_kill_and_wait(exit_code=1) @@ -273,7 +273,7 @@ def test_no_paths_defined(self): self.wait_until( lambda: self.log_contains( - "shutting down"), + "Exiting"), max_timeout=10) filebeat.check_kill_and_wait(exit_code=1) diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index 17c45a31ea40..15822d6be497 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -1,84 +1,105 @@ /* +Package beat provides the functions required to manage the life-cycle of a Beat. +It provides the standard mechanism for launching a Beat. It manages +configuration, logging, and publisher initialization and registers a signal +handler to gracefully stop the process. -Package beat provides the basic environment for each beat. +Each Beat implementation must implement the Beater interface and may optionally +implement the FlagsHandler interface. See the Beater interface documentation for +more details. -Each beat implementation has to implement the beater interface. +To use this package, create a simple main that invokes the Run() function. + func main() { + if err := beat.Run("mybeat", myVersion, beater.New()); err != nil { + os.Exit(1) + } + } -# Start / Stop / Exit a Beat +In the example above, the beater package contains the implementation of the +Beater interface and the New() method returns a new instance of Beater. The +Beater implementation is placed into its own package so that it can be reused +or combined with other Beats. -A beat is start by calling the Run(name string, version string, bt Beater) function and passing the beater object. -This will create new beat and will Start the beat in its own go process. The Run function is blocked until -the Beat.exit channel is closed. This can be done through calling Beat.Exit(). This happens for example when CTRL-C -is pressed. - -A beat can be stopped and started again through beat.Stop and beat.Start. When starting a beat again, it is important to -run it again in it's own go process. To allow a beat to be properly reastarted, it is important that Beater.Stop() properly -closes all channels and go processes. - -In case a beat should not run as a long running process, the beater implementation must make sure to call Beat.Exit() -when the task is completed to stop the beat. +Recommendations + * Use the logp package for logging rather than writing to stdout or stderr. + * Do not call os.Exit in any of your code. Return an error instead. Or if your + code needs to exit without an error, return beat.GracefulExit. */ package beat import ( "flag" "fmt" + "os" "runtime" - "sync" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/filter" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/publisher" - "github.com/elastic/beats/libbeat/service" + svc "github.com/elastic/beats/libbeat/service" "github.com/satori/go.uuid" ) -// Beater interface that every beat must use +var ( + printVersion = flag.Bool("version", false, "Print the version and exit") +) + +var debugf = logp.MakeDebug("beat") + +// Beater is the interface that must be implemented every Beat. The full +// lifecycle of a Beat instance is managed through this interface. +// +// Life-cycle of Beater +// +// The four operational methods are always invoked serially in the following +// order: +// +// Config -> Setup -> Run -> Cleanup +// +// The Stop() method is invoked the first time (and only the first time) a +// shutdown signal is received. The Stop() method is eligible to be invoked +// at any point after Setup() completes (this ensures that the Beater +// implementation is fully initialized before Stop() can be invoked). +// +// The Cleanup() method is guaranteed to be invoked upon shutdown iff the Beater +// reaches the Setup stage. For example, if there is a failure in the +// Config stage then Cleanup will not be invoked. type Beater interface { - Config(*Beat) error - Setup(*Beat) error - Run(*Beat) error - Cleanup(*Beat) error - Stop() + Config(*Beat) error // Read and validate configuration. + Setup(*Beat) error // Initialize the Beat. + Run(*Beat) error // The main event loop. This method should block until signalled to stop by an invocation of the Stop() method. + Cleanup(*Beat) error // Cleanup is invoked to perform any final clean-up prior to exiting. + Stop() // Stop is invoked to signal that the Run method should finish its execution. It will be invoked at most once. } -// FlagsHandler (optional) Beater extension for -// handling flags input on startup. The HandleFlags callback will -// be called after parsing the command line arguments and handling -// the '--help' or '--version' flags. +// FlagsHandler is an interface that can optionally be implemented by a Beat +// if it needs to process command line flags on startup. If implemented, the +// HandleFlags method will be invoked after parsing the command line flags +// and before any of the Beater interface methods are invoked. There will be +// no callback when '-help' or '-version' are specified. type FlagsHandler interface { - HandleFlags(*Beat) + HandleFlags(*Beat) error // Handle any custom command line arguments. } -// Beat struct contains the basic beat information +// Beat contains the basic beat data and the publisher client used to publish +// events. type Beat struct { - Name string - Version string - Config *BeatConfig - BT Beater - Publisher *publisher.PublisherType - Events publisher.Client - UUID uuid.UUID - - exit chan struct{} - error error - state int8 - stateMutex sync.Mutex - callback sync.Once + Name string // Beat name. + Version string // Beat version number. Defaults to the libbeat version when an implementation does not set a version. + UUID uuid.UUID // ID assigned to a Beat instance. + BT Beater // Beater implementation. + RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data. + Config BeatConfig // Common Beat configuration data. + Events publisher.Client // Client used for publishing events. + Publisher *publisher.PublisherType // Publisher + + filters *filter.FilterList // Filters } -// Defaults for config variables which are not set -const ( - StopState = 0 - ConfigState = 1 - SetupState = 2 - RunState = 3 -) - // BeatConfig struct contains the basic configuration of every beat type BeatConfig struct { Output map[string]*common.Config @@ -87,246 +108,193 @@ type BeatConfig struct { Filter []filter.FilterConfig } -var printVersion *bool +// Run initializes and runs a Beater implementation. name is the name of the +// Beat (e.g. packetbeat or topbeat). version is version number of the Beater +// implementation. bt is Beater implementation to run. +func Run(name, version string, bt Beater) error { + return newInstance(name, version, bt).launch(true) +} -// Channel that is closed as soon as the beat should exit -func init() { - printVersion = flag.Bool("version", false, "Print version and exit") +// instance contains everything related to a single instance of a beat. +type instance struct { + data *Beat + beater Beater } -// NewBeat initiates a new beat object -func NewBeat(name string, version string, bt Beater) *Beat { +// newInstance creates and initializes a new Beat instance. +func newInstance(name string, version string, bt Beater) *instance { if version == "" { version = defaultBeatVersion } - b := Beat{ - Version: version, - Name: name, - BT: bt, - UUID: uuid.NewV4(), - - exit: make(chan struct{}), - state: StopState, - } - - return &b -} - -// Run initiates and runs a new beat object -func Run(name string, version string, bt Beater) error { - - b := NewBeat(name, version, bt) - - // Runs beat inside a go process - go func() { - err := b.Start() - - if err != nil { - // TODO: detect if logging was already fully setup or not - fmt.Printf("Start error: %v\n", err) - logp.Critical("Start error: %v", err) - b.error = err - } - - // If start finishes, exit has to be called. This requires start to be blocking - // which is currently the default. - b.Exit() - }() - - // Waits until beats channel is closed - select { - case <-b.exit: - b.Stop() - logp.Info("Exit beat completed") - return b.error - } -} -// Start starts the Beat by parsing and interpreting the command line flags, -// loading and parsing the configuration file, and running the Beat. This -// method blocks until the Beat exits. If an error occurs while initializing -// or running the Beat it will be returned. -func (b *Beat) Start() error { - // Additional command line args are used to overwrite config options - err, exit := b.CommandLineSetup() - if err != nil { - return fmt.Errorf("fails to load command line setup: %v\n", err) - } - - if exit { - return nil + return &instance{ + data: &Beat{ + Name: name, + Version: version, + UUID: uuid.NewV4(), + BT: bt, + }, + beater: bt, } - - // Loads base config - err = b.LoadConfig() - if err != nil { - return fmt.Errorf("fails to load the config: %v\n", err) - } - - // Configures beat - err = b.BT.Config(b) - if err != nil { - return fmt.Errorf("fails to load the beat config: %v\n", err) - } - b.setState(ConfigState) - - // Run beat. This calls first beater.Setup, - // then beater.Run and beater.Cleanup in the end - return b.Run() } -// CommandLineSetup reads and parses the default command line params -// To set additional cmd line args use the beat.CmdLine type before calling the function -// The second return param is to detect if system should exit. True if should exit -// Exit can also be without error -func (beat *Beat) CommandLineSetup() (error, bool) { - - // The -c flag is treated separately because it needs the Beat name - err := cfgfile.ChangeDefaultCfgfileFlag(beat.Name) +// handleFlags parses the command line flags. It handles the '-version' flag +// and invokes the HandleFlags callback if implemented by the Beat. +func (bc *instance) handleFlags() error { + // Due to a dependence upon the beat name, the default config file path + // must be updated prior to CLI flag handling. + err := cfgfile.ChangeDefaultCfgfileFlag(bc.data.Name) if err != nil { - return fmt.Errorf("failed to fix the -c flag: %v\n", err), true + return fmt.Errorf("failed to set default config file path: %v", err) } flag.Parse() if *printVersion { - fmt.Printf("%s version %s (%s)\n", beat.Name, beat.Version, runtime.GOARCH) - return nil, true + fmt.Printf("%s version %s (%s), libbeat %s\n", bc.data.Name, + bc.data.Version, runtime.GOARCH, defaultBeatVersion) + return GracefulExit } - // if beater implements CLIFlags for additional CLI handling, call it now - if flagsHandler, ok := beat.BT.(FlagsHandler); ok { - flagsHandler.HandleFlags(beat) + // Invoke HandleFlags if FlagsHandler is implemented. + if flagsHandler, ok := bc.beater.(FlagsHandler); ok { + err = flagsHandler.HandleFlags(bc.data) } - return nil, false + return err } -// LoadConfig inits the config file and reads the default config information -// into Beat.Config. It exists the processes in case of errors. -func (b *Beat) LoadConfig() error { - - err := cfgfile.Read(&b.Config, "") +// config reads the configuration file from disk, parses the common options +// defined in BeatConfig, initializes logging, and set GOMAXPROCS if defined +// in the config. Lastly it invokes the Config method implemented by the beat. +func (bc *instance) config() error { + var err error + bc.data.RawConfig, err = cfgfile.Load("") if err != nil { - return fmt.Errorf("loading config file error: %v\n", err) + return fmt.Errorf("error loading config file: %v", err) } - err = logp.Init(b.Name, &b.Config.Logging) + err = bc.data.RawConfig.Unpack(&bc.data.Config) if err != nil { - return fmt.Errorf("error initializing logging: %v\n", err) + return fmt.Errorf("error unpacking config data: %v", err) } + err = logp.Init(bc.data.Name, &bc.data.Config.Logging) + if err != nil { + return fmt.Errorf("error initializing logging: %v", err) + } // Disable stderr logging if requested by cmdline flag logp.SetStderr() - logp.Debug("beat", "Initializing output plugins") + bc.data.filters, err = filter.New(bc.data.Config.Filter) + if err != nil { + return fmt.Errorf("error initializing filters: %v", err) + } + debugf("Filters: %+v", bc.data.filters) - if b.Config.Shipper.MaxProcs != nil { - maxProcs := *b.Config.Shipper.MaxProcs + if bc.data.Config.Shipper.MaxProcs != nil { + maxProcs := *bc.data.Config.Shipper.MaxProcs if maxProcs > 0 { runtime.GOMAXPROCS(maxProcs) } } - pub, err := publisher.New(b.Name, b.Config.Output, b.Config.Shipper) + return bc.beater.Config(bc.data) + + // TODO: If -configtest is set it should exit at this point. But changing + // this now would mean a change in behavior. Some Beats may depend on the + // Setup() method being invoked in order to do configuration validation. + // If we do not change this, it means -configtest requires the outputs to + // be available because the publisher is being started (this is not + // desirable - elastic/beats#1213). It (may?) also cause the index template + // to be loaded. +} + +// setup initializes the Publisher and then invokes the Setup method of the +// Beat. +func (bc *instance) setup() error { + logp.Info("Setup Beat: %s; Version: %s", bc.data.Name, bc.data.Version) + + debugf("Initializing output plugins") + var err error + bc.data.Publisher, err = publisher.New(bc.data.Name, bc.data.Config.Output, + bc.data.Config.Shipper) if err != nil { - return fmt.Errorf("error initializing publisher: %v\n", err) + return fmt.Errorf("error initializing publisher: %v", err) } - filters, err := filter.New(b.Config.Filter) + bc.data.Publisher.RegisterFilter(bc.data.filters) + bc.data.Events = bc.data.Publisher.Client() + + err = bc.beater.Setup(bc.data) if err != nil { - return fmt.Errorf("error initializing filters: %v\n", err) + return err } - b.Publisher = pub - pub.RegisterFilter(filters) - b.Events = pub.Client() - - logp.Info("Init Beat: %s; Version: %s", b.Name, b.Version) - logp.Info("Filter %v", filters) + // If -configtest was specified, exit now prior to run. + if cfgfile.IsTestConfig() { + fmt.Println("Config OK") + return GracefulExit + } return nil } -// Run calls the beater Setup and Run methods. In case of errors +// run calls the beater Setup and Run methods. In case of errors // during the setup phase, it exits the process. -func (b *Beat) Run() error { +func (bc *instance) run() error { + logp.Info("%s start running.", bc.data.Name) + return bc.beater.Run(bc.data) +} - // Setup beater object - err := b.BT.Setup(b) - if err != nil { - return fmt.Errorf("setup returned an error: %v", err) - } - b.setState(SetupState) +// cleanup is invoked prior to exit for the purposes of performing any final +// clean-up. This method is guaranteed to be invoked on shutdown if the beat +// reaches the setup stage. +func (bc *instance) cleanup() error { + return bc.beater.Cleanup(bc.data) +} - // Up to here was the initialization, now about running - if cfgfile.IsTestConfig() { - logp.Info("Testing configuration file") - // all good, exit - return nil +// launch manages the lifecycle of the beat and guarantees the order in which +// the Beater methods are invokes and ensures a a proper exit code is set when +// an error occurs. The exit flag controls if this method calls os.Exit when +// it completes. +func (bc *instance) launch(exit bool) error { + err := bc.handleFlags() + if err != nil { + goto cleanup } - service.BeforeRun() - - // Callback is called if the processes is asked to stop. - // This needs to be called before the main loop is started so that - // it can register the signals that stop or query (on Windows) the loop. - service.HandleSignals(b.Exit) - logp.Info("%s sucessfully setup. Start running.", b.Name) - - b.setState(RunState) - // Run beater specific stuff - err = b.BT.Run(b) + err = bc.config() if err != nil { - logp.Critical("Running the beat returned an error: %v", err) + goto cleanup } - return err -} - -// Stop calls the beater Stop action. -// It can happen that this function is called more then once. -func (b *Beat) Stop() { - logp.Info("Stopping Beat") - - if b.getState() == RunState { - b.BT.Stop() + defer bc.cleanup() + err = bc.setup() + if err != nil { + goto cleanup } - service.Cleanup() - - logp.Info("Cleaning up %s before shutting down.", b.Name) + svc.HandleSignals(bc.beater.Stop) + err = bc.run() - if b.getState() > StopState { - // Call beater cleanup function - err := b.BT.Cleanup(b) - if err != nil { - logp.Err("Cleanup returned an error: %v", err) +cleanup: + if exit { + code := 0 + if ee, ok := err.(ExitError); ok { + code = ee.ExitCode + } else if err != nil { + code = 1 } - } - b.setState(StopState) -} - -// Exit begins exiting the beat and initiating shutdown -func (b *Beat) Exit() { - - b.callback.Do(func() { - logp.Info("Start exiting beat") - close(b.exit) - }) -} + if err != nil && code != 0 { + // logp may not be initialized so log the err to stderr too. + logp.Critical("Exiting: %v", err) + fmt.Fprintf(os.Stderr, "Exiting: %v\n", err) + } -// setState updates the state -func (b *Beat) setState(state int8) { - b.stateMutex.Lock() - defer b.stateMutex.Unlock() - b.state = state -} + os.Exit(code) + } -// getState fetches the state -func (b *Beat) getState() int8 { - b.stateMutex.Lock() - defer b.stateMutex.Unlock() - return b.state + return err } diff --git a/libbeat/beat/beat_test.go b/libbeat/beat/beat_test.go index 9056809ae747..3024772df2d1 100644 --- a/libbeat/beat/beat_test.go +++ b/libbeat/beat/beat_test.go @@ -9,26 +9,24 @@ import ( "github.com/stretchr/testify/assert" ) -func Test_NewBeat(t *testing.T) { - +func TestNewInstance(t *testing.T) { tb := &TestBeater{} - b := NewBeat("testbeat", "0.9", tb) + b := newInstance("testbeat", "0.9", tb) - assert.Equal(t, "testbeat", b.Name) - assert.Equal(t, "0.9", b.Version) + assert.Equal(t, "testbeat", b.data.Name) + assert.Equal(t, "0.9", b.data.Version) // UUID4 should be 36 chars long - assert.Equal(t, 16, len(b.UUID)) - assert.Equal(t, 36, len(b.UUID.String())) + assert.Equal(t, 16, len(b.data.UUID)) + assert.Equal(t, 36, len(b.data.UUID.String())) } -func Test_NewBeat_UUID(t *testing.T) { - +func TestNewInstanceUUID(t *testing.T) { tb := &TestBeater{} - b := NewBeat("testbeat", "0.9", tb) + b := newInstance("testbeat", "0.9", tb) // Make sure the UUID's are different - assert.NotEqual(t, b.UUID, uuid.NewV4()) + assert.NotEqual(t, b.data.UUID, uuid.NewV4()) } // Test beat object diff --git a/libbeat/beat/errors.go b/libbeat/beat/errors.go new file mode 100644 index 000000000000..1c72013d9648 --- /dev/null +++ b/libbeat/beat/errors.go @@ -0,0 +1,30 @@ +package beat + +import "fmt" + +var ( + // GracefulExit is an error that signals to exit with a code of 0. + GracefulExit = ExitError{} +) + +// ExitError is an error type that can be returned to set a specific exit code. +type ExitError struct { + ExitCode int + Cause error +} + +func (e ExitError) Error() string { + if e.Cause != nil { + return e.Cause.Error() + } + + return "" +} + +// NewExitError returns a new ExitError. +func NewExitError(code int, format string, args ...interface{}) error { + return ExitError{ + ExitCode: code, + Cause: fmt.Errorf(format, args), + } +} diff --git a/libbeat/cfgfile/cfgfile.go b/libbeat/cfgfile/cfgfile.go index 0d7fc184f2d4..16a5df293bfe 100644 --- a/libbeat/cfgfile/cfgfile.go +++ b/libbeat/cfgfile/cfgfile.go @@ -12,19 +12,17 @@ import ( "github.com/elastic/beats/libbeat/logp" ) -// Command line flags -var configfile *string -var testConfig *bool - -func init() { - // The default config cannot include the beat name as it is not initialised when this - // function is called, but see ChangeDefaultCfgfileFlag +// Command line flags. +var ( + // The default config cannot include the beat name as it is not initialized + // when this variable is created. See ChangeDefaultCfgfileFlag which should + // be called prior to flags.Parse(). configfile = flag.String("c", "beat.yml", "Configuration file") testConfig = flag.Bool("configtest", false, "Test configuration and exit.") -} +) -// ChangeDefaultCfgfileFlag replaces the value and default value for the `-c` flag so that -// it reflects the beat name. +// ChangeDefaultCfgfileFlag replaces the value and default value for the `-c` +// flag so that it reflects the beat name. func ChangeDefaultCfgfileFlag(beatName string) error { cliflag := flag.Lookup("c") if cliflag == nil { @@ -41,30 +39,40 @@ func ChangeDefaultCfgfileFlag(beatName string) error { return cliflag.Value.Set(cliflag.DefValue) } -// Read reads the configuration from a yaml file into the given interface structure. -// In case path is not set this method reads from the default configuration file for the beat. +// Deprecated: Please use Load(). +// +// Read reads the configuration from a YAML file into the given interface +// structure. If path is empty this method reads from the configuration +// file specified by the '-c' command line flag. func Read(out interface{}, path string) error { + config, err := Load(path) + if err != nil { + return nil + } + return config.Unpack(out) +} + +// Load reads the configuration from a YAML file structure. If path is empty +// this method reads from the configuration file specified by the '-c' command +// line flag. +func Load(path string) (*common.Config, error) { if path == "" { path = *configfile } - filecontent, err := ioutil.ReadFile(path) + fileContent, err := ioutil.ReadFile(path) if err != nil { - return fmt.Errorf("Failed to read %s: %v. Exiting.", path, err) + return nil, fmt.Errorf("failed to read %s: %v", path, err) } - filecontent = expandEnv(filecontent) + fileContent = expandEnv(fileContent) - config, err := common.NewConfigWithYAML(filecontent, path) + config, err := common.NewConfigWithYAML(fileContent, path) if err != nil { - return fmt.Errorf("YAML config parsing failed on %s: %v. Exiting.", path, err) + return nil, fmt.Errorf("YAML config parsing failed on %s: %v", path, err) } - err = config.Unpack(out) - if err != nil { - return fmt.Errorf("Failed to apply config %s: %v. Exiting. ", path, err) - } - return nil + return config, nil } // IsTestConfig returns whether or not this is configuration used for testing diff --git a/libbeat/tests/system/test_base.py b/libbeat/tests/system/test_base.py index 2d37e2c6923a..4a90e7856dd3 100644 --- a/libbeat/tests/system/test_base.py +++ b/libbeat/tests/system/test_base.py @@ -15,9 +15,8 @@ def test_base(self): ) proc = self.start_beat() - self.wait_until( lambda: self.log_contains("Init Beat")) - exit_code = proc.kill_and_wait() - assert exit_code == 0 + self.wait_until( lambda: self.log_contains("Setup Beat")) + proc.check_kill_and_wait() def test_no_config(self): """ @@ -26,8 +25,8 @@ def test_no_config(self): exit_code = self.run_beat() assert exit_code == 1 - assert self.log_contains("loading config file error") is True - assert self.log_contains("Failed to read") is True + assert self.log_contains("error loading config file") is True + assert self.log_contains("failed to read") is True def test_invalid_config(self): """ @@ -39,7 +38,7 @@ def test_invalid_config(self): exit_code = self.run_beat(config="invalid.yml") assert exit_code == 1 - assert self.log_contains("loading config file error") is True + assert self.log_contains("error loading config file") is True assert self.log_contains("YAML config parsing failed") is True def test_config_test(self): @@ -53,7 +52,7 @@ def test_config_test(self): config="libbeat.yml", extra_args=["-configtest"]) assert exit_code == 0 - assert self.log_contains("Testing configuration file") is True + assert self.log_contains("Config OK") is True def test_version(self): """ @@ -70,7 +69,7 @@ def test_version(self): os.path.join(self.working_dir, "coverage.cov") ]) - assert self.log_contains("loading config file error") is False + assert self.log_contains("error loading config file") is False with open(os.path.join(self.working_dir, "mockbeat.log"), "wb") \ as outputfile: diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index ab71b354dd5e..994ee34962e1 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -28,7 +28,6 @@ package beater import ( "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/include" @@ -47,7 +46,7 @@ func New() *Metricbeat { func (mb *Metricbeat) Config(b *beat.Beat) error { mb.MbConfig = &Config{} - err := cfgfile.Read(mb.MbConfig, "") + err := b.RawConfig.Unpack(mb.MbConfig) if err != nil { logp.Err("Error reading configuration file: %v", err) return err diff --git a/metricbeat/tests/system/test_base.py b/metricbeat/tests/system/test_base.py index 8d0054ccb6a4..052d634077aa 100644 --- a/metricbeat/tests/system/test_base.py +++ b/metricbeat/tests/system/test_base.py @@ -10,6 +10,6 @@ def test_base(self): ) proc = self.start_beat() - self.wait_until( lambda: self.log_contains("Init Beat")) + self.wait_until( lambda: self.log_contains("Setup Beat")) exit_code = proc.kill_and_wait() assert exit_code == 0 diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index 94bc381911b8..be38fc858453 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -3,12 +3,11 @@ package beater import ( "flag" "fmt" - "os" "runtime" + "sync" "time" "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common/droppriv" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/service" @@ -32,7 +31,6 @@ type Packetbeat struct { CmdLineArgs CmdLineArgs Pub *publish.PacketbeatPublisher Sniff *sniffer.SnifferSetup - over chan bool services []interface { Start() @@ -78,13 +76,12 @@ func New() *Packetbeat { } // Handle custom command line flags -func (pb *Packetbeat) HandleFlags(b *beat.Beat) { +func (pb *Packetbeat) HandleFlags(b *beat.Beat) error { // -devices CLI flag if *pb.CmdLineArgs.PrintDevices { devs, err := sniffer.ListDeviceNames(true) if err != nil { - fmt.Printf("Error getting devices list: %v\n", err) - os.Exit(1) + return fmt.Errorf("Error getting devices list: %v\n", err) } if len(devs) == 0 { fmt.Printf("No devices found.") @@ -97,17 +94,19 @@ func (pb *Packetbeat) HandleFlags(b *beat.Beat) { for i, dev := range devs { fmt.Printf("%d: %s\n", i, dev) } - os.Exit(0) + return beat.GracefulExit } + return nil } // Loads the beat specific config and overwrites params based on cmd line func (pb *Packetbeat) Config(b *beat.Beat) error { // Read beat implementation config as needed for setup - err := cfgfile.Read(&pb.PbConfig, "") + err := b.RawConfig.Unpack(&pb.PbConfig) if err != nil { logp.Err("fails to read the beat config: %v, %v", err, pb.PbConfig) + return err } // CLI flags over-riding config @@ -130,7 +129,7 @@ func (pb *Packetbeat) Config(b *beat.Beat) error { // TODO: Refactor config.ConfigSingleton = pb.PbConfig - return err + return nil } // Setup packetbeat @@ -138,7 +137,7 @@ func (pb *Packetbeat) Setup(b *beat.Beat) error { if err := procs.ProcWatcher.Init(pb.PbConfig.Procs); err != nil { logp.Critical(err.Error()) - os.Exit(1) + return err } queueSize := defaultQueueSize @@ -155,22 +154,17 @@ func (pb *Packetbeat) Setup(b *beat.Beat) error { logp.Debug("main", "Initializing protocol plugins") err := protos.Protos.Init(false, pb.Pub, pb.PbConfig.Protocols) if err != nil { - logp.Critical("Initializing protocol analyzers failed: %v", err) - os.Exit(1) + return fmt.Errorf("Initializing protocol analyzers failed: %v", err) } - pb.over = make(chan bool) - logp.Debug("main", "Initializing sniffer") if err := pb.setupSniffer(); err != nil { - logp.Critical("Initializing sniffer failed: %v", err) - os.Exit(1) + return fmt.Errorf("Initializing sniffer failed: %v", err) } // This needs to be after the sniffer Init but before the sniffer Run. if err := droppriv.DropPrivileges(config.ConfigSingleton.RunOptions); err != nil { - logp.Critical(err.Error()) - os.Exit(1) + return err } return nil @@ -243,27 +237,25 @@ func (pb *Packetbeat) Run(b *beat.Beat) error { service.Start() } - // run the sniffer in background + var wg sync.WaitGroup + errC := make(chan error, 1) + + // Run the sniffer in background + wg.Add(1) go func() { + defer wg.Done() err := pb.Sniff.Run() if err != nil { - logp.Critical("Sniffer main loop failed: %v", err) - os.Exit(1) + errC <- fmt.Errorf("Sniffer main loop failed: %v", err) } - pb.over <- true }() - // Startup successful, disable stderr logging if requested by - // cmdline flag - logp.SetStderr() - logp.Debug("main", "Waiting for the sniffer to finish") - - // Wait for the goroutines to finish - for range pb.over { - if !pb.Sniff.IsAlive() { - break - } + wg.Wait() + select { + default: + case err := <-errC: + return err } // kill services diff --git a/topbeat/beater/topbeat.go b/topbeat/beater/topbeat.go index 1dc9e95b9f06..2dd8a3e461c3 100644 --- a/topbeat/beater/topbeat.go +++ b/topbeat/beater/topbeat.go @@ -6,7 +6,6 @@ import ( "time" "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/topbeat/system" @@ -36,7 +35,7 @@ func New() *Topbeat { func (tb *Topbeat) Config(b *beat.Beat) error { - err := cfgfile.Read(&tb.TbConfig, "") + err := b.RawConfig.Unpack(&tb.TbConfig) if err != nil { logp.Err("Error reading configuration file: %v", err) return err diff --git a/winlogbeat/beater/winlogbeat.go b/winlogbeat/beater/winlogbeat.go index b6fe8a77eefd..56ebfc0b697c 100644 --- a/winlogbeat/beater/winlogbeat.go +++ b/winlogbeat/beater/winlogbeat.go @@ -10,7 +10,6 @@ import ( "time" "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/publisher" @@ -63,7 +62,7 @@ func New() *Winlogbeat { // Config sets up the necessary configuration to use the winlogbeat func (eb *Winlogbeat) Config(b *beat.Beat) error { // Read configuration. - err := cfgfile.Read(&eb.config, "") + err := b.RawConfig.Unpack(&eb.config) if err != nil { return fmt.Errorf("Error reading configuration file. %v", err) }