diff --git a/.chloggen/fix_supervisor-dont-require-server-connection.yaml b/.chloggen/fix_supervisor-dont-require-server-connection.yaml new file mode 100644 index 000000000000..930693f63ac3 --- /dev/null +++ b/.chloggen/fix_supervisor-dont-require-server-connection.yaml @@ -0,0 +1,13 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cmd/opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Start even if the OpAMP server cannot be contacted, and continually retry connecting. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33408, 33799] diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index b5faabe7183a..39635add3e82 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -9,13 +9,17 @@ import ( "bytes" "context" "crypto/sha256" + "errors" + "fmt" "io" "log" + "net" "net/http" "net/http/httptest" "os" "os/exec" "path" + "path/filepath" "runtime" "strings" "sync/atomic" @@ -36,6 +40,7 @@ import ( "github.com/stretchr/testify/require" semconv "go.opentelemetry.io/collector/semconv/v1.21.0" "go.uber.org/zap" + "google.golang.org/protobuf/proto" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config" @@ -74,10 +79,17 @@ type testingOpAMPServer struct { addr string supervisorConnected chan bool sendToSupervisor func(*protobufs.ServerToAgent) + start func() shutdown func() } func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer { + s := newUnstartedOpAMPServer(t, connectingCallback, callbacks) + s.start() + return s +} + +func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer { var agentConn atomic.Value var isAgentConnected atomic.Bool var didShutdown atomic.Bool @@ -108,7 +120,7 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca require.NoError(t, err) mux := http.NewServeMux() mux.HandleFunc("/v1/opamp", handler) - httpSrv := httptest.NewServer(mux) + httpSrv := httptest.NewUnstartedServer(mux) shutdown := func() { if !didShutdown.Load() { @@ -135,6 +147,7 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca addr: httpSrv.Listener.Addr().String(), supervisorConnected: connectedChan, sendToSupervisor: send, + start: httpSrv.Start, shutdown: shutdown, } } @@ -238,6 +251,148 @@ func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) { }, 10*time.Second, 500*time.Millisecond, "Log never appeared in output") } +func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) { + storageDir := t.TempDir() + remoteConfigFilePath := filepath.Join(storageDir, "last_recv_remote_config.dat") + + cfg, hash, healthcheckPort := createHealthCheckCollectorConf(t) + remoteConfigProto := &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: cfg.Bytes()}, + }, + }, + ConfigHash: hash, + } + marshalledRemoteConfig, err := proto.Marshal(remoteConfigProto) + require.NoError(t, err) + + require.NoError(t, os.WriteFile(remoteConfigFilePath, marshalledRemoteConfig, 0600)) + + connected := atomic.Bool{} + server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{ + OnConnectedFunc: func(ctx context.Context, conn types.Connection) { + connected.Store(true) + }, + }) + defer server.shutdown() + + s := newSupervisor(t, "basic", map[string]string{ + "url": server.addr, + "storage_dir": storageDir, + }) + defer s.Shutdown() + + // Verify the collector runs eventually by pinging the healthcheck extension + require.Eventually(t, func() bool { + resp, err := http.DefaultClient.Get(fmt.Sprintf("http://localhost:%d", healthcheckPort)) + if err != nil { + t.Logf("Failed healthcheck: %s", err) + return false + } + require.NoError(t, resp.Body.Close()) + if resp.StatusCode >= 300 || resp.StatusCode < 200 { + t.Logf("Got non-2xx status code: %d", resp.StatusCode) + return false + } + return true + }, 3*time.Second, 100*time.Millisecond) + + // Start the server and wait for the supervisor to connect + server.start() + + // Verify supervisor connects to server + waitForSupervisorConnection(server.supervisorConnected, true) + + require.True(t, connected.Load(), "Supervisor failed to connect") +} + +func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) { + cfg, hash, inputFile, outputFile := createSimplePipelineCollectorConf(t) + + configuredChan := make(chan struct{}) + connected := atomic.Bool{} + server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{ + OnConnectedFunc: func(ctx context.Context, conn types.Connection) { + connected.Store(true) + }, + OnMessageFunc: func(ctx context.Context, conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + lastCfgHash := message.GetRemoteConfigStatus().GetLastRemoteConfigHash() + if bytes.Equal(lastCfgHash, hash) { + close(configuredChan) + } + + return &protobufs.ServerToAgent{} + }, + }) + defer server.shutdown() + + // The supervisor is started without a running OpAMP server. + // The supervisor should start successfully, even if the OpAMP server is stopped. + s := newSupervisor(t, "basic", map[string]string{ + "url": server.addr, + }) + defer s.Shutdown() + + // Verify the collector is running by checking the metrics endpoint + require.Eventually(t, func() bool { + resp, err := http.DefaultClient.Get("http://localhost:8888/metrics") + if err != nil { + t.Logf("Failed check for prometheus metrics: %s", err) + return false + } + require.NoError(t, resp.Body.Close()) + if resp.StatusCode >= 300 || resp.StatusCode < 200 { + t.Logf("Got non-2xx status code: %d", resp.StatusCode) + return false + } + return true + }, 3*time.Second, 100*time.Millisecond) + + // Start the server and wait for the supervisor to connect + server.start() + + // Verify supervisor connects to server + waitForSupervisorConnection(server.supervisorConnected, true) + + require.True(t, connected.Load(), "Supervisor failed to connect") + + // Verify that the collector can run a new config sent to it + server.sendToSupervisor(&protobufs.ServerToAgent{ + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: cfg.Bytes()}, + }, + }, + ConfigHash: hash, + }, + }) + + select { + case <-configuredChan: + case <-time.After(2 * time.Second): + require.FailNow(t, "timed out waiting for collector to reconfigure") + } + + sampleLog := `{"body":"hello, world"}` + n, err := inputFile.WriteString(sampleLog + "\n") + require.NotZero(t, n, "Could not write to input file") + require.NoError(t, err) + + require.Eventually(t, func() bool { + logRecord := make([]byte, 1024) + + n, err = outputFile.Read(logRecord) + if !errors.Is(err, io.EOF) { + require.NoError(t, err) + } + + return n != 0 + }, 10*time.Second, 500*time.Millisecond, "Log never appeared in output") + +} + func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) { var healthReport atomic.Value var agentConfig atomic.Value @@ -639,6 +794,29 @@ func createBadCollectorConf(t *testing.T) (*bytes.Buffer, []byte) { return bytes.NewBuffer(colCfg), h.Sum(nil) } +func createHealthCheckCollectorConf(t *testing.T) (cfg *bytes.Buffer, hash []byte, remotePort int) { + colCfgTpl, err := os.ReadFile(path.Join("testdata", "collector", "healthcheck_config.yaml")) + require.NoError(t, err) + + templ, err := template.New("").Parse(string(colCfgTpl)) + require.NoError(t, err) + + port, err := findRandomPort() + + var confmapBuf bytes.Buffer + err = templ.Execute( + &confmapBuf, + map[string]string{ + "HealthCheckEndpoint": fmt.Sprintf("localhost:%d", port), + }, + ) + require.NoError(t, err) + + h := sha256.Sum256(confmapBuf.Bytes()) + + return &confmapBuf, h[:], port +} + // Wait for the Supervisor to connect to or disconnect from the OpAMP server func waitForSupervisorConnection(connection chan bool, connected bool) { select { @@ -1012,3 +1190,21 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) { require.Equal(t, newID, uuid.UUID(newRecievedAgentID)) } + +func findRandomPort() (int, error) { + l, err := net.Listen("tcp", "localhost:0") + + if err != nil { + return 0, err + } + + port := l.Addr().(*net.TCPAddr).Port + + err = l.Close() + + if err != nil { + return 0, err + } + + return port, nil +} diff --git a/cmd/opampsupervisor/specification/README.md b/cmd/opampsupervisor/specification/README.md index 411f7ce3de10..10df4925e0ca 100644 --- a/cmd/opampsupervisor/specification/README.md +++ b/cmd/opampsupervisor/specification/README.md @@ -160,6 +160,13 @@ agent: ``` +### Operation When OpAMP Server is Unavailable + +When the supervisor cannot connect to the OpAMP server, the collector will +be run with the last known configuration, or with a "noop" configuration +if no previous configuration is persisted. The supervisor will continually +attempt to reconnect to the OpAMP server with exponential backoff. + ### Executing Collector The Supervisor starts and stops the Collector process as necessary. When diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 3ef418a68f14..0f6cbcf6b740 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -43,8 +43,8 @@ import ( ) var ( - //go:embed templates/bootstrap_pipeline.yaml - bootstrapConfTpl string + //go:embed templates/nooppipeline.yaml + noopPipelineTpl string //go:embed templates/extraconfig.yaml extraConfigTpl string @@ -89,7 +89,7 @@ type Supervisor struct { // Supervisor's persistent state persistentState *persistentState - bootstrapTemplate *template.Template + noopPipelineTemplate *template.Template opampextensionTemplate *template.Template extraConfigTemplate *template.Template ownTelemetryTemplate *template.Template @@ -131,8 +131,6 @@ type Supervisor struct { agentStartHealthCheckAttempts int agentRestarting atomic.Bool - connectedToOpAMPServer chan struct{} - // The OpAMP server to communicate with the Collector's OpAMP extension opampServer server.OpAMPServer opampServerPort int @@ -145,7 +143,6 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { hasNewConfig: make(chan struct{}, 1), agentConfigOwnMetricsSection: &atomic.Value{}, mergedConfig: &atomic.Value{}, - connectedToOpAMPServer: make(chan struct{}), effectiveConfig: &atomic.Value{}, agentDescription: &atomic.Value{}, doneChan: make(chan struct{}), @@ -189,7 +186,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { logger.Debug("Supervisor starting", zap.String("id", s.persistentState.InstanceID.String())) - err = s.loadInitialMergedConfig() + err = s.loadAndWriteInitialMergedConfig() if err != nil { return nil, fmt.Errorf("failed loading initial config: %w", err) } @@ -198,10 +195,6 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { return nil, fmt.Errorf("cannot start OpAMP client: %w", err) } - if connErr := s.waitForOpAMPConnection(); connErr != nil { - return nil, fmt.Errorf("failed to connect to the OpAMP server: %w", connErr) - } - s.commander, err = commander.NewCommander( s.logger, s.config.Agent, @@ -231,7 +224,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { func (s *Supervisor) createTemplates() error { var err error - if s.bootstrapTemplate, err = template.New("bootstrap").Parse(bootstrapConfTpl); err != nil { + if s.noopPipelineTemplate, err = template.New("nooppipeline").Parse(noopPipelineTpl); err != nil { return err } if s.extraConfigTemplate, err = template.New("extraconfig").Parse(extraConfigTpl); err != nil { @@ -280,7 +273,7 @@ func (s *Supervisor) getBootstrapInfo() (err error) { return err } - bootstrapConfig, err := s.composeBootstrapConfig() + bootstrapConfig, err := s.composeNoopConfig() if err != nil { return err } @@ -403,7 +396,6 @@ func (s *Supervisor) startOpAMPClient() error { InstanceUid: types.InstanceUid(s.persistentState.InstanceID), Callbacks: types.CallbacksStruct{ OnConnectFunc: func(_ context.Context) { - s.connectedToOpAMPServer <- struct{}{} s.logger.Debug("Connected to the server.") }, OnConnectFailedFunc: func(_ context.Context, err error) { @@ -679,24 +671,12 @@ func (s *Supervisor) onOpampConnectionSettings(_ context.Context, settings *prot return err } } - return s.waitForOpAMPConnection() -} - -func (s *Supervisor) waitForOpAMPConnection() error { - // wait for the OpAMP client to connect to the server or timeout - select { - case <-s.connectedToOpAMPServer: - return nil - case <-time.After(10 * time.Second): - return errors.New("timed out waiting for the server to connect") - } + return nil } -func (s *Supervisor) composeBootstrapConfig() ([]byte, error) { - var k = koanf.New("::") - +func (s *Supervisor) composeNoopPipeline() ([]byte, error) { var cfg bytes.Buffer - err := s.bootstrapTemplate.Execute(&cfg, map[string]any{ + err := s.noopPipelineTemplate.Execute(&cfg, map[string]any{ "InstanceUid": s.persistentState.InstanceID.String(), "SupervisorPort": s.opampServerPort, }) @@ -704,7 +684,17 @@ func (s *Supervisor) composeBootstrapConfig() ([]byte, error) { return nil, err } - if err = k.Load(rawbytes.Provider(cfg.Bytes()), yaml.Parser(), koanf.WithMergeFunc(configMergeFunc)); err != nil { + return cfg.Bytes(), nil +} + +func (s *Supervisor) composeNoopConfig() ([]byte, error) { + var k = koanf.New("::") + + cfg, err := s.composeNoopPipeline() + if err != nil { + return nil, err + } + if err = k.Load(rawbytes.Provider(cfg), yaml.Parser(), koanf.WithMergeFunc(configMergeFunc)); err != nil { return nil, err } if err = k.Load(rawbytes.Provider(s.composeOpAMPExtensionConfig()), yaml.Parser(), koanf.WithMergeFunc(configMergeFunc)); err != nil { @@ -766,7 +756,7 @@ func (s *Supervisor) composeOpAMPExtensionConfig() []byte { return cfg.Bytes() } -func (s *Supervisor) loadInitialMergedConfig() error { +func (s *Supervisor) loadAndWriteInitialMergedConfig() error { var lastRecvRemoteConfig, lastRecvOwnMetricsConfig []byte var err error @@ -809,6 +799,12 @@ func (s *Supervisor) loadInitialMergedConfig() error { return fmt.Errorf("could not compose initial merged config: %w", err) } + // write the initial merged config to disk + cfg := s.mergedConfig.Load().(string) + if err := os.WriteFile(agentConfigFilePath, []byte(cfg), 0600); err != nil { + s.logger.Error("Failed to write agent config.", zap.Error(err)) + } + return nil } @@ -913,6 +909,17 @@ func (s *Supervisor) composeMergedConfig(config *protobufs.AgentRemoteConfig) (c return false, fmt.Errorf("cannot merge config named %s: %w", name, err) } } + } else { + // Add noop pipeline + var noopConfig []byte + noopConfig, err = s.composeNoopPipeline() + if err != nil { + return false, fmt.Errorf("could not compose noop pipeline: %w", err) + } + + if err = k.Load(rawbytes.Provider(noopConfig), yaml.Parser(), koanf.WithMergeFunc(configMergeFunc)); err != nil { + return false, fmt.Errorf("could not merge noop pipeline: %w", err) + } } // Merge own metrics config. diff --git a/cmd/opampsupervisor/supervisor/supervisor_test.go b/cmd/opampsupervisor/supervisor/supervisor_test.go index 1af3653e05cc..df946175a003 100644 --- a/cmd/opampsupervisor/supervisor/supervisor_test.go +++ b/cmd/opampsupervisor/supervisor/supervisor_test.go @@ -69,7 +69,7 @@ service: exporters: [file]` require.NoError(t, s.createTemplates()) - require.NoError(t, s.loadInitialMergedConfig()) + require.NoError(t, s.loadAndWriteInitialMergedConfig()) configChanged, err := s.composeMergedConfig(&protobufs.AgentRemoteConfig{ Config: &protobufs.AgentConfigMap{ diff --git a/cmd/opampsupervisor/supervisor/templates/bootstrap_pipeline.yaml b/cmd/opampsupervisor/supervisor/templates/nooppipeline.yaml similarity index 100% rename from cmd/opampsupervisor/supervisor/templates/bootstrap_pipeline.yaml rename to cmd/opampsupervisor/supervisor/templates/nooppipeline.yaml diff --git a/cmd/opampsupervisor/testdata/collector/healthcheck_config.yaml b/cmd/opampsupervisor/testdata/collector/healthcheck_config.yaml new file mode 100644 index 000000000000..e6baee1be269 --- /dev/null +++ b/cmd/opampsupervisor/testdata/collector/healthcheck_config.yaml @@ -0,0 +1,16 @@ +receivers: + nop: + +exporters: + nop: + +extensions: + health_check/livenesscheck: + endpoint: "{{ .HealthCheckEndpoint }}" + +service: + extensions: [health_check/livenesscheck] + pipelines: + logs: + receivers: [nop] + exporters: [nop]