Skip to content

Commit

Permalink
Fix [V2]: Elastic Agent Install is broken. (#1331)
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksmaus authored Sep 29, 2022
1 parent 5051218 commit e91e3ce
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 68 deletions.
65 changes: 2 additions & 63 deletions internal/pkg/agent/cmd/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent-libs/logp"
Expand All @@ -21,9 +20,9 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler"
"github.com/elastic/elastic-agent/internal/pkg/agent/vars"
"github.com/elastic/elastic-agent/internal/pkg/capabilities"
"github.com/elastic/elastic-agent/internal/pkg/cli"
"github.com/elastic/elastic-agent/internal/pkg/composable"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/config/operations"
"github.com/elastic/elastic-agent/pkg/component"
Expand Down Expand Up @@ -302,7 +301,7 @@ func getConfigWithVariables(ctx context.Context, l *logger.Logger, cfgPath strin
}

// Wait for the variables based on the timeout.
vars, err := waitForVariables(ctx, l, cfg, timeout)
vars, err := vars.WaitForVariables(ctx, l, cfg, timeout)
if err != nil {
return nil, fmt.Errorf("failed to gather variables: %w", err)
}
Expand All @@ -326,66 +325,6 @@ func getConfigWithVariables(ctx context.Context, l *logger.Logger, cfgPath strin
return m, nil
}

func waitForVariables(ctx context.Context, l *logger.Logger, cfg *config.Config, wait time.Duration) ([]*transpiler.Vars, error) {
var cancel context.CancelFunc
var vars []*transpiler.Vars

composable, err := composable.New(l, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create composable controller: %w", err)
}

hasTimeout := false
if wait > time.Duration(0) {
hasTimeout = true
ctx, cancel = context.WithTimeout(ctx, wait)
} else {
ctx, cancel = context.WithCancel(ctx)
}
defer cancel()

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
var err error
for {
select {
case <-ctx.Done():
if err == nil {
err = ctx.Err()
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
err = nil
}
return err
case cErr := <-composable.Errors():
err = cErr
if err != nil {
cancel()
}
case cVars := <-composable.Watch():
vars = cVars
if !hasTimeout {
cancel()
}
}
}
})

g.Go(func() error {
err := composable.Run(ctx)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
err = nil
}
return err
})

err = g.Wait()
if err != nil {
return nil, err
}
return vars, nil
}

func printComponents(components []component.Component, streams *cli.IOStreams) error {
topLevel := struct {
Components []component.Component `yaml:"components"`
Expand Down
7 changes: 2 additions & 5 deletions internal/pkg/agent/install/uninstall.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler"
"github.com/elastic/elastic-agent/internal/pkg/agent/vars"
"github.com/elastic/elastic-agent/internal/pkg/capabilities"
"github.com/elastic/elastic-agent/internal/pkg/composable"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/config/operations"
"github.com/elastic/elastic-agent/pkg/component"
Expand Down Expand Up @@ -195,13 +195,10 @@ func applyDynamics(ctx context.Context, log *logger.Logger, cfg *config.Config)
// apply dynamic inputs
inputs, ok := transpiler.Lookup(ast, "inputs")
if ok {
varsArray := make([]*transpiler.Vars, 0)

ctrl, err := composable.New(log, cfg)
varsArray, err := vars.WaitForVariables(ctx, log, cfg, 0)
if err != nil {
return nil, err
}
_ = ctrl.Run(ctx)

renderedInputs, err := transpiler.RenderInputs(inputs, varsArray)
if err != nil {
Expand Down
78 changes: 78 additions & 0 deletions internal/pkg/agent/vars/vars.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package vars

import (
"context"
"errors"
"fmt"
"time"

"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler"
"github.com/elastic/elastic-agent/internal/pkg/composable"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/pkg/core/logger"
"golang.org/x/sync/errgroup"
)

func WaitForVariables(ctx context.Context, l *logger.Logger, cfg *config.Config, wait time.Duration) ([]*transpiler.Vars, error) {
var cancel context.CancelFunc
var vars []*transpiler.Vars

composable, err := composable.New(l, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create composable controller: %w", err)
}

hasTimeout := false
if wait > time.Duration(0) {
hasTimeout = true
ctx, cancel = context.WithTimeout(ctx, wait)
} else {
ctx, cancel = context.WithCancel(ctx)
}
defer cancel()

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
var err error
for {
select {
case <-ctx.Done():
if err == nil {
err = ctx.Err()
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
err = nil
}
return err
case cErr := <-composable.Errors():
err = cErr
if err != nil {
cancel()
}
case cVars := <-composable.Watch():
vars = cVars
if !hasTimeout {
cancel()
}
}
}
})

g.Go(func() error {
err := composable.Run(ctx)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
err = nil
}
return err
})

err = g.Wait()
if err != nil {
return nil, err
}
return vars, nil
}

0 comments on commit e91e3ce

Please sign in to comment.