Skip to content

Commit

Permalink
[cmd/opampsupervisor] Support AcceptsRestartCommand Capability (ope…
Browse files Browse the repository at this point in the history
…n-telemetry#30694)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

Add restart command support to opamp supervisor capabilities.

Fixes
open-telemetry#21077

**Link to tracking Issue:**
open-telemetry#21077

**Testing:** Added integration test

**Documentation:** <Describe the documentation added.>

---------

Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com>
  • Loading branch information
srikanthccv and evan-bradley authored Apr 23, 2024
1 parent 9b58d54 commit 5975f22
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 2 deletions.
27 changes: 27 additions & 0 deletions .chloggen/supervisor_restart_command_capability.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: cmd/opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support AcceptsRestartCommand Capability.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [21077]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
81 changes: 81 additions & 0 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,87 @@ func waitForSupervisorConnection(connection chan bool, connected bool) {
}
}

// TestSupervisorRestartCommand checks that the Supervisor restarts the agent
// when the OpAMP server sends a Restart command, and that the agent reports a
// healthy status again afterwards.
//
// The test pushes a simple pipeline config, waits for the effective config to
// contain "health_check" and for a healthy report, then sends the restart
// command followed by a ReportFullState request and waits for a new healthy
// report.
func TestSupervisorRestartCommand(t *testing.T) {
	// Both values are written from the server's message callback and read from
	// the polling closures below, hence the atomic containers.
	var healthReport atomic.Value
	var agentConfig atomic.Value
	server := newOpAMPServer(
		t,
		defaultConnectingHandler,
		server.ConnectionCallbacksStruct{
			OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
				if message.Health != nil {
					healthReport.Store(message.Health)
				}

				if message.EffectiveConfig != nil {
					config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
					if config != nil {
						agentConfig.Store(string(config.Body))
					}
				}
				return &protobufs.ServerToAgent{}
			},
		})

	s := newSupervisor(t, "basic", map[string]string{"url": server.addr})
	defer s.Shutdown()

	waitForSupervisorConnection(server.supervisorConnected, true)

	// Send the initial config
	cfg, hash, _, _ := createSimplePipelineCollectorConf(t)

	server.sendToSupervisor(&protobufs.ServerToAgent{
		RemoteConfig: &protobufs.AgentRemoteConfig{
			Config: &protobufs.AgentConfigMap{
				ConfigMap: map[string]*protobufs.AgentConfigFile{
					"": {Body: cfg.Bytes()},
				},
			},
			ConfigHash: hash,
		},
	})

	require.Eventually(t, func() bool {
		cfg, ok := agentConfig.Load().(string)
		if ok {
			return strings.Contains(cfg, "health_check")
		}
		return false
	}, 5*time.Second, 500*time.Millisecond, "Collector was not started with healthcheck")

	require.Eventually(t, func() bool {
		// Load returns a nil interface until the first Store, so an unchecked
		// type assertion would panic if no health message has arrived yet.
		health, ok := healthReport.Load().(*protobufs.ComponentHealth)
		if !ok || health == nil {
			return false
		}

		return health.Healthy && health.LastError == ""
	}, 5*time.Second, 500*time.Millisecond, "Collector never became healthy")

	// The health report should be received after the restart.
	// Overwrite the stored report with an empty (not healthy) value so the
	// check below can only pass on a report received after this point.
	healthReport.Store(&protobufs.ComponentHealth{})

	server.sendToSupervisor(&protobufs.ServerToAgent{
		Command: &protobufs.ServerToAgentCommand{
			Type: protobufs.CommandType_CommandType_Restart,
		},
	})

	server.sendToSupervisor(&protobufs.ServerToAgent{
		Flags: uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState),
	})

	require.Eventually(t, func() bool {
		health, ok := healthReport.Load().(*protobufs.ComponentHealth)
		if !ok || health == nil {
			return false
		}
		return health.Healthy && health.LastError == ""
	}, 10*time.Second, 250*time.Millisecond, "Collector never reported healthy after restart")
}

func TestSupervisorOpAMPConnectionSettings(t *testing.T) {
var connectedToNewServer atomic.Bool
initialServer := newOpAMPServer(
Expand Down
1 change: 1 addition & 0 deletions cmd/opampsupervisor/supervisor/commander/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (c *Commander) Start(ctx context.Context) error {
}

func (c *Commander) Restart(ctx context.Context) error {
c.logger.Debug("Restarting agent", zap.String("agent", c.cfg.Executable))
if err := c.Stop(ctx); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Supervisor struct {
// Capabilities is the set of capabilities that the Supervisor supports.
type Capabilities struct {
AcceptsRemoteConfig *bool `mapstructure:"accepts_remote_config"`
AcceptsRestartCommand *bool `mapstructure:"accepts_restart_command"`
AcceptsOpAMPConnectionSettings *bool `mapstructure:"accepts_opamp_connection_settings"`
ReportsEffectiveConfig *bool `mapstructure:"reports_effective_config"`
ReportsOwnMetrics *bool `mapstructure:"reports_own_metrics"`
Expand Down
24 changes: 22 additions & 2 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type Supervisor struct {

agentHasStarted bool
agentStartHealthCheckAttempts int
agentRestarting atomic.Bool

connectedToOpAMPServer chan struct{}
}
Expand Down Expand Up @@ -345,6 +346,10 @@ func (s *Supervisor) Capabilities() protobufs.AgentCapabilities {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsRemoteConfig
}

if c.AcceptsRestartCommand != nil && *c.AcceptsRestartCommand {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand
}

if c.AcceptsOpAMPConnectionSettings != nil && *c.AcceptsOpAMPConnectionSettings {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings
}
Expand Down Expand Up @@ -386,8 +391,7 @@ func (s *Supervisor) startOpAMP() error {
OnCommandFunc: func(_ context.Context, command *protobufs.ServerToAgentCommand) error {
cmdType := command.GetType()
if *cmdType.Enum() == protobufs.CommandType_CommandType_Restart {
// TODO: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21077
s.logger.Debug("Received restart command")
return s.handleRestartCommand()
}
return nil
},
Expand Down Expand Up @@ -699,6 +703,17 @@ func (s *Supervisor) recalcEffectiveConfig() (configChanged bool, err error) {
return configChanged, nil
}

// handleRestartCommand restarts the agent process through the commander.
//
// The agentRestarting flag is set for the duration of the call and cleared on
// return. A restart failure is logged and the error is returned to the caller;
// on success the result is nil.
func (s *Supervisor) handleRestartCommand() error {
	s.agentRestarting.Store(true)
	defer s.agentRestarting.Store(false)

	s.logger.Debug("Received restart command")

	if restartErr := s.commander.Restart(context.Background()); restartErr != nil {
		s.logger.Error("Could not restart agent process", zap.Error(restartErr))
		return restartErr
	}
	return nil
}

func (s *Supervisor) startAgent() {
err := s.commander.Start(context.Background())
if err != nil {
Expand Down Expand Up @@ -792,6 +807,11 @@ func (s *Supervisor) runAgentProcess() {
s.startAgent()

case <-s.commander.Exited():
// The agent process exit is expected while a restart command is being handled, so do not attempt to restart it here.
if s.agentRestarting.Load() {
continue
}

if s.shuttingDown {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ capabilities:
reports_health: true
accepts_remote_config: true
reports_remote_config: true
accepts_restart_command: true

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}

0 comments on commit 5975f22

Please sign in to comment.