diff --git a/.chloggen/supervisor_restart_command_capability.yaml b/.chloggen/supervisor_restart_command_capability.yaml new file mode 100644 index 000000000000..e984ef510354 --- /dev/null +++ b/.chloggen/supervisor_restart_command_capability.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cmd/opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Support AcceptsRestartCommand Capability. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [21077] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 274bcc1d4678..0be9c6cfe532 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -474,6 +474,90 @@ func waitForSupervisorConnection(connection chan bool, connected bool) { } } +// TestSupervisorRestartCommand sends a Restart command to the Supervisor and expects the agent +// to report healthy again afterwards. +func TestSupervisorRestartCommand(t *testing.T) { + var healthReport atomic.Value + var agentConfig atomic.Value + server := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if message.Health != nil { + healthReport.Store(message.Health) + } + + if message.EffectiveConfig != nil { + config := message.EffectiveConfig.ConfigMap.ConfigMap[""] + if config != nil { + agentConfig.Store(string(config.Body)) + } + } + return &protobufs.ServerToAgent{} + }, + }) + + s := newSupervisor(t, "basic", map[string]string{"url": server.addr}) + defer s.Shutdown() + + waitForSupervisorConnection(server.supervisorConnected, true) + + // Send the initial config + cfg, hash, _, _ := createSimplePipelineCollectorConf(t) + + server.sendToSupervisor(&protobufs.ServerToAgent{ + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: cfg.Bytes()}, + }, + }, + ConfigHash: hash, + }, + }) + + require.Eventually(t, func() bool { + cfg, ok := agentConfig.Load().(string) + if ok { + return strings.Contains(cfg, "health_check") + } + return false + }, 5*time.Second, 500*time.Millisecond, "Collector was not started with healthcheck") + + require.Eventually(t, func() bool { + // Load returns nil until a health report has been stored, so the assertion must be checked. + health, ok := healthReport.Load().(*protobufs.ComponentHealth) + + if ok && health != nil { + return health.Healthy && health.LastError == "" + } + + return false + }, 5*time.Second, 500*time.Millisecond, "Collector never became healthy") + + // The health report 
should be received after the restart + healthReport.Store(&protobufs.ComponentHealth{}) + + server.sendToSupervisor(&protobufs.ServerToAgent{ + Command: &protobufs.ServerToAgentCommand{ + Type: protobufs.CommandType_CommandType_Restart, + }, + }) + + server.sendToSupervisor(&protobufs.ServerToAgent{ + Flags: uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState), + }) + + require.Eventually(t, func() bool { + health, ok := healthReport.Load().(*protobufs.ComponentHealth) + if ok && health != nil { + return health.Healthy && health.LastError == "" + } + return false + }, 10*time.Second, 250*time.Millisecond, "Collector never reported healthy after restart") +} + func TestSupervisorOpAMPConnectionSettings(t *testing.T) { var connectedToNewServer atomic.Bool initialServer := newOpAMPServer( diff --git a/cmd/opampsupervisor/supervisor/commander/commander.go b/cmd/opampsupervisor/supervisor/commander/commander.go index 7457f23693da..3445058ee862 100644 --- a/cmd/opampsupervisor/supervisor/commander/commander.go +++ b/cmd/opampsupervisor/supervisor/commander/commander.go @@ -99,6 +99,7 @@ func (c *Commander) Start(ctx context.Context) error { } func (c *Commander) Restart(ctx context.Context) error { + c.logger.Debug("Restarting agent", zap.String("agent", c.cfg.Executable)) if err := c.Stop(ctx); err != nil { return err } diff --git a/cmd/opampsupervisor/supervisor/config/config.go b/cmd/opampsupervisor/supervisor/config/config.go index 1948b42a0701..ff020f00db0c 100644 --- a/cmd/opampsupervisor/supervisor/config/config.go +++ b/cmd/opampsupervisor/supervisor/config/config.go @@ -19,6 +19,7 @@ type Supervisor struct { // Capabilities is the set of capabilities that the Supervisor supports. 
type Capabilities struct { AcceptsRemoteConfig *bool `mapstructure:"accepts_remote_config"` + AcceptsRestartCommand *bool `mapstructure:"accepts_restart_command"` AcceptsOpAMPConnectionSettings *bool `mapstructure:"accepts_opamp_connection_settings"` ReportsEffectiveConfig *bool `mapstructure:"reports_effective_config"` ReportsOwnMetrics *bool `mapstructure:"reports_own_metrics"` diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index d7e91aedfba9..1aa757a54290 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -107,6 +107,7 @@ type Supervisor struct { agentHasStarted bool agentStartHealthCheckAttempts int + agentRestarting atomic.Bool connectedToOpAMPServer chan struct{} } @@ -345,6 +346,10 @@ func (s *Supervisor) Capabilities() protobufs.AgentCapabilities { supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsRemoteConfig } + if c.AcceptsRestartCommand != nil && *c.AcceptsRestartCommand { + supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand + } + if c.AcceptsOpAMPConnectionSettings != nil && *c.AcceptsOpAMPConnectionSettings { supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings } @@ -386,8 +391,7 @@ func (s *Supervisor) startOpAMP() error { OnCommandFunc: func(_ context.Context, command *protobufs.ServerToAgentCommand) error { cmdType := command.GetType() if *cmdType.Enum() == protobufs.CommandType_CommandType_Restart { - // TODO: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21077 - s.logger.Debug("Received restart command") + return s.handleRestartCommand() } return nil }, @@ -699,6 +703,19 @@ func (s *Supervisor) recalcEffectiveConfig() (configChanged bool, err error) { return configChanged, nil } +// handleRestartCommand restarts the agent process through the commander. The agentRestarting flag stays +// set for the duration of the restart so that runAgentProcess ignores the resulting process exit. +func (s *Supervisor) handleRestartCommand() error { + s.agentRestarting.Store(true) + defer 
s.agentRestarting.Store(false) + s.logger.Debug("Received restart command") + if restartErr := s.commander.Restart(context.Background()); restartErr != nil { + s.logger.Error("Could not restart agent process", zap.Error(restartErr)) + return restartErr + } + return nil +} + func (s *Supervisor) startAgent() { err := s.commander.Start(context.Background()) if err != nil { @@ -792,6 +809,11 @@ func (s *Supervisor) runAgentProcess() { s.startAgent() case <-s.commander.Exited(): + // the agent process exit is expected for restart command and will not attempt to restart + if s.agentRestarting.Load() { + continue + } + if s.shuttingDown { return } diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_basic.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_basic.yaml index b5ef4c5db720..7e4b0a08536c 100644 --- a/cmd/opampsupervisor/testdata/supervisor/supervisor_basic.yaml +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_basic.yaml @@ -9,6 +9,7 @@ capabilities: reports_health: true accepts_remote_config: true reports_remote_config: true + accepts_restart_command: true agent: executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}