Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cmd/opampsupervisor] Persist collector remote config & telemetry set… #30807

Merged
merged 18 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/supervisor-persists-remote-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: cmd/opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Persist collector remote config & telemetry settings

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [21078]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
45 changes: 45 additions & 0 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,48 @@ func TestSupervisorOpAMPConnectionSettings(t *testing.T) {
return connectedToNewServer.Load() == true
}, 10*time.Second, 500*time.Millisecond, "Collector did not connect to new OpAMP server")
}

func TestSupervisorTestConfigPersistence(t *testing.T) {
// Create a temporary directory to store the test config file.
tempDir := t.TempDir()

var agentConfig atomic.Value
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
agentConfig.Store(string(config.Body))
}
}
return &protobufs.ServerToAgent{}
},
})

s := newSupervisor(t, "persistence", map[string]string{"url": server.addr, "storage_dir": tempDir})
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)

cfg, hash, _, _ := createSimplePipelineCollectorConf(t)

server.sendToSupervisor(&protobufs.ServerToAgent{
RemoteConfig: &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: cfg.Bytes()},
},
},
ConfigHash: hash,
},
})

require.Eventually(t, func() bool {
// ensure that the config file was written to the storage directory
_, err := os.Stat(path.Join(tempDir, "last_recv_remote_config.yaml"))
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
return err == nil
}, 5*time.Second, 250*time.Millisecond, "Config file was not written to persistent storage directory")
}
2 changes: 1 addition & 1 deletion cmd/opampsupervisor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
go.opentelemetry.io/collector/semconv v0.98.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
google.golang.org/protobuf v1.33.0
)

require (
Expand All @@ -31,6 +32,5 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
6 changes: 6 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ type Supervisor struct {
Server *OpAMPServer
Agent *Agent
Capabilities *Capabilities `mapstructure:"capabilities"`
Storage *Storage `mapstructure:"storage"`
}

type Storage struct {
// Directory is the directory where the Supervisor will store its data.
Directory string `mapstructure:"directory"`
}

// Capabilities is the set of capabilities that the Supervisor supports.
Expand Down
144 changes: 115 additions & 29 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net"
"net/http"
"os"
"path/filepath"
"sort"
"sync"
"sync/atomic"
Expand All @@ -34,6 +35,7 @@ import (
"go.opentelemetry.io/collector/config/configtls"
semconv "go.opentelemetry.io/collector/semconv/v1.21.0"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/commander"
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config"
Expand All @@ -49,6 +51,9 @@ var (

//go:embed templates/owntelemetry.yaml
ownTelemetryTpl string

lastRecvRemoteConfigFile = "last_recv_remote_config.yaml"
lastRecvOwnMetricsConfigFile = "last_recv_own_metrics_config.yaml"
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
)

// Supervisor implements supervising of OpenTelemetry Collector and uses OpAMPClient
Expand Down Expand Up @@ -140,13 +145,13 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
return nil, fmt.Errorf("could not get bootstrap info from the Collector: %w", err)
}

port, err := s.findRandomPort()
healthCheckPort, err := s.findRandomPort()

if err != nil {
return nil, fmt.Errorf("could not find port for health check: %w", err)
}

s.agentHealthCheckEndpoint = fmt.Sprintf("localhost:%d", port)
s.agentHealthCheckEndpoint = fmt.Sprintf("localhost:%d", healthCheckPort)

logger.Debug("Supervisor starting",
zap.String("id", s.instanceID.String()))
Expand Down Expand Up @@ -232,9 +237,9 @@ func (s *Supervisor) getBootstrapInfo() (err error) {
var cfg bytes.Buffer

err = s.bootstrapTemplate.Execute(&cfg, map[string]any{
"EndpointPort": port,
"InstanceUid": s.instanceID.String(),
"SupervisorPort": supervisorPort,
"OTLPHTTPEndpointPort": port,
"InstanceUid": s.instanceID.String(),
"SupervisorPort": supervisorPort,
})
if err != nil {
return err
Expand Down Expand Up @@ -549,9 +554,10 @@ func (s *Supervisor) composeExtraLocalConfig() []byte {
}

func (s *Supervisor) loadAgentEffectiveConfig() {
var effectiveConfigBytes []byte
var effectiveConfigBytes, effFromFile, lastRecvRemoteConfig, lastRecvOwnMetricsConfig []byte
var err error

effFromFile, err := os.ReadFile(s.effectiveConfigFilePath)
effFromFile, err = os.ReadFile(s.effectiveConfigFilePath)
if err == nil {
// We have an effective config file.
effectiveConfigBytes = effFromFile
Expand All @@ -561,6 +567,48 @@ func (s *Supervisor) loadAgentEffectiveConfig() {
}

s.effectiveConfig.Store(string(effectiveConfigBytes))

if s.config.Capabilities != nil && s.config.Capabilities.AcceptsRemoteConfig != nil &&
*s.config.Capabilities.AcceptsRemoteConfig &&
s.config.Storage != nil {
// Try to load the last received remote config if it exists.
lastRecvRemoteConfig, err = os.ReadFile(filepath.Join(s.config.Storage.Directory, lastRecvRemoteConfigFile))
if err == nil {
config := &protobufs.AgentRemoteConfig{}
err = proto.Unmarshal(lastRecvRemoteConfig, config)
if err != nil {
s.logger.Error("Cannot parse last received remote config", zap.Error(err))
} else {
s.remoteConfig = config
}
}
} else {
s.logger.Debug("Remote config is not supported")
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
}

if s.config.Capabilities != nil && s.config.Capabilities.ReportsOwnMetrics != nil &&
*s.config.Capabilities.ReportsOwnMetrics &&
s.config.Storage != nil {
// Try to load the last received own metrics config if it exists.
lastRecvOwnMetricsConfig, err = os.ReadFile(filepath.Join(s.config.Storage.Directory, lastRecvOwnMetricsConfigFile))
if err == nil {
set := &protobufs.TelemetryConnectionSettings{}
err = proto.Unmarshal(lastRecvOwnMetricsConfig, set)
if err != nil {
s.logger.Error("Cannot parse last received own metrics config", zap.Error(err))
} else {
s.setupOwnMetrics(context.Background(), set)
}
}
} else {
s.logger.Debug("Own metrics is not supported")
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
}

_, err = s.recalcEffectiveConfig()
if err != nil {
s.logger.Error("Error composing effective config. Ignoring received config", zap.Error(err))
return
}
}

// createEffectiveConfigMsg create an EffectiveConfig with the content of the
Expand Down Expand Up @@ -626,39 +674,45 @@ func (s *Supervisor) setupOwnMetrics(_ context.Context, settings *protobufs.Tele
// 2) the own metrics config section
// 3) the local override config that is hard-coded in the Supervisor.
func (s *Supervisor) composeEffectiveConfig(config *protobufs.AgentRemoteConfig) (configChanged bool, err error) {
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved

var k = koanf.New(".")

// Begin with empty config. We will merge received configs on top of it.
if err = k.Load(rawbytes.Provider([]byte{}), yaml.Parser()); err != nil {
return false, err
}

// Sort to make sure the order of merging is stable.
var names []string
for name := range config.Config.ConfigMap {
if name == "" {
// skip instance config
continue
if config != nil && config.Config != nil {
// Sort to make sure the order of merging is stable.
var names []string
for name := range config.Config.ConfigMap {
if name == "" {
// skip instance config
continue
}
names = append(names, name)
}
names = append(names, name)
}

sort.Strings(names)
sort.Strings(names)

// Append instance config as the last item.
names = append(names, "")
// Append instance config as the last item.
names = append(names, "")

// Merge received configs.
for _, name := range names {
item := config.Config.ConfigMap[name]
var k2 = koanf.New(".")
err = k2.Load(rawbytes.Provider(item.Body), yaml.Parser())
if err != nil {
return false, fmt.Errorf("cannot parse config named %s: %w", name, err)
}
err = k.Merge(k2)
if err != nil {
return false, fmt.Errorf("cannot merge config named %s: %w", name, err)
// Merge received configs.
for _, name := range names {
item := config.Config.ConfigMap[name]
if item == nil {
continue
}
var k2 = koanf.New(".")
err = k2.Load(rawbytes.Provider(item.Body), yaml.Parser())
if err != nil {
return false, fmt.Errorf("cannot parse config named %s: %w", name, err)
}
err = k.Merge(k2)
if err != nil {
return false, fmt.Errorf("cannot merge config named %s: %w", name, err)
}
}
}

Expand Down Expand Up @@ -891,9 +945,38 @@ func (s *Supervisor) Shutdown() {
s.supervisorWG.Wait()
}

func (s *Supervisor) saveLastReceivedConfig(config *protobufs.AgentRemoteConfig) error {
if s.config.Storage == nil {
return nil
}

cfg, err := proto.Marshal(config)
if err != nil {
return err
}

return os.WriteFile(filepath.Join(s.config.Storage.Directory, lastRecvRemoteConfigFile), cfg, 0600)
}

func (s *Supervisor) saveLastReceivedOwnTelemetrySettings(set *protobufs.TelemetryConnectionSettings, filePath string) error {
if s.config.Storage == nil {
return nil
}

cfg, err := proto.Marshal(set)
if err != nil {
return err
}

return os.WriteFile(filepath.Join(s.config.Storage.Directory, filePath), cfg, 0600)
}

func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) {
configChanged := false
if msg.RemoteConfig != nil {
if err := s.saveLastReceivedConfig(msg.RemoteConfig); err != nil {
s.logger.Error("Could not save last received remote config", zap.Error(err))
}
s.remoteConfig = msg.RemoteConfig
s.logger.Debug("Received remote config from server", zap.String("hash", fmt.Sprintf("%x", s.remoteConfig.ConfigHash)))

Expand All @@ -920,6 +1003,9 @@ func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) {
}

if msg.OwnMetricsConnSettings != nil {
if err := s.saveLastReceivedOwnTelemetrySettings(msg.OwnMetricsConnSettings, lastRecvOwnMetricsConfigFile); err != nil {
s.logger.Error("Could not save last received own telemetry settings", zap.Error(err))
}
configChanged = s.setupOwnMetrics(ctx, msg.OwnMetricsConnSettings) || configChanged
}

Expand Down
6 changes: 5 additions & 1 deletion cmd/opampsupervisor/supervisor/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config"
)

func Test_composeEffectiveConfig(t *testing.T) {
acceptsRemoteConfig := true
s := Supervisor{
logger: zap.NewNop(),
config: config.Supervisor{Capabilities: &config.Capabilities{AcceptsRemoteConfig: &acceptsRemoteConfig}},
hasNewConfig: make(chan struct{}, 1),
effectiveConfigFilePath: "effective.yaml",
agentConfigOwnMetricsSection: &atomic.Value{},
Expand Down Expand Up @@ -55,6 +59,6 @@ func Test_composeEffectiveConfig(t *testing.T) {
require.NoError(t, err)
expectedConfig = bytes.ReplaceAll(expectedConfig, []byte("\r\n"), []byte("\n"))

require.True(t, configChanged)
require.False(t, configChanged)
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
require.Equal(t, string(expectedConfig), s.effectiveConfig.Load().(string))
}
2 changes: 1 addition & 1 deletion cmd/opampsupervisor/supervisor/templates/bootstrap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ receivers:
otlp:
protocols:
http:
endpoint: "localhost:{{.EndpointPort}}"
endpoint: "localhost:{{.OTLPHTTPEndpointPort}}"
exporters:
debug:
verbosity: basic
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
server:
endpoint: ws://{{.url}}/v1/opamp
tls:
insecure: true

capabilities:
reports_effective_config: true
reports_own_metrics: true
reports_health: true
accepts_remote_config: true
reports_remote_config: true

storage:
directory: {{.storage_dir}}

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}
Loading