Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/{cel,httpjson}: allow trampolining from HTTPJSON to CEL #41387

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions x-pack/filebeat/input/cel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (

const defaultMaxExecutions = 1000

// config is the top-level configuration for a cel input.
type config struct {
// Config is the top-level configuration for a cel input. It is an internal
// type that is only exported to allow use by the httpjson package.
type Config struct {
// Interval is the period interval between runs of the input.
Interval time.Duration `config:"interval" validate:"required"`

Expand Down Expand Up @@ -69,7 +70,7 @@ type redact struct {
Delete bool `config:"delete"`
}

func (c config) Validate() error {
func (c Config) Validate() error {
if c.Redact == nil {
logp.L().Named("input.cel").Warn("missing recommended 'redact' configuration: " +
"see documentation for details: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-cel.html#_redact")
Expand All @@ -96,15 +97,15 @@ func (c config) Validate() error {
return nil
}

func defaultConfig() config {
func DefaultConfig() Config {
maxExecutions := defaultMaxExecutions
maxAttempts := 5
waitMin := time.Second
waitMax := time.Minute
transport := httpcommon.DefaultHTTPTransportSettings()
transport.Timeout = 30 * time.Second

return config{
return Config{
MaxExecutions: &maxExecutions,
Interval: time.Minute,
Resource: &ResourceConfig{
Expand Down
8 changes: 4 additions & 4 deletions x-pack/filebeat/input/cel/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestGetProviderIsCanonical(t *testing.T) {
}

func TestRegexpConfig(t *testing.T) {
cfg := config{
cfg := Config{
Interval: time.Minute,
Program: `{}`,
Resource: &ResourceConfig{URL: &urlConfig{URL: &url.URL{}}},
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestConfigMustFailWithInvalidResource(t *testing.T) {
"resource.url": test.val,
}
cfg := conf.MustNewConfigFrom(m)
conf := defaultConfig()
conf := DefaultConfig()
conf.Program = "{}" // Provide an empty program to avoid validation error from that.
conf.Redact = &redact{} // Make sure we pass the redact requirement.
err := cfg.Unpack(&conf)
Expand Down Expand Up @@ -604,7 +604,7 @@ func TestConfigOauth2Validation(t *testing.T) {

test.input["resource.url"] = "localhost"
cfg := conf.MustNewConfigFrom(test.input)
conf := defaultConfig()
conf := DefaultConfig()
conf.Program = "{}" // Provide an empty program to avoid validation error from that.
conf.Redact = &redact{} // Make sure we pass the redact requirement.
err := cfg.Unpack(&conf)
Expand Down Expand Up @@ -656,7 +656,7 @@ func TestKeepAliveSetting(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
test.input["resource.url"] = "localhost"
cfg := conf.MustNewConfigFrom(test.input)
conf := defaultConfig()
conf := DefaultConfig()
conf.Program = "{}" // Provide an empty program to avoid validation error from that.
conf.Redact = &redact{} // Make sure we pass the redact requirement.
err := cfg.Unpack(&conf)
Expand Down
33 changes: 20 additions & 13 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,26 @@
}
}

type input struct {
var _ inputcursor.Input = Input{}

// Input is the CEL input implementation of inputcursor.Input. It is an internal
// type that is only exported to allow use by the httpjson package.
type Input struct {
time func() time.Time
}

// now is time.Now with a modifiable time source.
func (i input) now() time.Time {
func (i Input) now() time.Time {
if i.time == nil {
return time.Now()
}
return i.time()
}

func (input) Name() string { return inputName }
func (Input) Name() string { return inputName }

func (input) Test(src inputcursor.Source, _ v2.TestContext) error {
cfg := src.(*source).cfg
func (Input) Test(src inputcursor.Source, _ v2.TestContext) error {
cfg := src.(*Source).Config
if !wantClient(cfg) {
return nil
}
Expand All @@ -104,18 +108,21 @@

// Run starts the input and blocks until it ends completes. It will return on
// context cancellation or type invalidity errors, any other error will be retried.
func (input) Run(env v2.Context, src inputcursor.Source, crsr inputcursor.Cursor, pub inputcursor.Publisher) error {
func (Input) Run(env v2.Context, src inputcursor.Source, crsr inputcursor.Cursor, pub inputcursor.Publisher) error {
var cursor map[string]interface{}
env.UpdateStatus(status.Starting, "")
if !crsr.IsNew() { // Allow the user to bootstrap the program if needed.

// I can't believe this is necessary.
crsrIsValid := crsr != inputcursor.Cursor{}
if crsrIsValid && !crsr.IsNew() { // Allow the user to bootstrap the program if needed.
err := crsr.Unpack(&cursor)
if err != nil {
env.UpdateStatus(status.Failed, "failed to unpack cursor: "+err.Error())
return err
}
}

err := input{}.run(env, src.(*source), cursor, pub)
err := Input{}.run(env, src.(*Source), cursor, pub)
if err != nil {
env.UpdateStatus(status.Failed, "failed to run: "+err.Error())
return err
Expand All @@ -133,8 +140,8 @@
return strings.ReplaceAll(name, string(filepath.Separator), "_")
}

func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, pub inputcursor.Publisher) error {
cfg := src.cfg
func (i Input) run(env v2.Context, src *Source, cursor map[string]interface{}, pub inputcursor.Publisher) error {
cfg := src.Config
log := env.Logger.With("input_url", cfg.Resource.URL)

metrics, reg := newInputMetrics(env.ID)
Expand Down Expand Up @@ -727,7 +734,7 @@
// https://github.com/natefinch/lumberjack/blob/4cb27fcfbb0f35cb48c542c5ea80b7c1d18933d0/lumberjack.go#L39
const lumberjackTimestamp = "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]-[0-9][0-9]-[0-9][0-9].[0-9][0-9][0-9]"

func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitoring.Registry) (*http.Client, *httplog.LoggingRoundTripper, error) {
func newClient(ctx context.Context, cfg Config, log *logp.Logger, reg *monitoring.Registry) (*http.Client, *httplog.LoggingRoundTripper, error) {
c, err := cfg.Resource.Transport.Client(clientOptions(cfg.Resource.URL.URL, cfg.Resource.KeepAlive.settings())...)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -822,7 +829,7 @@
return c, trace, nil
}

func wantClient(cfg config) bool {
func wantClient(cfg Config) bool {
switch scheme, _, _ := strings.Cut(cfg.Resource.URL.Scheme, "+"); scheme {
case "http", "https":
return true
Expand Down Expand Up @@ -952,7 +959,7 @@
return rate.NewLimiter(r, b)
}

func regexpsFromConfig(cfg config) (map[string]*regexp.Regexp, error) {
func regexpsFromConfig(cfg Config) (map[string]*regexp.Regexp, error) {
if len(cfg.Regexps) == 0 {
return nil, nil
}
Expand Down Expand Up @@ -1259,7 +1266,7 @@
// walkMap walks to all ends of the provided path in m and applies fn to the
// final element of each walk. Nested arrays are not handled.
func walkMap(m mapstr.M, path string, fn func(parent mapstr.M, key string)) {
key, rest, more := strings.Cut(path, ".")

Check failure on line 1269 in x-pack/filebeat/input/cel/input.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

rest declared and not used (typecheck)
v, ok := m[key]
if !ok {
return
Expand Down
14 changes: 8 additions & 6 deletions x-pack/filebeat/input/cel/input_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ func NewInputManager(log *logp.Logger, store inputcursor.StateStore) InputManage
}

func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, error) {
src := &source{cfg: defaultConfig()}
if err := cfg.Unpack(&src.cfg); err != nil {
src := &Source{Config: DefaultConfig()}
if err := cfg.Unpack(&src.Config); err != nil {
return nil, nil, err
}
return []inputcursor.Source{src}, input{}, nil
return []inputcursor.Source{src}, Input{}, nil
}

type source struct{ cfg config }
// Source is the CEL input implementation of inputcursor.Source. It is an internal
// type that is only exported to allow use by the httpjson package.
type Source struct{ Config Config }

func (s *source) Name() string { return s.cfg.Resource.URL.String() }
func (s *Source) Name() string { return s.Config.Resource.URL.String() }

// Init initializes both wrapped input managers.
func (m InputManager) Init(grp unison.Group) error {
Expand All @@ -52,7 +54,7 @@ func (m InputManager) Init(grp unison.Group) error {

// Create creates a cursor input manager.
func (m InputManager) Create(cfg *conf.C) (v2.Input, error) {
config := defaultConfig()
config := DefaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions x-pack/filebeat/input/cel/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1711,7 +1711,7 @@ func TestInput(t *testing.T) {

cfg := conf.MustNewConfigFrom(test.config)

conf := defaultConfig()
conf := DefaultConfig()
conf.Redact = &redact{} // Make sure we pass the redact requirement.
err := cfg.Unpack(&conf)
if err != nil {
Expand All @@ -1727,12 +1727,12 @@ func TestInput(t *testing.T) {
conf.Resource.Tracer.Filename = filepath.Join(tempDir, conf.Resource.Tracer.Filename)
}

name := input{}.Name()
name := Input{}.Name()
if name != "cel" {
t.Errorf(`unexpected input name: got:%q want:"cel"`, name)
}
src := &source{conf}
err = input{}.Test(src, v2.TestContext{})
src := &Source{conf}
err = Input{}.Test(src, v2.TestContext{})
if err != nil {
t.Fatalf("unexpected error running test: %v", err)
}
Expand All @@ -1753,7 +1753,7 @@ func TestInput(t *testing.T) {
cancel()
}
}
err = input{test.time}.run(v2Ctx, src, test.persistCursor, &client)
err = Input{test.time}.run(v2Ctx, src, test.persistCursor, &client)
if fmt.Sprint(err) != fmt.Sprint(test.wantErr) {
t.Errorf("unexpected error from running input: got:%v want:%v", err, test.wantErr)
}
Expand Down
21 changes: 18 additions & 3 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,21 @@ import (
"strings"
"time"

"github.com/elastic/beats/v7/x-pack/filebeat/input/cel"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

type config struct {
Interval time.Duration `config:"interval" validate:"required"`
Interval time.Duration `config:"interval"`
Auth *authConfig `config:"auth"`
Request *requestConfig `config:"request" validate:"required"`
Request *requestConfig `config:"request"`
Response *responseConfig `config:"response"`
Cursor cursorConfig `config:"cursor"`
Chain []chainConfig `config:"chain"`

CEL *cel.Config `config:"cel"`
ucfg *conf.C
}

type cursorConfig map[string]cursorEntry
Expand All @@ -35,9 +40,19 @@ func (ce cursorEntry) mustIgnoreEmptyValue() bool {
}

func (c config) Validate() error {
if c.Interval <= 0 {
if c.CEL != nil {
return c.CEL.Validate()
}

if c.Interval == 0 {
return errors.New("interval must be configured")
}
if c.Interval < 0 {
return errors.New("interval must be greater than 0")
}
if c.Request == nil {
return errors.New("request configuration must be present")
}
for _, v := range c.Chain {
if v.Step == nil && v.While == nil {
return errors.New("both step & while blocks in a chain cannot be empty")
Expand Down
23 changes: 20 additions & 3 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/beats/v7/x-pack/filebeat/input/cel"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httpmon"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -113,11 +114,27 @@ func test(url *url.URL) error {

func runWithMetrics(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcursor.Cursor) error {
reg, unreg := inputmon.NewInputRegistry("httpjson", ctx.ID, nil)
defer unreg()
return run(ctx, cfg, pub, crsr, reg)
return run(ctx, cfg, pub, crsr, reg, unreg)
}

func run(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcursor.Cursor, reg *monitoring.Registry) error {
func run(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcursor.Cursor, reg *monitoring.Registry, unreg func()) error {
if cfg.CEL != nil {
unreg()
// Apply cel defaults and re-unmarshal config.
*cfg.CEL = cel.DefaultConfig()
err := cfg.ucfg.Unpack(&cfg)
if err != nil {
return err
}
// Paper over impedance mismatch.
var csr inputcursor.Cursor
if crsr != nil {
csr = *crsr
}
return cel.Input{}.Run(ctx, &cel.Source{Config: *cfg.CEL}, csr, pub)
}
defer unreg()

log := ctx.Logger.With("input_url", cfg.Request.URL)

stdCtx := ctxtool.FromCanceller(ctx.Cancelation)
Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/input/httpjson/input_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package httpjson
import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/x-pack/filebeat/input/cel"
conf "github.com/elastic/elastic-agent-libs/config"
)

Expand All @@ -29,6 +30,7 @@ func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, erro
if err := cfg.Unpack(&conf); err != nil {
return nil, nil, err
}
conf.ucfg = cfg
sources, inp := newCursorInput(conf)
return sources, inp, nil
}
Expand All @@ -40,6 +42,9 @@ func newCursorInput(config config) ([]inputcursor.Source, inputcursor.Input) {
}

func (in *cursorInput) Test(src inputcursor.Source, _ v2.TestContext) error {
if _, ok := src.(*cel.Source); ok {
return cel.Input{}.Test(src, v2.TestContext{})
}
return test((src.(*source)).config.Request.URL.URL)
}

Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/input/httpjson/input_stateless.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/x-pack/filebeat/input/cel"
conf "github.com/elastic/elastic-agent-libs/config"
)

Expand All @@ -24,6 +25,7 @@ func statelessConfigure(cfg *conf.C) (stateless.Input, error) {
if err := cfg.Unpack(&conf); err != nil {
return nil, err
}
conf.ucfg = cfg
return newStatelessInput(conf), nil
}

Expand All @@ -32,6 +34,9 @@ func newStatelessInput(config config) *statelessInput {
}

func (in *statelessInput) Test(v2.TestContext) error {
if in.config.CEL != nil {
return cel.Input{}.Test(&cel.Source{Config: *in.config.CEL}, v2.TestContext{})
}
return test(in.config.Request.URL.URL)
}

Expand Down
Loading
Loading