From e91e3ce8889d199a2a2a0e4256cddd13c9c43183 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Wed, 28 Sep 2022 20:37:19 -0400 Subject: [PATCH] Fix [V2]: Elastic Agent Install is broken. (#1331) --- internal/pkg/agent/cmd/inspect.go | 65 +-------------------- internal/pkg/agent/install/uninstall.go | 7 +-- internal/pkg/agent/vars/vars.go | 78 +++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 68 deletions(-) create mode 100644 internal/pkg/agent/vars/vars.go diff --git a/internal/pkg/agent/cmd/inspect.go b/internal/pkg/agent/cmd/inspect.go index 32455a179c4..77e917c9c3c 100644 --- a/internal/pkg/agent/cmd/inspect.go +++ b/internal/pkg/agent/cmd/inspect.go @@ -12,7 +12,6 @@ import ( "time" "github.com/spf13/cobra" - "golang.org/x/sync/errgroup" "gopkg.in/yaml.v2" "github.com/elastic/elastic-agent-libs/logp" @@ -21,9 +20,9 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" + "github.com/elastic/elastic-agent/internal/pkg/agent/vars" "github.com/elastic/elastic-agent/internal/pkg/capabilities" "github.com/elastic/elastic-agent/internal/pkg/cli" - "github.com/elastic/elastic-agent/internal/pkg/composable" "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/config/operations" "github.com/elastic/elastic-agent/pkg/component" @@ -302,7 +301,7 @@ func getConfigWithVariables(ctx context.Context, l *logger.Logger, cfgPath strin } // Wait for the variables based on the timeout. - vars, err := waitForVariables(ctx, l, cfg, timeout) + vars, err := vars.WaitForVariables(ctx, l, cfg, timeout) if err != nil { return nil, fmt.Errorf("failed to gather variables: %w", err) } @@ -326,66 +325,6 @@ func getConfigWithVariables(ctx context.Context, l *logger.Logger, cfgPath strin return m, nil } -func waitForVariables(ctx context.Context, l *logger.Logger, cfg *config.Config, wait time.Duration) ([]*transpiler.Vars, error) { - var cancel context.CancelFunc - var vars []*transpiler.Vars - - composable, err := composable.New(l, cfg) - if err != nil { - return nil, fmt.Errorf("failed to create composable controller: %w", err) - } - - hasTimeout := false - if wait > time.Duration(0) { - hasTimeout = true - ctx, cancel = context.WithTimeout(ctx, wait) - } else { - ctx, cancel = context.WithCancel(ctx) - } - defer cancel() - - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - var err error - for { - select { - case <-ctx.Done(): - if err == nil { - err = ctx.Err() - } - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - err = nil - } - return err - case cErr := <-composable.Errors(): - err = cErr - if err != nil { - cancel() - } - case cVars := <-composable.Watch(): - vars = cVars - if !hasTimeout { - cancel() - } - } - } - }) - - g.Go(func() error { - err := composable.Run(ctx) - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - err = nil - } - return err - }) - - err = g.Wait() - if err != nil { - return nil, err - } - return vars, nil -} - func printComponents(components []component.Component, streams *cli.IOStreams) error { topLevel := struct { Components []component.Component `yaml:"components"` diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index 61588f5de97..df5c11d747c 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -19,8 +19,8 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" + "github.com/elastic/elastic-agent/internal/pkg/agent/vars" "github.com/elastic/elastic-agent/internal/pkg/capabilities" - "github.com/elastic/elastic-agent/internal/pkg/composable" "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/config/operations" "github.com/elastic/elastic-agent/pkg/component" @@ -195,13 +195,10 @@ func applyDynamics(ctx context.Context, log *logger.Logger, cfg *config.Config) // apply dynamic inputs inputs, ok := transpiler.Lookup(ast, "inputs") if ok { - varsArray := make([]*transpiler.Vars, 0) - - ctrl, err := composable.New(log, cfg) + varsArray, err := vars.WaitForVariables(ctx, log, cfg, 0) if err != nil { return nil, err } - _ = ctrl.Run(ctx) renderedInputs, err := transpiler.RenderInputs(inputs, varsArray) if err != nil { diff --git a/internal/pkg/agent/vars/vars.go b/internal/pkg/agent/vars/vars.go new file mode 100644 index 00000000000..7f0aff1c329 --- /dev/null +++ b/internal/pkg/agent/vars/vars.go @@ -0,0 +1,78 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package vars + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" + "github.com/elastic/elastic-agent/internal/pkg/composable" + "github.com/elastic/elastic-agent/internal/pkg/config" + "github.com/elastic/elastic-agent/pkg/core/logger" + "golang.org/x/sync/errgroup" +) + +func WaitForVariables(ctx context.Context, l *logger.Logger, cfg *config.Config, wait time.Duration) ([]*transpiler.Vars, error) { + var cancel context.CancelFunc + var vars []*transpiler.Vars + + composable, err := composable.New(l, cfg) + if err != nil { + return nil, fmt.Errorf("failed to create composable controller: %w", err) + } + + hasTimeout := false + if wait > time.Duration(0) { + hasTimeout = true + ctx, cancel = context.WithTimeout(ctx, wait) + } else { + ctx, cancel = context.WithCancel(ctx) + } + defer cancel() + + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + var err error + for { + select { + case <-ctx.Done(): + if err == nil { + err = ctx.Err() + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + err = nil + } + return err + case cErr := <-composable.Errors(): + err = cErr + if err != nil { + cancel() + } + case cVars := <-composable.Watch(): + vars = cVars + if !hasTimeout { + cancel() + } + } + } + }) + + g.Go(func() error { + err := composable.Run(ctx) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + err = nil + } + return err + }) + + err = g.Wait() + if err != nil { + return nil, err + } + return vars, nil +}