Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Status subcommand reporting agent status for otel mode - Phase 2 #4047

Merged
merged 11 commits into from
Jan 22, 2024
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@ fleet.yml.lock
fleet.yml.old
pkg/component/fake/component/component
pkg/component/fake/shipper/shipper
internal/pkg/agent/install/testblocking/testblocking
5 changes: 5 additions & 0 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/capabilities"
"github.com/elastic/elastic-agent/internal/pkg/composable"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/otel"
"github.com/elastic/elastic-agent/internal/pkg/release"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
Expand All @@ -46,6 +47,7 @@ func New(
testingMode bool,
fleetInitTimeout time.Duration,
disableMonitoring bool,
runAsOtel bool,
modifiers ...component.PlatformModifier,
) (*coordinator.Coordinator, coordinator.ConfigManager, composable.Controller, error) {

Expand Down Expand Up @@ -144,6 +146,9 @@ func New(
log.Debugf("Reloading of configuration is on, frequency is set to %s", cfg.Settings.Reload.Period)
configMgr = newPeriodic(log, cfg.Settings.Reload.Period, discover, loader)
}
} else if runAsOtel {
// ignoring configuration in elastic-agent.yml
configMgr = otel.NewOtelModeConfigManager()
} else {
isManaged = true
var store storage.Store
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/agent/application/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestLimitsLog(t *testing.T) {
true, // testingMode
time.Millisecond, // fleetInitTimeout
true, // disable monitoring
false, // not otel mode
)
require.NoError(t, err)

Expand Down
49 changes: 40 additions & 9 deletions internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import (
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

"go.elastic.co/apm"
apmtransport "go.elastic.co/apm/transport"
"gopkg.in/yaml.v2"

"github.com/hashicorp/go-multierror"
"github.com/spf13/cobra"

"github.com/elastic/elastic-agent-libs/api"
Expand Down Expand Up @@ -61,6 +63,7 @@ const (
)

type cfgOverrider func(cfg *configuration.Configuration)
type awaiters []<-chan struct{}

func newRunCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Command {
cmd := &cobra.Command{
Expand Down Expand Up @@ -136,17 +139,38 @@ func run(override cfgOverrider, testingMode bool, fleetInitTimeout time.Duration
go service.ProcessWindowsControlEvents(stopBeat)

// detect otel
if runAsOtel := otel.IsOtelConfig(ctx, paths.ConfigFile()); runAsOtel {
return otel.Run(ctx, cancel, stop, testingMode)
runAsOtel := otel.IsOtelConfig(ctx, paths.ConfigFile())
var awaiters awaiters
var resErr error
if runAsOtel {
var otelStartWg sync.WaitGroup
otelAwaiter := make(chan struct{})
awaiters = append(awaiters, otelAwaiter)

otelStartWg.Add(1)
go func() {
otelStartWg.Done()
if err := otel.Run(ctx, stop); err != nil {
resErr = multierror.Append(resErr, err)
}

// close awaiter handled in run loop
close(otelAwaiter)
}()

// wait for otel to start
otelStartWg.Wait()
}

// not otel continue as usual
return runElasticAgent(ctx, cancel, override, stop, testingMode, fleetInitTimeout, modifiers...)
if err := runElasticAgent(ctx, cancel, override, stop, testingMode, fleetInitTimeout, runAsOtel, awaiters, modifiers...); err != nil {
resErr = multierror.Append(resErr, err)
}

return resErr
}

func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, modifiers ...component.PlatformModifier) error {
cfg, err := loadConfig(ctx, override)
func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, runAsOtel bool, awaiters awaiters, modifiers ...component.PlatformModifier) error {
cfg, err := loadConfig(ctx, override, runAsOtel)
if err != nil {
return err
}
Expand Down Expand Up @@ -259,7 +283,7 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf
l.Info("APM instrumentation disabled")
}

coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...)
coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), runAsOtel, modifiers...)
if err != nil {
return err
}
Expand Down Expand Up @@ -366,6 +390,9 @@ LOOP:
}
cancel()
err = <-appErr
for _, a := range awaiters {
<-a // wait for awaiter to be done
}

if logShutdown {
l.Info("Shutting down completed.")
Expand All @@ -376,7 +403,11 @@ LOOP:
return err
}

func loadConfig(ctx context.Context, override cfgOverrider) (*configuration.Configuration, error) {
func loadConfig(ctx context.Context, override cfgOverrider, runAsOtel bool) (*configuration.Configuration, error) {
if runAsOtel {
return configuration.DefaultConfiguration(), nil
}

pathConfigFile := paths.ConfigFile()
rawConfig, err := config.LoadFile(pathConfigFile)
if err != nil {
Expand Down Expand Up @@ -527,7 +558,7 @@ func tryDelayEnroll(ctx context.Context, logger *logger.Logger, cfg *configurati
errors.M("path", enrollPath)))
}
logger.Info("Successfully performed delayed enrollment of this Elastic Agent.")
return loadConfig(ctx, override)
return loadConfig(ctx, override, false)
}

func initTracer(agentName, version string, mcfg *monitoringCfg.MonitoringConfig) (*apm.Tracer, error) {
Expand Down
47 changes: 47 additions & 0 deletions internal/pkg/otel/config_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package otel

import (
"context"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
)

// OtelModeConfigManager serves as a config manager for OTel use cases
// In this case agent should ignore all configuration coming from elastic-agent.yml file
// or other sources.
type OtelModeConfigManager struct {
ch chan coordinator.ConfigChange
errCh chan error
}

// NewOtelModeConfigManager creates new OtelModeConfigManager ignoring
// configuration coming from other sources.
func NewOtelModeConfigManager() *OtelModeConfigManager {
return &OtelModeConfigManager{
ch: make(chan coordinator.ConfigChange),
errCh: make(chan error),
}
}

func (t *OtelModeConfigManager) Run(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
}

func (t *OtelModeConfigManager) Errors() <-chan error {
return t.errCh
}

// ActionErrors returns the error channel for actions.
// Returns nil channel.
func (t *OtelModeConfigManager) ActionErrors() <-chan error {
return nil
}

func (t *OtelModeConfigManager) Watch() <-chan coordinator.ConfigChange {
return t.ch
}
2 changes: 1 addition & 1 deletion internal/pkg/otel/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func IsOtelConfig(ctx context.Context, pathConfigFile string) bool {
return false
}

func Run(ctx context.Context, cancel context.CancelFunc, stop chan bool, testingMode bool) error {
func Run(ctx context.Context, stop chan bool) error {
fmt.Fprintln(os.Stdout, "Starting in otel mode")
settings, err := newSettings(paths.ConfigFile(), release.Version())
if err != nil {
Expand Down
14 changes: 8 additions & 6 deletions pkg/testing/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -444,7 +443,7 @@ func RunProcess(t *testing.T,
// when `Run` is called.
//
// if shouldWatchState is set to false, communicating state does not happen.
func (f *Fixture) RunWithClient(ctx context.Context, shouldWatchState bool, states ...State) error {
func (f *Fixture) RunWithClient(ctx context.Context, shouldWatchState bool, enableTestingMode bool, states ...State) error {
if _, deadlineSet := ctx.Deadline(); !deadlineSet {
f.t.Fatal("Context passed to Fixture.Run() has no deadline set.")
}
Expand Down Expand Up @@ -497,7 +496,10 @@ func (f *Fixture) RunWithClient(ctx context.Context, shouldWatchState bool, stat
stdOut := newLogWatcher(logProxy)
stdErr := newLogWatcher(logProxy)

args := []string{"run", "-e", "--disable-encrypted-store", "--testing-mode"}
args := []string{"run", "-e", "--disable-encrypted-store"}
if enableTestingMode {
args = append(args, "--testing-mode")
}

args = append(args, f.additionalArgs...)

Expand Down Expand Up @@ -599,7 +601,7 @@ func (f *Fixture) RunWithClient(ctx context.Context, shouldWatchState bool, stat
// The `elastic-agent.yml` generated by `Fixture.Configure` is ignored
// when `Run` is called.
func (f *Fixture) Run(ctx context.Context, states ...State) error {
return f.RunWithClient(ctx, true, states...)
return f.RunWithClient(ctx, true, true, states...)
}

// Exec provides a way of performing subcommand on the prepared Elastic Agent binary.
Expand Down Expand Up @@ -839,7 +841,7 @@ func (f *Fixture) prepareComponents(workDir string, components ...UsableComponen
if err != nil {
return err
}
contents, err := ioutil.ReadDir(componentsDir)
contents, err := os.ReadDir(componentsDir)
if err != nil {
return fmt.Errorf("failed to read contents of components directory %s: %w", componentsDir, err)
}
Expand Down Expand Up @@ -979,7 +981,7 @@ func getCacheDir(caller string, name string) (string, error) {
// FindComponentsDir identifies the directory that holds the components.
func FindComponentsDir(dir string) (string, error) {
dataDir := filepath.Join(dir, "data")
agentVersions, err := ioutil.ReadDir(dataDir)
agentVersions, err := os.ReadDir(dataDir)
if err != nil {
return "", fmt.Errorf("failed to read contents of the data directory %s: %w", dataDir, err)
}
Expand Down
21 changes: 19 additions & 2 deletions testing/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,31 @@ func TestOtelFileProcessing(t *testing.T) {
fixtureWg.Add(1)
go func() {
defer fixtureWg.Done()
err = fixture.RunWithClient(ctx, false)
err = fixture.RunWithClient(ctx, false, false)
}()

var content []byte
watchLines := linesTrackMap([]string{
`"stringValue":"syslog"`, // syslog is being processed
`"stringValue":"system.log"`, // system.log is being processed
})

// check `elastic-agent status` returns successfully
require.Eventuallyf(t, func() bool {
// This will return errors until it connects to the agent,
// they're mostly noise because until the agent starts running
// we will get connection errors. If the test fails
// the agent logs will be present in the error message
// which should help to explain why the agent was not
// healthy.
err := fixture.IsHealthy(ctx)
return err == nil
},
2*time.Minute, time.Second,
"Elastic-Agent did not report healthy. Agent status error: \"%v\"",
err,
)

require.Eventually(t,
func() bool {
// verify file exists
Expand Down Expand Up @@ -240,7 +257,7 @@ func TestOtelAPMIngestion(t *testing.T) {
var fixtureWg sync.WaitGroup
fixtureWg.Add(1)
go func() {
fixture.RunWithClient(ctx, false)
fixture.RunWithClient(ctx, false, false)
fixtureWg.Done()
}()

Expand Down
Loading