Skip to content

Commit

Permalink
Revert "[cmd/opampsupervisor]: Persist the instance ID between restar…
Browse files Browse the repository at this point in the history
…ts (#32618)"

This reverts commit 125ff49.
  • Loading branch information
evan-bradley authored May 16, 2024
1 parent cd4b21b commit 5e6d25c
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 432 deletions.
13 changes: 0 additions & 13 deletions .chloggen/feat_opamp-supervisor-persist-instance-id.yaml

This file was deleted.

11 changes: 0 additions & 11 deletions cmd/opampsupervisor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,6 @@ The supervisor is currently undergoing heavy development and is not ready for an

4. The supervisor should connect to the OpAMP server and start a Collector instance.

## Persistent data storage
The supervisor persists some data to disk in order to mantain state between restarts. The directory where this data is stored may be specified via the supervisor configuration:
```yaml
storage:
directory: "/path/to/storage/dir"
```

By default, the supervisor will use `/var/lib/otelcol/supervisor` on posix systems, and `%ProgramData%/Otelcol/Supervisor` on Windows.

This directory will be created on supervisor startup if it does not exist.

## Status

The OpenTelemetry OpAMP Supervisor is intended to be the reference
Expand Down
158 changes: 3 additions & 155 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,9 @@ func getSupervisorConfig(t *testing.T, configType string, extraConfigData map[st
extension = ".exe"
}
configData := map[string]string{
"goos": runtime.GOOS,
"goarch": runtime.GOARCH,
"extension": extension,
"storage_dir": t.TempDir(),
"goos": runtime.GOOS,
"goarch": runtime.GOARCH,
"extension": extension,
}

for key, val := range extraConfigData {
Expand Down Expand Up @@ -782,154 +781,3 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {
}, 10*time.Second, 500*time.Millisecond, "Collector was not started with the last received remote config")

}

func TestSupervisorPersistsInstanceID(t *testing.T) {
// Tests shutting down and starting up a new supervisor will
// persist and re-use the same instance ID.
storageDir := t.TempDir()

agentIDChan := make(chan string, 1)
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {

select {
case agentIDChan <- message.InstanceUid:
default:
}

return &protobufs.ServerToAgent{}
},
})

s := newSupervisor(t, "basic", map[string]string{
"url": server.addr,
"storage_dir": storageDir,
})

waitForSupervisorConnection(server.supervisorConnected, true)

t.Logf("Supervisor connected")

var firstAgentID string
select {
case firstAgentID = <-agentIDChan:
case <-time.After(1 * time.Second):
t.Fatalf("failed to get first agent ID")
}

t.Logf("Got agent ID %s, shutting down supervisor", firstAgentID)

s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, false)

t.Logf("Supervisor disconnected")

// Drain agent ID channel so we get a fresh ID from the new supervisor
select {
case <-agentIDChan:
default:
}

s = newSupervisor(t, "basic", map[string]string{
"url": server.addr,
"storage_dir": storageDir,
})
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)

t.Logf("Supervisor connected")

var secondAgentID string
select {
case secondAgentID = <-agentIDChan:
case <-time.After(1 * time.Second):
t.Fatalf("failed to get second agent ID")
}

require.Equal(t, firstAgentID, secondAgentID)
}

func TestSupervisorPersistsNewInstanceID(t *testing.T) {
// Tests that an agent ID that is given from the server to the agent in an AgentIdentification message
// is properly persisted.
storageDir := t.TempDir()

newID := "01HW3GS9NWD840C5C2BZS3KYPW"

agentIDChan := make(chan string, 1)
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {

select {
case agentIDChan <- message.InstanceUid:
default:
}

if message.InstanceUid != newID {
return &protobufs.ServerToAgent{
InstanceUid: message.InstanceUid,
AgentIdentification: &protobufs.AgentIdentification{
NewInstanceUid: newID,
},
}
}

return &protobufs.ServerToAgent{}
},
})

s := newSupervisor(t, "basic", map[string]string{
"url": server.addr,
"storage_dir": storageDir,
})

waitForSupervisorConnection(server.supervisorConnected, true)

t.Logf("Supervisor connected")

for id := range agentIDChan {
if id == newID {
t.Logf("Agent ID was changed to new ID")
break
}
}

s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, false)

t.Logf("Supervisor disconnected")

// Drain agent ID channel so we get a fresh ID from the new supervisor
select {
case <-agentIDChan:
default:
}

s = newSupervisor(t, "basic", map[string]string{
"url": server.addr,
"storage_dir": storageDir,
})
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)

t.Logf("Supervisor connected")

var newRecievedAgentID string
select {
case newRecievedAgentID = <-agentIDChan:
case <-time.After(1 * time.Second):
t.Fatalf("failed to get second agent ID")
}

require.Equal(t, newID, newRecievedAgentID)
}
2 changes: 1 addition & 1 deletion cmd/opampsupervisor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
google.golang.org/protobuf v1.34.1
gopkg.in/yaml.v3 v3.0.1
)

require (
Expand All @@ -33,4 +32,5 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
28 changes: 1 addition & 27 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ package config

import (
"net/http"
"os"
"path/filepath"
"runtime"

"go.opentelemetry.io/collector/config/configtls"
)
Expand All @@ -17,37 +14,14 @@ type Supervisor struct {
Server *OpAMPServer
Agent *Agent
Capabilities *Capabilities `mapstructure:"capabilities"`
Storage Storage `mapstructure:"storage"`
Storage *Storage `mapstructure:"storage"`
}

type Storage struct {
// Directory is the directory where the Supervisor will store its data.
Directory string `mapstructure:"directory"`
}

// DirectoryOrDefault returns the configured storage directory if it was configured,
// otherwise it returns the system default.
func (s Storage) DirectoryOrDefault() string {
if s.Directory == "" {
switch runtime.GOOS {
case "windows":
// Windows default is "%ProgramData%\Otelcol\Supervisor"
// If the ProgramData environment variable is not set,
// it falls back to C:\ProgramData
programDataDir := os.Getenv("ProgramData")
if programDataDir == "" {
programDataDir = `C:\ProgramData`
}
return filepath.Join(programDataDir, "Otelcol", "Supervisor")
default:
// Default for non-windows systems
return "/var/lib/otelcol/supervisor"
}
}

return s.Directory
}

// Capabilities is the set of capabilities that the Supervisor supports.
type Capabilities struct {
AcceptsRemoteConfig *bool `mapstructure:"accepts_remote_config"`
Expand Down
92 changes: 0 additions & 92 deletions cmd/opampsupervisor/supervisor/persistence.go

This file was deleted.

Loading

0 comments on commit 5e6d25c

Please sign in to comment.