Skip to content

Commit

Permalink
[cmd/opampsupervisor] Refactor NewSupervisor function (open-telemetry…
Browse files Browse the repository at this point in the history
…#34597)

**Description:** Pass config structure to `NewSupervisor` instead of a
file path.

**Link to tracking Issue:** open-telemetry#34379

**Testing:** I run tests and adjusted the existing ones

**Documentation:** -

---------

Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com>
  • Loading branch information
2 people authored and jriguera committed Oct 4, 2024
1 parent 6432cb0 commit 751af79
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 32 deletions.
27 changes: 27 additions & 0 deletions .chloggen/refactor-config-in-opampsupervisor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Pass config structure instead of file path when using NewSupervisor function

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34379]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
6 changes: 5 additions & 1 deletion cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@ func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFa

func newSupervisor(t *testing.T, configType string, extraConfigData map[string]string) *supervisor.Supervisor {
cfgFile := getSupervisorConfig(t, configType, extraConfigData)
s, err := supervisor.NewSupervisor(zap.NewNop(), cfgFile.Name())

cfg, err := config.Load(cfgFile.Name())
require.NoError(t, err)

s, err := supervisor.NewSupervisor(zap.NewNop(), cfg)
require.NoError(t, err)

return s
Expand Down
10 changes: 9 additions & 1 deletion cmd/opampsupervisor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor"
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config"
)

func main() {
Expand All @@ -19,7 +20,14 @@ func main() {

logger, _ := zap.NewDevelopment()

supervisor, err := supervisor.NewSupervisor(logger, *configFlag)
cfg, err := config.Load(*configFlag)
if err != nil {
logger.Error(err.Error())
os.Exit(-1)
return
}

supervisor, err := supervisor.NewSupervisor(logger, cfg)
if err != nil {
logger.Error(err.Error())
os.Exit(-1)
Expand Down
30 changes: 30 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"runtime"
"time"

"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/file"
"github.com/knadh/koanf/v2"
"github.com/open-telemetry/opamp-go/protobufs"
"go.opentelemetry.io/collector/config/configtls"
)
Expand All @@ -25,6 +28,33 @@ type Supervisor struct {
Storage Storage `mapstructure:"storage"`
}

// Load loads the Supervisor config from a file.
func Load(configFile string) (Supervisor, error) {
if configFile == "" {
return Supervisor{}, errors.New("path to config file cannot be empty")
}

k := koanf.New("::")
if err := k.Load(file.Provider(configFile), yaml.Parser()); err != nil {
return Supervisor{}, err
}

decodeConf := koanf.UnmarshalConf{
Tag: "mapstructure",
}

cfg := DefaultSupervisor()
if err := k.UnmarshalWithConf("", &cfg, decodeConf); err != nil {
return Supervisor{}, fmt.Errorf("cannot parse %s: %w", configFile, err)
}

if err := cfg.Validate(); err != nil {
return Supervisor{}, fmt.Errorf("cannot validate supervisor config %s: %w", configFile, err)
}

return cfg, nil
}

func (s Supervisor) Validate() error {
if err := s.Server.Validate(); err != nil {
return err
Expand Down
41 changes: 11 additions & 30 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/google/uuid"
"github.com/knadh/koanf/maps"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/file"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/knadh/koanf/v2"
"github.com/open-telemetry/opamp-go/client"
Expand Down Expand Up @@ -136,7 +135,7 @@ type Supervisor struct {
opampServerPort int
}

func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, error) {
s := &Supervisor{
logger: logger,
pidProvider: defaultPIDProvider{},
Expand All @@ -153,14 +152,12 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
return nil, err
}

if err := s.loadConfig(configFile); err != nil {
return nil, fmt.Errorf("error loading config: %w", err)
}

if err := s.config.Validate(); err != nil {
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("error validating config: %w", err)
}

s.config = cfg

if err := os.MkdirAll(s.config.Storage.Directory, 0700); err != nil {
return nil, fmt.Errorf("error creating storage dir: %w", err)
}
Expand Down Expand Up @@ -248,28 +245,6 @@ func (s *Supervisor) createTemplates() error {
return nil
}

func (s *Supervisor) loadConfig(configFile string) error {
if configFile == "" {
return errors.New("path to config file cannot be empty")
}

k := koanf.New("::")
if err := k.Load(file.Provider(configFile), yaml.Parser()); err != nil {
return err
}

decodeConf := koanf.UnmarshalConf{
Tag: "mapstructure",
}

s.config = config.DefaultSupervisor()
if err := k.UnmarshalWithConf("", &s.config, decodeConf); err != nil {
return fmt.Errorf("cannot parse %v: %w", configFile, err)
}

return nil
}

// getBootstrapInfo obtains the Collector's agent description by
// starting a Collector with a specific config that only starts
// an OpAMP extension, obtains the agent description, then
Expand Down Expand Up @@ -461,11 +436,17 @@ func (s *Supervisor) startOpAMPClient() error {
func (s *Supervisor) startOpAMPServer() error {
s.opampServer = server.New(newLoggerFromZap(s.logger))

var err error
s.opampServerPort, err = s.findRandomPort()
if err != nil {
return err
}

s.logger.Debug("Starting OpAMP server...")

connected := &atomic.Bool{}

err := s.opampServer.Start(flattenedSettings{
err = s.opampServer.Start(flattenedSettings{
endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort),
onConnectingFunc: func(_ *http.Request) (bool, int) {
// Only allow one agent to be connected the this server at a time.
Expand Down
68 changes: 68 additions & 0 deletions cmd/opampsupervisor/supervisor/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package supervisor
import (
"bytes"
"context"
"fmt"
"net"
"os"
"path/filepath"
"sync/atomic"
"testing"
"time"
Expand All @@ -23,6 +25,72 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config"
)

func setupSupervisorConfig(t *testing.T) config.Supervisor {
t.Helper()

tmpDir, err := os.MkdirTemp(os.TempDir(), "*")
require.NoError(t, err)

executablePath := filepath.Join(tmpDir, "binary")
err = os.WriteFile(executablePath, []byte{}, 0o600)
require.NoError(t, err)

configuration := `
server:
endpoint: ws://localhost/v1/opamp
tls:
insecure: true
capabilities:
reports_effective_config: true
reports_own_metrics: true
reports_health: true
accepts_remote_config: true
reports_remote_config: true
accepts_restart_command: true
storage:
directory: %s
agent:
executable: %s
`
configuration = fmt.Sprintf(configuration, filepath.Join(tmpDir, "storage"), executablePath)

cfgPath := filepath.Join(tmpDir, "config.yaml")
err = os.WriteFile(cfgPath, []byte(configuration), 0o600)
require.NoError(t, err)

cfg, err := config.Load(cfgPath)
require.NoError(t, err)

t.Cleanup(func() {
require.NoError(t, os.Chmod(tmpDir, 0o700))
require.NoError(t, os.RemoveAll(tmpDir))
})

return cfg
}

func Test_NewSupervisor(t *testing.T) {
cfg := setupSupervisorConfig(t)
supervisor, err := NewSupervisor(zap.L(), cfg)
require.NoError(t, err)
require.NotNil(t, supervisor)
}

func Test_NewSupervisorFailedStorageCreation(t *testing.T) {
cfg := setupSupervisorConfig(t)

dir := filepath.Dir(cfg.Storage.Directory)
require.NoError(t, os.Chmod(dir, 0o500))

supervisor, err := NewSupervisor(zap.L(), cfg)
require.Error(t, err)
require.ErrorContains(t, err, "error creating storage dir")
require.Nil(t, supervisor)
}

func Test_composeEffectiveConfig(t *testing.T) {
acceptsRemoteConfig := true
s := Supervisor{
Expand Down

0 comments on commit 751af79

Please sign in to comment.