diff --git a/.chloggen/refactor-config-in-opampsupervisor.yaml b/.chloggen/refactor-config-in-opampsupervisor.yaml new file mode 100644 index 000000000000..d31cfcdc700d --- /dev/null +++ b/.chloggen/refactor-config-in-opampsupervisor.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Pass config structure instead of file path when using NewSupervisor function + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34379] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 9316e8ffde8a..a6f41243e542 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -154,7 +154,11 @@ func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFa func newSupervisor(t *testing.T, configType string, extraConfigData map[string]string) *supervisor.Supervisor { cfgFile := getSupervisorConfig(t, configType, extraConfigData) - s, err := supervisor.NewSupervisor(zap.NewNop(), cfgFile.Name()) + + cfg, err := config.Load(cfgFile.Name()) + require.NoError(t, err) + + s, err := supervisor.NewSupervisor(zap.NewNop(), cfg) require.NoError(t, err) return s diff --git a/cmd/opampsupervisor/main.go b/cmd/opampsupervisor/main.go index 3e58ee4730a7..137d2d37c3a2 100644 --- a/cmd/opampsupervisor/main.go +++ b/cmd/opampsupervisor/main.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor" + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config" ) func main() { @@ -19,7 +20,14 @@ func main() { logger, _ := zap.NewDevelopment() - supervisor, err := supervisor.NewSupervisor(logger, *configFlag) + cfg, err := config.Load(*configFlag) + if err != nil { + logger.Error(err.Error()) + os.Exit(-1) + return + } + + supervisor, err := supervisor.NewSupervisor(logger, cfg) if err != nil { logger.Error(err.Error()) os.Exit(-1) diff --git a/cmd/opampsupervisor/supervisor/config/config.go b/cmd/opampsupervisor/supervisor/config/config.go index 3954da8cb79b..a260410f9c95 100644 --- a/cmd/opampsupervisor/supervisor/config/config.go +++ b/cmd/opampsupervisor/supervisor/config/config.go @@ -13,6 +13,9 @@ import ( "runtime" "time" + "github.com/knadh/koanf/parsers/yaml" + "github.com/knadh/koanf/providers/file" + "github.com/knadh/koanf/v2" "github.com/open-telemetry/opamp-go/protobufs" "go.opentelemetry.io/collector/config/configtls" ) @@ -25,6 +28,33 @@ type Supervisor struct { Storage Storage `mapstructure:"storage"` } +// Load loads the Supervisor config from a file. +func Load(configFile string) (Supervisor, error) { + if configFile == "" { + return Supervisor{}, errors.New("path to config file cannot be empty") + } + + k := koanf.New("::") + if err := k.Load(file.Provider(configFile), yaml.Parser()); err != nil { + return Supervisor{}, err + } + + decodeConf := koanf.UnmarshalConf{ + Tag: "mapstructure", + } + + cfg := DefaultSupervisor() + if err := k.UnmarshalWithConf("", &cfg, decodeConf); err != nil { + return Supervisor{}, fmt.Errorf("cannot parse %s: %w", configFile, err) + } + + if err := cfg.Validate(); err != nil { + return Supervisor{}, fmt.Errorf("cannot validate supervisor config %s: %w", configFile, err) + } + + return cfg, nil +} + func (s Supervisor) Validate() error { if err := s.Server.Validate(); err != nil { return err diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 6fa55948f0fd..bb4e9e0fbd1e 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -23,7 +23,6 @@ import ( "github.com/google/uuid" "github.com/knadh/koanf/maps" "github.com/knadh/koanf/parsers/yaml" - "github.com/knadh/koanf/providers/file" "github.com/knadh/koanf/providers/rawbytes" "github.com/knadh/koanf/v2" "github.com/open-telemetry/opamp-go/client" @@ -136,7 +135,7 @@ type Supervisor struct { opampServerPort int } -func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { +func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, error) { s := &Supervisor{ logger: logger, pidProvider: defaultPIDProvider{}, @@ -153,14 +152,12 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { return nil, err } - if err := s.loadConfig(configFile); err != nil { - return nil, fmt.Errorf("error loading config: %w", err) - } - - if err := s.config.Validate(); err != nil { + if err := cfg.Validate(); err != nil { return nil, fmt.Errorf("error validating config: %w", err) } + s.config = cfg + if err := os.MkdirAll(s.config.Storage.Directory, 0700); err != nil { return nil, fmt.Errorf("error creating storage dir: %w", err) } @@ -248,28 +245,6 @@ func (s *Supervisor) createTemplates() error { return nil } -func (s *Supervisor) loadConfig(configFile string) error { - if configFile == "" { - return errors.New("path to config file cannot be empty") - } - - k := koanf.New("::") - if err := k.Load(file.Provider(configFile), yaml.Parser()); err != nil { - return err - } - - decodeConf := koanf.UnmarshalConf{ - Tag: "mapstructure", - } - - s.config = config.DefaultSupervisor() - if err := k.UnmarshalWithConf("", &s.config, decodeConf); err != nil { - return fmt.Errorf("cannot parse %v: %w", configFile, err) - } - - return nil -} - // getBootstrapInfo obtains the Collector's agent description by // starting a Collector with a specific config that only starts // an OpAMP extension, obtains the agent description, then @@ -461,11 +436,17 @@ func (s *Supervisor) startOpAMPClient() error { func (s *Supervisor) startOpAMPServer() error { s.opampServer = server.New(newLoggerFromZap(s.logger)) + var err error + s.opampServerPort, err = s.findRandomPort() + if err != nil { + return err + } + s.logger.Debug("Starting OpAMP server...") connected := &atomic.Bool{} - err := s.opampServer.Start(flattenedSettings{ + err = s.opampServer.Start(flattenedSettings{ endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort), onConnectingFunc: func(_ *http.Request) (bool, int) { // Only allow one agent to be connected the this server at a time. diff --git a/cmd/opampsupervisor/supervisor/supervisor_test.go b/cmd/opampsupervisor/supervisor/supervisor_test.go index 9f66bbf71578..f6eb4dfc7efa 100644 --- a/cmd/opampsupervisor/supervisor/supervisor_test.go +++ b/cmd/opampsupervisor/supervisor/supervisor_test.go @@ -6,8 +6,10 @@ package supervisor import ( "bytes" "context" + "fmt" "net" "os" + "path/filepath" "sync/atomic" "testing" "time" @@ -23,6 +25,72 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config" ) +func setupSupervisorConfig(t *testing.T) config.Supervisor { + t.Helper() + + tmpDir, err := os.MkdirTemp(os.TempDir(), "*") + require.NoError(t, err) + + executablePath := filepath.Join(tmpDir, "binary") + err = os.WriteFile(executablePath, []byte{}, 0o600) + require.NoError(t, err) + + configuration := ` +server: + endpoint: ws://localhost/v1/opamp + tls: + insecure: true + +capabilities: + reports_effective_config: true + reports_own_metrics: true + reports_health: true + accepts_remote_config: true + reports_remote_config: true + accepts_restart_command: true + +storage: + directory: %s + +agent: + executable: %s +` + configuration = fmt.Sprintf(configuration, filepath.Join(tmpDir, "storage"), executablePath) + + cfgPath := filepath.Join(tmpDir, "config.yaml") + err = os.WriteFile(cfgPath, []byte(configuration), 0o600) + require.NoError(t, err) + + cfg, err := config.Load(cfgPath) + require.NoError(t, err) + + t.Cleanup(func() { + require.NoError(t, os.Chmod(tmpDir, 0o700)) + require.NoError(t, os.RemoveAll(tmpDir)) + }) + + return cfg +} + +func Test_NewSupervisor(t *testing.T) { + cfg := setupSupervisorConfig(t) + supervisor, err := NewSupervisor(zap.L(), cfg) + require.NoError(t, err) + require.NotNil(t, supervisor) +} + +func Test_NewSupervisorFailedStorageCreation(t *testing.T) { + cfg := setupSupervisorConfig(t) + + dir := filepath.Dir(cfg.Storage.Directory) + require.NoError(t, os.Chmod(dir, 0o500)) + + supervisor, err := NewSupervisor(zap.L(), cfg) + require.Error(t, err) + require.ErrorContains(t, err, "error creating storage dir") + require.Nil(t, supervisor) +} + func Test_composeEffectiveConfig(t *testing.T) { acceptsRemoteConfig := true s := Supervisor{