
[WIP] Filebeat input v2 #14229

Closed
wants to merge 10 commits into from

Conversation


@urso urso commented Oct 24, 2019

Depends on: #14144, #14079, #12908

Note: This is a draft PR for discussing the overall changes/approach. Once it stabilizes I will split the PR into smaller ones to ease actual reviewing.

This PR experiments with an alternative input API for Beats. The goal is to provide a common library that can run Filebeat inputs, Metricbeat modules, and Heartbeat monitors.
Work is still in progress, and more PRs will follow. The new registry store has been already implemented in the feature-new-registry-file branch, and the interfaces defined here will make heavy use of it.

Currently, inputs/modules are registered globally by constructor. The constructor is responsible for parsing the untyped configuration and directly starting the data collection. But there is no good means to only test a configuration, or to do any other kind of validation upfront. Due to this we work with untyped configurations in too many places, and the Beats commands are all entangled with Beats startup in order to parse a subset of the configuration (the code in libbeat/cmd is very difficult to maintain).

The plugin API presented here encourages the use of concrete types. We also remove the need for the global registry.

Plugin authors define an input plugin by filling out the Plugin data structure:

type Plugin struct {
	Name        string
	Constraints Constraint
	Doc         string

	// TODO: config schema info for config validation

	// Configure creates an Input from the given configuration, if possible.
	// Returns an error if the configuration is invalid.
	// The returned input must not attempt to run or create any kind of connection yet.
	// The input is only supposed to be a typed placeholder for the untyped
	// configuration.
	Configure func(log *logp.Logger, config *common.Config) (Input, error)
}

A Beat links/provides input plugins by creating a 'Loader' instance with all the plugins that the Beat wants to provide, for example:

loader, err := input.NewLoader(licenser,
  logs.Plugin, // <- global Plugin variable in the package. 
  kafka.Plugin,
  ...
)

A loader only implements the interface { Configure(log *logp.Logger, config *common.Config) (Input, error) }.

NOTE: To avoid repeating input definitions, I'm considering profile packages with a set of default Plugins exported.
NOTE: The loader is still subject to change, so that we can mix multiple profiles, for example.

The Input returned by Configure is supposed to be an interface that replaces the untyped common.Config. Once established, it should not be modifiable. Using this pattern (hopefully with other features in the future), Beats will be able to parse the complete configuration into a typed AST, such that *common.Config is not required anymore after the parsing step. This should help to decouple config validation, parsing, and command execution. Each sub-command will be able to operate on the config AST, instead of a partially-initialized Beats instance.

NOTE: full config validation (also checking for indentation errors and typos) is a tricky task, due to the Beats flexibility, and is therefore not addressed here. Two possible solutions come to mind:

  • Add a 'config schema' to the Plugin, that can be used to validate the overall configuration. Configuration schemas will be required for Fleet/integrations as well, and could be a shared resource between different products in the stack.
  • Record a schema while unpacking a configuration in go-ucfg. Based on the recording we can check for typos, wrong indentation, and unknown settings.

An Input needs to implement TestInput, which can be used via the CLI, or by the agent, to test a configuration without actually running it. An input could for example try to connect to the host we should collect data from.

The Input also acts as a Runner factory. The Runner factory somewhat 'complicates' the input's interface and setup phases. But by being a runner factory, we can more easily reuse the input API as is with autodiscovery and other Beats subsystems. To help developers implement the Input, we provide a ConfiguredInput type:

type ConfiguredInput struct {
	Info  string
	Input RunnableInput
}

type RunnableInput interface {
	Test(*chorus.Closer, *logp.Logger) error
	Run(Context) error
}

This code sets up the kafka input, for example (all Filebeat inputs can be set up using the same pattern):

var Plugin = input.Plugin{
	Name:      "kafka",
	Doc:       "Collect kafka topics using consumer groups",
	Configure: initInput,
}

type kafkaInput struct {
	config       inputConfig
	saramaConfig *sarama.Config
}

func initInput(log *logp.Logger, cfg *common.Config) (input.Input, error) {
	config, err := parseConfig(cfg)
	if err != nil {
		return nil, err
	}

	saramaConfig, err := newSaramaConfig(config)
	if err != nil {
		return nil, errors.Wrap(err, "initializing sarama config")
	}

	return input.ConfiguredInput{
		Info: "kafka", // <- required to have a description for the 'cfgfile.Runner' interface
		Input: &kafkaInput{
			config:       config,
			saramaConfig: saramaConfig,
		},
	}, nil
}

func (inst *kafkaInput) Test(closer *chorus.Closer, log *logp.Logger) error {
  ...
}

func (inst *kafkaInput) Run(context input.Context) error {
  ...
}

The kafka input no longer has Start/Stop/Wait methods for handling shutdown. We also get rid of the required factory/constructor that was used to hold state required for shutdown handling. All required state (e.g. network connections) is to be managed in the Run method only.

If the input is started, Run will be called. If Run returns, the input is stopped. If autodiscovery/config-reloading/agent calls 'Stop', then a shutdown signal is propagated via context.Closer, which is also compatible with context.Context.

The context provided to the Input is defined like this:

// Context provides access to common resources and shutdown signaling to
// inputs. The `Closer` provided is compatible to `context.Context` and can be used
// with common IO libraries to unblock connections on shutdown.
type Context struct {
	// StoreAccessor allows inputs to access a resource store. The store can be used
	// for serializing state, but also for coordination such that only one input
	// collects data from a resource.
	StoreAccessor

	// Closer provides support for shutdown signaling.
	// It is compatible to context.Context, and can be used to cancel IO
	// operations during shutdown.
	Closer *chorus.Closer

	// Log provides the structured logger for the input to use
	Log *logp.Logger

	// Scheduler provides cron-job like services to an input.
	// It can be used to configure the exact moment a task should be executed.
	// Tasks should only be configured with the scheduler for as long as the input is running.
	// On 'stop' an input must unregister configured tasks
	//
	// XXX: currently in heartbeat, but we will move it to libbeat
	Scheduler *scheduler.Scheduler

	// Status is used to signal state changes. The state is used for reporting
	// the state/healthiness to users via management/monitoring APIs.
	Status StatusObserver

	// Pipeline allows inputs to connect to the active publisher pipeline. Each
	// go-routine creating and publishing events should have its own connection.
	Pipeline beat.PipelineConnector
}

Note: we do not pass monitoring support. The StatusObserver is required for the input to report its status. For monitoring support we should find concrete types/interfaces to report common metrics in a unified way.

Inputs/modules/monitors support (still TODO in later PRs):

  • New Filebeat inputs will be implemented as shown
  • Metricbeat modules have many different interfaces for the metricsets. They can have a module implementation or not. Definitions for wrapping the different Metricbeat modules will be provided in the future. A module will be treated like an Input. The module input type will take care of configuring and running the metricsets. => the definition of modules and metricsets becomes more similar to our configuration structure
  • For heartbeat monitors we can provide wrappers as well. The required scheduler for heartbeat tasks is made available in the input.Context.
  • For old Filebeat inputs we will NOT provide 'wrappers'. Instead we keep the old filebeat input architecture until the current inputs have been transformed into the new API. For the time being Filebeat will run old and new architecture in parallel.

Input status is a new concern we have in Beats. For use with the HTTP status API, the agent, or the Beats monitoring UI, we want to be able to collect and display a per-input status. The current interface to report the status is defined as:

// StatusObserver is used to report a standardized set of state change events
// of an input.
type StatusObserver interface {
	// Starting indicates that the input is about to be configured and started.
	Starting()

	// Initialized reports that required resources are initialized, but the
	// Input is not collecting events yet.
	Initialized()

	// Active reports that the input is about to start collecting events.
	Active()

	// Failing reports that the input is experiencing temporary errors. The input
	// does not quit yet, but will attempt to retry.
	Failing(err error)

	// Stopping reports that the input is about to stop and clean up resources.
	Stopping()

	// Stopped reports that the input has finished the shutdown and cleanup.
	Stopped()

	// Failed indicates that the input has been stopped due to a fatal error.
	Failed(err error)
}

The kafka input is a good example, with temporary network errors that might require the Beat to reconnect to the cluster. The kafka input's Run method becomes:

func (inst *kafkaInput) Run(context input.Context) error {
	log := context.Log // <- TODO: add context like consumer group name and endpoints.

	// NOTE: the runner keeps track of pipeline connections and auto-closes them
	//       if Run returns. Closing a connection within Run is optional.
	//       The input runner automatically adds context.Closer, if no custom CloseRef
	//       is configured. This ensures that event publishing unblocks on shutdown.
	out := context.Pipeline.ConnectWith(beat.ClientConfig{
		ACKEvents: func(events []interface{}) { // set up a handler for end-to-end ACK
			for _, event := range events {
				if meta, ok := event.(eventMeta); ok {
					meta.handler.ack(meta.message)
				}
			}
		},
		WaitClose: inst.config.WaitClose,
	})

	// If the consumer fails to connect, we use exponential backoff with
	// jitter up to 8 * the initial backoff interval.
	backoff := backoff.NewEqualJitterBackoff(
		context.Closer.Done(),
		inst.config.ConnectBackoff,
		8*inst.config.ConnectBackoff)

	context.Status.Initialized() // report that we are ready to go

	for context.Closer.Err() == nil { // restart input after error (given there is no shutdown signal)
		group, err := sarama.NewConsumerGroup(inst.config.Hosts, inst.config.GroupID, inst.saramaConfig)
		if err != nil {
			if err == chorus.ErrClosed {
				break
			}

			context.Status.Failing(err)
			log.Errorw(
				"Error initializing kafka consumer group", "error", err)
			backoff.Wait()
			continue
		}

		// We've successfully connected, reset the backoff timer.
		backoff.Reset()
		context.Status.Active()

		// We have a connected consumer group now, try to start the main event
		// loop by calling Consume (which starts an asynchronous consumer).
		// In an ideal run, this function never returns until shutdown; if it
		// does, it means the errors have been logged and the consumer group
		// has been closed, so we try creating a new one in the next iteration.
		err = inst.runConsumerGroup(context, out, group)
		if err != nil && err != chorus.ErrClosed {
			context.Status.Failing(err) // report temporary error
		}
	}
	context.Status.Stopping() // we are about to shutdown

	return nil
}

One main issue we want to address with this API is the state handling in the registry. The store and resource part is implemented in #14079. The input.Context provides a StoreAccessor. Using the accessor, inputs can configure the registry they want to access. Resource locks and resource state are local to a shared store. All inputs accessing a store X have a reference to the same shared store. The store acts as a key-value store.

Resources in a store are accessed by key. A resource must be locked in order to modify it. This guarantees that no two inputs can read and modify global state for the same resource.

Registry file update operations can be immediate or deferred. An update operation can update selected fields only. Immediate operations are directly written to the store. Deferred updates are supposed to be written to the registry after the event has been ACKed by the output.
A resource state should be split into resource metadata and resource read state. Use immediate updates for metadata only (which are required to correctly track the resource in the future), and use deferred updates for read-state changes (e.g. file offset).

To decouple state updates from output ACK delays, deferred updates are actually written to an intermediate cache layer in the store. If an input unlocks and returns, the state in the store can be ahead of the state in the registry file. The registry file is eventually consistent. The cached state is removed from the caching layer only after all deferred updates have been written to the registry file (i.e. the registry file is consistent with all state updates so far).

By allowing the registry to be eventually consistent, we do not have to block on shutdown, and can allow an input to be reconfigured immediately (or allow another input to pick up a resource). The reconfigured input can immediately start from the cached state.
As update operations need to pass through the publisher pipeline, it is the publisher pipeline (with ordered ACKs) that guarantees that the read-state updates are ACKed (and written to the registry) in the correct order. This also helps reloading via the Agent or autodiscovery, as inputs can be stopped more immediately.

TODO: per-input autodiscovery hints support.
Right now autodiscovery hints support is a per-Beat global subsystem, constructing configuration objects from docker/k8s event metadata. The implementation is not really aware of the configured inputs' capabilities, but happily configures some settings that might apply. This creates a coupling between autodiscovery hints and inputs (you need to know the right setting names in inputs), although both are managed independently in code.
Plus, unsupported settings might be configured for the wrong input, without the user noticing.
Using the 'Plugin' definition I'm thinking of moving hints support to the input itself, such that input configuration and hints are managed by the input author. We might also consider some kind of validation to point out invalid hints for the input to be configured.

@urso urso changed the title Filebeat input v2 experiments [WIP] Filebeat input v2 Oct 24, 2019
@urso urso added discussion in progress Pull request is currently in progress. labels Oct 25, 2019

@ph ph (Contributor) left a comment
@urso I like the proposal; we have been talking about this for a long time (years?).

Just some notes related to the context of the Agent and Beats.

Adding a custom Observer is really powerful; we can also easily chain them together to make sure all events are sent. Here are a few use cases I can see:

  • Expose the Status of the plugins through the API. (active, failed, error..)
  • Expose monitoring states, (number of running inputs, failures..)
  • Return errors to Fleet in a uniform way; we can augment the errors if we need to attach more information.

Looking at the Plugin definition, I think the outputs and processors can match most of the defined interface. This proposal addresses how plugins are registered, but I think before making the change we also need to define how plugins are managed (started, stopped, removed).

Looking at the concrete configuration: do you think the end goal of this proposal is the following: receive an untyped configuration, resolve it to a concrete configuration, and apply the changes to the currently running system?

I am asking this because I believe the following:

  • A configuration from the filesystem
  • A dynamic configuration from autodiscover
  • A configuration from the Agent.

For me, all of the above is always about creating a config, and the system should be able to decide what needs to be done to run that configuration (either partially or completely). This changes the behavior of autodiscover: instead of creating configurations on the fly for inputs and starting the inputs directly, it will add or remove parts of the configuration, and the global current configuration is used as a way to decide what needs to run in the system.

Note that the above is necessary to control the pressure and events created in the system, and it allows all the different sources of configuration to take the same path and behave in a uniform manner.

// The input is only supposed to be a types place holder for the untyped
// configuration.
Configure func(log *logp.Logger, config *common.Config) (Input, error)
}
Contributor:
Can we also add a maturity level in the Plugin definition? Stable, Beta, Experimental. We could also automatically add this context or data to the logger sent to the plugin instance.

// On 'stop' an input must unregister configured tasks
//
// XXX: currently in heartbeat, but we will move it to libbeat
Scheduler *scheduler.Scheduler
Contributor:
I presume we would have something like:

Scheduler.Register(p, Scheduler.Daily, func(...) )
Scheduler.Remove(p)

This would give us the opportunity of saving the execution state of the scheduler for restarts.

Author:

Currently it is the heartbeat scheduler. The scheduler itself does not make any assumption about the 'Schedule', besides expecting an interface implementing Next() time.Time.

One can simply add/remove tasks from the scheduler. A task can return an array of continuation tasks. A task is done only after all continuations are done. Most simple cases (e.g. Metricbeat) don't need the continuation functionality, but it's fundamental to Heartbeat, where it is used to check all known IPs after a DNS query.

This would give us the opportunity of saving the execution state of the scheduler for restarts.

Given that the schedule itself is just an interface, I think we could add some information like this in the future to the registry (as we do for input reading state). But this will require us to have a constant ID for inputs/tasks that does not change between restarts. I didn't really account for this in this POC.

As a task can produce multiple events, this would require some kind of ref-counting though. Only once all events that are produced by a Task are ACKed can we update the registry. I have an idea how this could work somewhat seamlessly, hiding most of the complexity from developers, but this would require quite some plumbing. Maybe in a follow up PR.

Contributor:
What do you have in mind for this scheduler? I understand this could be used for Heartbeat monitors or Metricbeat modules. It would probably make sense to have another example using this API.

Author:
haha... everyone is asking about the scheduler :)

This is one of many PRs... maybe I should remove it for now to reduce confusion.

The original goal is to use it for Heartbeat monitors. Heartbeat monitors already implement the Runner interface (Start and Stop methods). I "just" need to pass the scheduler to the constructor, and voilà, we can run Heartbeat monitors next to Metricbeat modules using the same framework.

Using the scheduler for modules is optional. I don't plan to add Metricbeat module support in this PR. I haven't decided yet if we want to use the scheduler for Metricbeat modules (we can discuss it when I add the integration). But I don't plan to force people to rewrite or adapt existing metricsets. Instead I will provide wrappers that will support the different interfaces we have in Metricbeat for reuse. E.g. modules with metricsets only might be turned into Plugins like this in the future:

input.Plugin{
  Name: "elasticsearch",
  Doc: "elasticsearch metrics collection",
  Configure: input.WithDatasets(
    input.Metricset{
      Name: "index",
      New: index.New, // constructor from metricbeat/module/elasticsearch/index package
    },
    input.Metricset{
      Name: "node",
      Default: true,
      New: node.New, // constructor from metricbeat/module/elasticsearch/node package
    },
  ),
}

Contributor:
+1 to removing the scheduler, it complicates the current discussion.

Contributor:
+1 to removing it now, but this looks promising!

type Input interface {
InputTester
RunnerFactory
}
Contributor:

I know I have raised the issue directly, but inputs should probably have some kind of IDs. We can make them automatically generated and definable by users. The latter is probably even better, to enforce a unique identifier on each input definition; let's say we want to correlate log events with a specific configuration or monitoring.

For monitoring, or reporting events to Fleet, we can get around that by saving that information in the observer, but it could be useful for log statements too.

Contributor:

Enforcing the ID on the logger ensures that by default everything is scoped to a specific instance, without requiring the programmer to add extra context to the logger, like the paths watched.

Author:

I wonder where we should enforce the ID, and how to pass it. Maybe it should not be enforced by the plugin API; instead the architecture using the plugins would add/manage IDs. As the logger and overall context are passed to the plugin, we can have the underlying framework populate this information. In this case the ID could be passed as a parameter in the context, in case an input wants to report it.

If the ID is pushed top-down, it is the framework managing IDs. Maybe this could be used together with the scheduler. E.g. if the ID is given (static), then we enable support for storing the schedule/task state in the registry. In this case the input/plugin developer doesn't need to handle any kind of management info.

Contributor:

but the architecture using the plugins would add/manage ids. As the logger and overall context is passed to the plugin.

Correct, we would need some kind of bookkeeping of IDs, especially if we let people define them, to make sure they are indeed unique.

What if the ID is a system option (in the yaml) when defining the plugin, so plugin authors don't see it or have to define it? We could make it available in the context passed to the plugin, so an author can reference it if needed (to identify a client to a remote system, maybe?).

Maybe this could be used together with the scheduler. E.g. if the ID is given (static)

The goal of giving the scheduler that information is to recover execution information from the last tick()? I am assuming that if an input's configuration is changed, we might want to try to keep the same ID as much as possible, to keep correlation possible and allow us to answer the following question:

Did the config change actually improve the performance of the input or not?

@ruflin ruflin (Member) left a comment

Thanks @urso for kicking this off. I have only skimmed through the code so far, so a few high-level questions:

  • Shouldn't this code be in libbeat/input/* instead of filebeat/...?
  • Metricbeat modules: You touched on this quickly in your PR description. You mention "A module will be treated like an Input." My thinking here so far is that a module contains multiple metricsets and the metricsets are based on an input. So I expect the concepts of modules and metricsets to stay around in the short and mid term, but that much more code can be shared because the metricsets depend on an input. Some of the modules and metricsets will become config-only, like we have today in Filebeat.
  • State on reloading: One challenge we had in the past is what happens on reloading an input config. The old config must shut down completely before the new one can be started. In your PR description you mention that this is not needed anymore because of the ordered registry writing. Let's assume we have a file foo.log. An input tails it and sends all the lines until the end. All lines except the last 2 are ACKed. Now it is stopped, and a new input on the same file is started; it continues reading from the cached registry state after the last line and sends the data. For some reason, the last 2 lines of the old input can never be ACKed. Will this mean the state of the new input will never be written because of the ordering? What happens in this case?
  • Scheduler: You plan to migrate the scheduler concept from Heartbeat to the input. Will the current concept of defining period: 5s still exist, or will the user have to use cron definitions?


ph commented Oct 29, 2019

@ruflin Concerning the scheduler, I think it's just an implementation detail. A cron schedule or a period of 5s is the same implementation; it's only how a user defines it that changes. So I think most of the time we would keep the period, because it's an easier concept to understand.


urso commented Oct 29, 2019

Looking at the concrete configuration: do you think the end goal of this proposal is the following: receive an untyped configuration, resolve it to a concrete configuration, and apply the changes to the currently running system?

An input is not reconfigurable, as Input is the typed configuration object which is used to create a Runner. In this sense the Input acts as an AbstractFactory that can be used by autodiscovery, config reloading, or even static setup.
I made the interface such that it fits with the current autodiscovery functionality.
The ConfiguredInput type even requires developers to implement Run only. There is no signaling for config changes. I'd prefer not to build a system that allows inputs to be reconfigured. This would put quite some burden on the developer.

If we introduce IDs, then we need some other component that keeps track of an ID's state. The per-ID state would be: the current Input (typed configuration), the active runner, and the desired Input (typed configuration). On config change the system would need to signal stop to the active runner and create a new one in the meantime. The eventual consistency of the registry will help us in reconfiguring/restarting a runner somewhat in time.

For me, all of the above is always about creating a config, and the system should be able to decide what needs to be done to run that configuration (either partially or completely). This changes the behavior of autodiscover: instead of creating configurations on the fly for inputs and starting the inputs directly, it will add or remove parts of the configuration, and the global current configuration is used as a way to decide what needs to run in the system.

I'm not sure I can follow here. I agree we need some change in autodiscovery in how is-state and should-state are handled. But I don't want to push that responsibility into the input. Within this PR it is out of scope, as I don't plan to adapt autodiscovery yet (although I'm preparing for it).

@urso (Author) commented Oct 30, 2019

> Shouldn't this code be in libbeat/input/* instead of filebeat/...?

I'm still experimenting with the interface. Originally Filebeat was the target. If we decide it is reusable enough, we can still move it to libbeat later (this is still a feature branch and more changes are coming).

> Metricbeat modules: You touched on this quickly in your PR description. You mention "A module will be treated like an Input." My thinking here so far is that a module contains multiple metricsets and the metricsets are based on an input.

Hm... maybe you are mixing some concepts here. At a lower level we have sources; see the filebeat/inputsource package.

Conceptually an Input is not the data collector itself; rather, an input can set up and run many collectors/harvesters based on its configuration.

Configuration-wise, a module is the entity to be configured. Users selectively enable metricsets within a module configuration. In this sense a Metricbeat module is rather similar to a Filebeat input (like the log input). The main differentiator: Filebeat inputs (like the log input) collect many independent sources of the same type, while modules collect dependent sources (same service) of different types.

The aim is to reflect the entities that make up a module at the config level directly in code. This is currently not the case; instead we deal with hidden registries, wrappers, and many different possible interfaces a module can implement. We want to reduce the amount of magic. Having a module implementation or not is optional. My idea is to always force the module plugin to be defined explicitly in code and reuse it. We can provide wrapper types that allow you to combine metricsets into a module definition. As I'm targeting Filebeat first, I don't have sample code for this yet.

> State on reloading: One challenge we had in the past is what happens on reloading an input config. The old config must shut down completely before the new one can be started. In your PR description you mention that this is not needed anymore because of the ordered registry writing. Let's assume we have a file foo.log. An input tails it and sends all the lines until the end. All lines except the last 2 are ACKed. Now it is stopped and a new input on the same file is started; it continues reading from the cached registry state after the last line and sends the data. For some reason, the last 2 lines of the old input can never be ACKed. Will this mean the state of the new input will never be written because of the ordering? What happens in this case?

The state in the registry follows the in-memory state. All updates to the registry are still required to be ordered, but the registry is only updated after an event has been ACKed.

Part of the idea is to relax the dependency of the 'observable state' on the internal state. Observability in this sense is a measure of how well we can infer the internal state from the observed state. Here the observable state is events published plus registry written. As long as we continue from the last observed state, and as long as there is no chance of a gap, a user cannot tell for sure whether the observed state is really in line with the in-memory state; the in-memory state can be ahead. To some extent this is already the case: whenever we publish an event in Filebeat that is not yet ACKed (still in some queue), the internal read state is ahead of the observable state. But once we wanted to close a file or reconfigure a harvester, we used to enforce a sync between observable state and in-memory state, like a barrier; only after ensuring we were in sync would we continue. Requiring this final sync can prevent autodiscovery-based reconfiguration from working correctly if the outputs are blocked or the downstream system is slowed down => control functionality has a strong dependency on data publishing functionality.

Given that the read state is bound to a file, for example, and only managed/updated by an input temporarily (while the input is active), all state updates for the file will be linear and ordered. We do not allow 2 inputs to collect the same file/resource concurrently. This allows us to "release" the resource and hand it to any other input at any point in time. There is no real need to wait for the outputs.

> The old config must shut down first completely before the new one can be started.

This is still the case. In order to process a resource one needs to hold an exclusive lock on it. An input must shut down and release the resource before it can be acquired by another input. The new input will not read the current read state from the registry file, but from the internal state, which might be ahead of the state in the registry file (we already have local state per input in Filebeat). All read-state updates to the registry file by the new input will be scheduled after all in-flight read-state updates from the old, active input configuration.

> Let's assume we have a file foo.log. An input tails it and sends all the lines until the end. All lines except the last 2 are ACKed. Now it is stopped and a new input on the same file is started; it continues reading from the cached registry state after the last line and sends the data. For some reason, the last 2 lines of the old input can never be ACKed. Will this mean the state of the new input will never be written because of the ordering? What happens in this case?

In Filebeat events are ordered. If the last 2 lines from the old input are not ACKed, then the new input will not make any progress, and the registry file will keep the old state. If the Beat is restarted, it will resume from the last 2 lines that have not been ACKed yet.

If an event can never be ACKed (e.g. due to mapping conflicts), then the event is dropped (we don't have a dead letter queue yet). This allows the system to continue making progress. But this is already the case today.

> Scheduler: You plan to migrate the scheduler concept from Heartbeat to the input. Will the current concept of defining period: 5s still exist, or will the user have to use cron definitions?

I think it is not really required for Metricbeat modules, but maybe something we want to consider. For Heartbeat monitors the scheduler is a must.
The scheduler allows you to limit the maximum number of concurrent tasks (e.g. limit the connections and file descriptors a Beat uses concurrently). Metricbeat modules can simply be wrapped using go-routines or the scheduler.

About the period setting: the scheduler implementation in Heartbeat does not enforce any particular kind of scheduling definition. A Schedule is just an interface asking you to implement Next() time.Time. I think cron definitions make for an interesting use case. E.g. they allow you to align data collection, or to run a collection only once a day, preferably at night, like Auditbeat file integrity checks. How can you tell when the check runs if Auditbeat is restarted multiple times or at unknown intervals?

In Heartbeat we also support special keywords for the schedule, like @every <duration>. This would be similar to the period setting in Metricbeat. But I don't have any plans to change how Metricbeat tasks are scheduled.

Contributor @exekias left a comment: This is looking awesome! I left some comments and questions.

// of an input.
type StatusObserver interface {
	// Starting indicates that the input is about to be configured and started.
	Starting()
Contributor: If I understand this correctly, a developer should not call this one; should it be something private?

Contributor: It can be public or not (other people may redefine it in their own packages).

The observer is also for developers, but mostly for internal use. For example, in the context of the agent we could have a special observer that keeps track of errors in the system and reports them to Fleet.

Other possible builtin/chainable observers:

  • We could also have a standard reporter that targets the logs.
  • One that keeps the local state of the system for the web API that the beat exposes.

So in this sense the StatusObserver is similar to an io.Reader in its usefulness for internal development.

Author: @exekias Do you mean the Starting method in particular, or the interface overall?

Contributor: I was talking about the Starting method only. It seems folks implementing inputs won't need to call it?

Author: It depends on how you implement the actual input. We provide some helpers, but the most bare-bones implementation would be compatible with Runner as used by autodiscovery (so we can integrate with reloading + autodiscovery).

// Closer provides support for shutdown signaling.
// It is compatible to context.Context, and can be used to cancel IO
// operations during shutdown.
Closer *chorus.Closer
Contributor: How about actually using context.Context? It would be friendlier for newcomers.

Contributor: They have a compatible interface, so you can use it directly anywhere a context.Context is accepted.
The difference is that chorus.Closer supports ripple-closing of go-routines. It's a library we created a few months ago to experiment with patterns: https://github.com/elastic/go-concert/blob/master/chorus/closeref.go

Contributor: That sounds interesting. Is there any documentation I can look at? I'm not very familiar with "ripple close".

Author: Both context.Context and chorus.Closer build a hierarchy/tree for passing a close signal down the chain, stopping if some intermediary has already been closed.

context.Context (with some exceptions) builds the hierarchy by creating temporary go-routines. This is not optimal, but these go-routines are very short-lived, as a context.Context is supposed to be somewhat short-lived.

chorus.Closer, on the other hand, provides helpers for calling a custom close function while the shutdown signal is propagated (useful for files, low-level network sockets, or when an io.Reader is expected). It works somewhat like context.Context, but is meant to have an extended lifetime. The type is guaranteed to be usable where a context.Context is expected, so it works with any networking libs that support cancellation. The Closer builds a simple tree instead of starting go-routines for signal propagation (this reduces memory usage and the number of go-routines when creating a trace of all active go-routines).

Contributor: Would it make sense to put context.Context on the interface and then pass a chorus.Closer? My concern is more about maintainability and documentation. Newcomers seeing context.Context will understand it out of the box, while chorus.Closer requires some learning.

Author: I might consider it in later PRs. Reasons I don't want to use context.Context yet:

  • The chorus package doesn't yet accept an interface for some functionality; we need to work on the package a little.
  • context.Value invites devs to add hacks to Beats (a context might be used by autodiscovery in the future).
  • We really just want some means of shutdown signaling. The signaling is compatible with context.Context only for convenience. We might consider removing this compatibility and instead provide a 'casting' function so you can use the signaler with context.Context.

// The test is supposed to not take too much time. Packages running a tester
// might run the test with a pre-configured timeout.
type InputTester interface {
TestInput(closer *chorus.Closer, log *logp.Logger) error
Contributor: This could make sense at the plugin level. You also mention there the schema for config validation; this would go beyond that and actually test the config.

Author: The Plugin level is just the definition of a feature we can provide. The Input type is not an actual instance of an input collecting data, but a placeholder (interface type) for a configuration that has already been read. It is used to create an instance later on.

The idea (as described already) is to build a complete typed configuration object in a first step, which can then be used by a command in a second step (no partially initialized 'beater'). For example, the Filebeat struct would be:

type filebeat struct {
  Inputs []input.Input
  ...
}

func (fb *filebeat) Test(closer *chorus.Closer, log *logp.Logger) error {
  ...
  for _, input := range fb.Inputs {
    err := input.TestInput(closer, log)
    ...
  }
  ...
}

With this in mind (long-term approach), each sub-command provided by Beats may have multiple phases before the command can be executed. The read-config phase would be the common first phase.

// On 'stop' an input must unregister configured tasks
//
// XXX: currently in heartbeat, but we will move it to libbeat
Scheduler *scheduler.Scheduler
Contributor: What do you have in mind with this scheduler? I understand it could be used for Heartbeat monitors or Metricbeat modules. It would probably make sense to have another example using this API.


package v2

func WhileActive(context Context, fn func() error) error {


exported function WhileActive should have comment or be unexported

Input: &kafkaInput{
config: config,
saramaConfig: saramaConfig,
}


missing ',' before newline in composite literal



return &InputLoader{plugins: m}, nil
}

func (l *InputLoader) Configure(log *logp.Logger, config *common.Config) (Input, error) {


exported method InputLoader.Configure should have comment or be unexported

Type string `config:"type"`
}

func NewInputLoader(


exported function NewInputLoader should have comment or be unexported

Configure(log *logp.Logger, config *common.Config) (Input, error)
}

type InputLoader struct {


exported type InputLoader should have comment or be unexported


@urso (Author) commented May 7, 2020

Closing. This PR will be replaced by smaller PRs and meta issue with details of the current implementation.

Labels: Dependency:SIEM, discussion in progress (Pull request is currently in progress), Project:Filebeat-Input-v2, Team:Integrations (Label for the Integrations team)

7 participants