Skip to content

Commit

Permalink
[cmd/opampsupervisor] Fix ServerToAgent message field handling (open-…
Browse files Browse the repository at this point in the history
…telemetry#34349)

**Description:**

Follow-up to
open-telemetry#33576.

Boolean short-circuiting was causing the `onMessage` handler to not
process the whole `ServerToAgent` message if it included multiple
fields. I noticed this was causing the Collector to fail to start when
using the opamp-go demo server; the own metrics section is required for
the Collector to start since it provides the only pipelines by default.

I included a new unit test that verifies everything looks as we would
expect when getting a message like what would be sent by the example
server.

---------

Co-authored-by: Evan Bradley <evan-bradley@users.noreply.github.com>
  • Loading branch information
evan-bradley and evan-bradley authored Sep 11, 2024
1 parent 413477e commit cb7f220
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 7 deletions.
27 changes: 27 additions & 0 deletions .chloggen/fix-onmessage.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: cmd/opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Ensure the Supervisor processes all fields in a ServerToAgent message.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34349]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
22 changes: 15 additions & 7 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,16 +1190,17 @@ func (s *Supervisor) saveLastReceivedOwnTelemetrySettings(set *protobufs.Telemet

func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) {
configChanged := false
if msg.RemoteConfig != nil {
configChanged = configChanged || s.processRemoteConfigMessage(msg.RemoteConfig)

if msg.AgentIdentification != nil {
configChanged = s.processAgentIdentificationMessage(msg.AgentIdentification) || configChanged
}

if msg.OwnMetricsConnSettings != nil {
configChanged = configChanged || s.processOwnMetricsConnSettingsMessage(ctx, msg.OwnMetricsConnSettings)
if msg.RemoteConfig != nil {
configChanged = s.processRemoteConfigMessage(msg.RemoteConfig) || configChanged
}

if msg.AgentIdentification != nil {
configChanged = configChanged || s.processAgentIdentificationMessage(msg.AgentIdentification)
if msg.OwnMetricsConnSettings != nil {
configChanged = s.processOwnMetricsConnSettingsMessage(ctx, msg.OwnMetricsConnSettings) || configChanged
}

// Update the agent config if any messages have touched the config
Expand Down Expand Up @@ -1309,7 +1310,14 @@ func (s *Supervisor) processAgentIdentificationMessage(msg *protobufs.AgentIdent
s.logger.Error("Failed to send agent description to OpAMP server")
}

return true
// Need to recalculate the Agent config so that the new agent identification is included in it.
configChanged, err := s.composeMergedConfig(s.remoteConfig)
if err != nil {
s.logger.Error("Error composing merged config with new instance ID", zap.Error(err))
return false
}

return configChanged
}

func (s *Supervisor) persistentStateFilePath() string {
Expand Down
60 changes: 60 additions & 0 deletions cmd/opampsupervisor/supervisor/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,12 @@ func Test_onMessage(t *testing.T) {
persistentState: &persistentState{InstanceID: initialID},
agentDescription: agentDesc,
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentHealthCheckEndpoint: "localhost:8000",
opampClient: client.NewHTTP(newLoggerFromZap(zap.NewNop())),
}
require.NoError(t, s.createTemplates())

s.onMessage(context.Background(), &types.MessageData{
AgentIdentification: &protobufs.AgentIdentification{
Expand All @@ -131,9 +133,11 @@ func Test_onMessage(t *testing.T) {
persistentState: &persistentState{InstanceID: testUUID},
agentDescription: agentDesc,
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentHealthCheckEndpoint: "localhost:8000",
}
require.NoError(t, s.createTemplates())

s.onMessage(context.Background(), &types.MessageData{
AgentIdentification: &protobufs.AgentIdentification{
Expand Down Expand Up @@ -175,6 +179,7 @@ func Test_onMessage(t *testing.T) {
hasNewConfig: make(chan struct{}, 1),
persistentState: &persistentState{InstanceID: testUUID},
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentConn: agentConnAtomic,
agentHealthCheckEndpoint: "localhost:8000",
Expand Down Expand Up @@ -231,6 +236,61 @@ func Test_onMessage(t *testing.T) {
require.True(t, gotMessage, "Message was not sent to agent")
})

t.Run("Processes all ServerToAgent fields", func(t *testing.T) {
agentDesc := &atomic.Value{}
agentDesc.Store(&protobufs.AgentDescription{
NonIdentifyingAttributes: []*protobufs.KeyValue{
{
Key: "runtime.type",
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{
StringValue: "test",
},
},
},
},
})
initialID := uuid.MustParse("018fee23-4a51-7303-a441-73faed7d9deb")
newID := uuid.MustParse("018fef3f-14a8-73ef-b63e-3b96b146ea38")
s := Supervisor{
logger: zap.NewNop(),
pidProvider: defaultPIDProvider{},
config: config.Supervisor{},
hasNewConfig: make(chan struct{}, 1),
persistentState: &persistentState{InstanceID: initialID},
agentDescription: agentDesc,
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentHealthCheckEndpoint: "localhost:8000",
opampClient: client.NewHTTP(newLoggerFromZap(zap.NewNop())),
}
require.NoError(t, s.createTemplates())

s.onMessage(context.Background(), &types.MessageData{
AgentIdentification: &protobufs.AgentIdentification{
NewInstanceUid: newID[:],
},
RemoteConfig: &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {
Body: []byte(""),
},
},
},
},
OwnMetricsConnSettings: &protobufs.TelemetryConnectionSettings{
DestinationEndpoint: "http://localhost:4318",
},
})

require.Equal(t, newID, s.persistentState.InstanceID)
t.Log(s.mergedConfig.Load())
require.Contains(t, s.mergedConfig.Load(), "prometheus/own_metrics")
require.Contains(t, s.mergedConfig.Load(), newID.String())
require.Contains(t, s.mergedConfig.Load(), "runtime.type: test")
})
}

func Test_handleAgentOpAMPMessage(t *testing.T) {
Expand Down

0 comments on commit cb7f220

Please sign in to comment.