Skip to content

Commit

Permalink
[cmd/opampsupervisor]: Don't fail to start if the OpAMP server is una…
Browse files Browse the repository at this point in the history
…vailable (#34159)

**Description:** <Describe what has changed.>
* If the OpAMP server can't be contacted, the supervisor should still
start and run the collector
* This PR also fixes #33799 (it removes the channel that was being blocked
on, which prevented the reconnect)

This PR supersedes #33275

**Link to tracking Issue:** Fixes #33408, #33799

**Testing:**
* Added an e2e test for the behavior
* Manually tested against BindPlane OP
  • Loading branch information
BinaryFissionGames authored Aug 1, 2024
1 parent a646cba commit cc03841
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 33 deletions.
13 changes: 13 additions & 0 deletions .chloggen/fix_supervisor-dont-require-server-connection.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: cmd/opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Start even if the OpAMP server cannot be contacted, and continually retry connecting.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33408, 33799]
198 changes: 197 additions & 1 deletion cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"path"
"path/filepath"
"runtime"
"strings"
"sync/atomic"
Expand All @@ -36,6 +40,7 @@ import (
"github.com/stretchr/testify/require"
semconv "go.opentelemetry.io/collector/semconv/v1.21.0"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor"
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config"
Expand Down Expand Up @@ -74,10 +79,17 @@ type testingOpAMPServer struct {
addr string
supervisorConnected chan bool
sendToSupervisor func(*protobufs.ServerToAgent)
start func()
shutdown func()
}

// newOpAMPServer builds a testing OpAMP server via newUnstartedOpAMPServer
// and starts it before handing it back to the caller.
func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer {
	srv := newUnstartedOpAMPServer(t, connectingCallback, callbacks)
	srv.start()

	return srv
}

func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer {
var agentConn atomic.Value
var isAgentConnected atomic.Bool
var didShutdown atomic.Bool
Expand Down Expand Up @@ -108,7 +120,7 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca
require.NoError(t, err)
mux := http.NewServeMux()
mux.HandleFunc("/v1/opamp", handler)
httpSrv := httptest.NewServer(mux)
httpSrv := httptest.NewUnstartedServer(mux)

shutdown := func() {
if !didShutdown.Load() {
Expand All @@ -135,6 +147,7 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca
addr: httpSrv.Listener.Addr().String(),
supervisorConnected: connectedChan,
sendToSupervisor: send,
start: httpSrv.Start,
shutdown: shutdown,
}
}
Expand Down Expand Up @@ -238,6 +251,148 @@ func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) {
}, 10*time.Second, 500*time.Millisecond, "Log never appeared in output")
}

// TestSupervisorStartsCollectorWithNoOpAMPServer writes a marshalled remote
// config into the supervisor's storage directory, creates the supervisor
// while the OpAMP server is still unstarted, and checks that the collector's
// healthcheck endpoint from that config becomes reachable. It then starts the
// server and checks that the supervisor connects to it.
func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) {
	storageDir := t.TempDir()
	remoteConfigFilePath := filepath.Join(storageDir, "last_recv_remote_config.dat")

	cfg, hash, healthcheckPort := createHealthCheckCollectorConf(t)
	remoteConfigProto := &protobufs.AgentRemoteConfig{
		Config: &protobufs.AgentConfigMap{
			ConfigMap: map[string]*protobufs.AgentConfigFile{
				"": {Body: cfg.Bytes()},
			},
		},
		ConfigHash: hash,
	}
	marshalledRemoteConfig, err := proto.Marshal(remoteConfigProto)
	require.NoError(t, err)

	// Seed the storage directory with the remote config before the supervisor is created.
	require.NoError(t, os.WriteFile(remoteConfigFilePath, marshalledRemoteConfig, 0600))

	var connected atomic.Bool
	// Named opampServer so the imported "server" package is not shadowed.
	opampServer := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{
		OnConnectedFunc: func(ctx context.Context, conn types.Connection) {
			connected.Store(true)
		},
	})
	defer opampServer.shutdown()

	s := newSupervisor(t, "basic", map[string]string{
		"url":         opampServer.addr,
		"storage_dir": storageDir,
	})
	defer s.Shutdown()

	// Verify the collector runs eventually by pinging the healthcheck extension
	require.Eventually(t, func() bool {
		resp, err := http.DefaultClient.Get(fmt.Sprintf("http://localhost:%d", healthcheckPort))
		if err != nil {
			t.Logf("Failed healthcheck: %s", err)
			return false
		}
		// require.* must not be used in here: the condition runs on a goroutine
		// other than the test's own, where FailNow is not permitted.
		if closeErr := resp.Body.Close(); closeErr != nil {
			t.Logf("Failed to close healthcheck response body: %s", closeErr)
			return false
		}
		if resp.StatusCode >= 300 || resp.StatusCode < 200 {
			t.Logf("Got non-2xx status code: %d", resp.StatusCode)
			return false
		}
		return true
	}, 3*time.Second, 100*time.Millisecond)

	// Start the server and wait for the supervisor to connect
	opampServer.start()

	// Verify supervisor connects to server
	waitForSupervisorConnection(opampServer.supervisorConnected, true)

	require.True(t, connected.Load(), "Supervisor failed to connect")
}

// TestSupervisorStartsWithNoOpAMPServer creates the supervisor while the
// OpAMP server is still unstarted and checks that the collector's metrics
// endpoint becomes reachable. It then starts the server, waits for the
// supervisor to connect, sends a remote config, and checks that the config
// hash is reported back and that a log line written to the input file shows
// up in the output file.
func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) {
	cfg, hash, inputFile, outputFile := createSimplePipelineCollectorConf(t)

	configuredChan := make(chan struct{})
	// Guards configuredChan so it is closed at most once, even if the matching
	// hash is reported in more than one message.
	var configured atomic.Bool
	var connected atomic.Bool
	// Named opampServer so the imported "server" package is not shadowed.
	opampServer := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{
		OnConnectedFunc: func(ctx context.Context, conn types.Connection) {
			connected.Store(true)
		},
		OnMessageFunc: func(ctx context.Context, conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
			lastCfgHash := message.GetRemoteConfigStatus().GetLastRemoteConfigHash()
			if bytes.Equal(lastCfgHash, hash) && configured.CompareAndSwap(false, true) {
				close(configuredChan)
			}

			return &protobufs.ServerToAgent{}
		},
	})
	defer opampServer.shutdown()

	// The supervisor is started without a running OpAMP server.
	// The supervisor should start successfully, even though the OpAMP server has not been started yet.
	s := newSupervisor(t, "basic", map[string]string{
		"url": opampServer.addr,
	})
	defer s.Shutdown()

	// Verify the collector is running by checking the metrics endpoint
	require.Eventually(t, func() bool {
		resp, err := http.DefaultClient.Get("http://localhost:8888/metrics")
		if err != nil {
			t.Logf("Failed check for prometheus metrics: %s", err)
			return false
		}
		// require.* must not be used in here: the condition runs on a goroutine
		// other than the test's own, where FailNow is not permitted.
		if closeErr := resp.Body.Close(); closeErr != nil {
			t.Logf("Failed to close metrics response body: %s", closeErr)
			return false
		}
		if resp.StatusCode >= 300 || resp.StatusCode < 200 {
			t.Logf("Got non-2xx status code: %d", resp.StatusCode)
			return false
		}
		return true
	}, 3*time.Second, 100*time.Millisecond)

	// Start the server and wait for the supervisor to connect
	opampServer.start()

	// Verify supervisor connects to server
	waitForSupervisorConnection(opampServer.supervisorConnected, true)

	require.True(t, connected.Load(), "Supervisor failed to connect")

	// Verify that the collector can run a new config sent to it
	opampServer.sendToSupervisor(&protobufs.ServerToAgent{
		RemoteConfig: &protobufs.AgentRemoteConfig{
			Config: &protobufs.AgentConfigMap{
				ConfigMap: map[string]*protobufs.AgentConfigFile{
					"": {Body: cfg.Bytes()},
				},
			},
			ConfigHash: hash,
		},
	})

	select {
	case <-configuredChan:
	case <-time.After(2 * time.Second):
		require.FailNow(t, "timed out waiting for collector to reconfigure")
	}

	sampleLog := `{"body":"hello, world"}`
	n, err := inputFile.WriteString(sampleLog + "\n")
	// Check the error first so a failed write is reported with its cause.
	require.NoError(t, err)
	require.NotZero(t, n, "Could not write to input file")

	require.Eventually(t, func() bool {
		logRecord := make([]byte, 1024)

		// Use closure-local results; the condition runs on another goroutine,
		// so it must neither write to the outer n/err nor call require.*.
		read, readErr := outputFile.Read(logRecord)
		if readErr != nil && !errors.Is(readErr, io.EOF) {
			t.Logf("Failed to read output file: %s", readErr)
			return false
		}

		return read != 0
	}, 10*time.Second, 500*time.Millisecond, "Log never appeared in output")
}

func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) {
var healthReport atomic.Value
var agentConfig atomic.Value
Expand Down Expand Up @@ -639,6 +794,29 @@ func createBadCollectorConf(t *testing.T) (*bytes.Buffer, []byte) {
return bytes.NewBuffer(colCfg), h.Sum(nil)
}

// createHealthCheckCollectorConf renders the template at
// testdata/collector/healthcheck_config.yaml, setting HealthCheckEndpoint to
// a localhost port obtained from findRandomPort.
//
// It returns the rendered config, the SHA-256 hash of its bytes, and the
// port used for the endpoint. Any failure fails the test via require.
func createHealthCheckCollectorConf(t *testing.T) (cfg *bytes.Buffer, hash []byte, remotePort int) {
	colCfgTpl, err := os.ReadFile(path.Join("testdata", "collector", "healthcheck_config.yaml"))
	require.NoError(t, err)

	templ, err := template.New("").Parse(string(colCfgTpl))
	require.NoError(t, err)

	port, err := findRandomPort()
	// Without this check the error would be overwritten by templ.Execute below
	// and the config would be rendered with port 0.
	require.NoError(t, err)

	var confmapBuf bytes.Buffer
	err = templ.Execute(
		&confmapBuf,
		map[string]string{
			"HealthCheckEndpoint": fmt.Sprintf("localhost:%d", port),
		},
	)
	require.NoError(t, err)

	h := sha256.Sum256(confmapBuf.Bytes())

	return &confmapBuf, h[:], port
}

// Wait for the Supervisor to connect to or disconnect from the OpAMP server
func waitForSupervisorConnection(connection chan bool, connected bool) {
select {
Expand Down Expand Up @@ -1012,3 +1190,21 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) {

require.Equal(t, newID, uuid.UUID(newRecievedAgentID))
}

// findRandomPort binds a TCP listener to "localhost:0", records the port the
// listener was given, and closes the listener again.
//
// It returns that port, or 0 and the error if either listening or closing
// fails. The listener is closed before returning, so the port is not
// reserved for the caller.
func findRandomPort() (int, error) {
	listener, listenErr := net.Listen("tcp", "localhost:0")
	if listenErr != nil {
		return 0, listenErr
	}

	tcpAddr := listener.Addr().(*net.TCPAddr)

	if closeErr := listener.Close(); closeErr != nil {
		return 0, closeErr
	}

	return tcpAddr.Port, nil
}
7 changes: 7 additions & 0 deletions cmd/opampsupervisor/specification/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ agent:

```
### Operation When OpAMP Server is Unavailable
When the supervisor cannot connect to the OpAMP server, the collector will
be run with the last known configuration, or with a "noop" configuration
if no previous configuration is persisted. The supervisor will continually
attempt to reconnect to the OpAMP server with exponential backoff.
### Executing Collector
The Supervisor starts and stops the Collector process as necessary. When
Expand Down
Loading

0 comments on commit cc03841

Please sign in to comment.