Skip to content

Commit

Permalink
[cmd/opampsupervisor] Forward custom messages to/from agent (#33576)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
* Forwards custom messages through the supervisor to/from the agent

**Link to tracking Issue:** Closes #33575

**Testing:**
* Tested against BindPlane
* E2E tests aren't possible since there is no component using custom
messages in contrib right now.
* Added a couple unit tests
  • Loading branch information
BinaryFissionGames authored Jul 16, 2024
1 parent a2edeb8 commit be85cc8
Show file tree
Hide file tree
Showing 4 changed files with 496 additions and 82 deletions.
13 changes: 13 additions & 0 deletions .chloggen/feat_supervisor-passthrough-custom-capabilities.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: cmd/opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Adds support for forwarding custom messages to/from the agent"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33575]
62 changes: 39 additions & 23 deletions cmd/opampsupervisor/supervisor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,50 @@ import (
)

type flattenedSettings struct {
onMessageFunc func(conn serverTypes.Connection, message *protobufs.AgentToServer)
onConnectingFunc func(request *http.Request)
endpoint string
onMessageFunc func(conn serverTypes.Connection, message *protobufs.AgentToServer)
onConnectingFunc func(request *http.Request) (shouldConnect bool, rejectStatusCode int)
onConnectionCloseFunc func(conn serverTypes.Connection)
endpoint string
}

func newServerSettings(fs flattenedSettings) server.StartSettings {
func (fs flattenedSettings) toServerSettings() server.StartSettings {
return server.StartSettings{
Settings: server.Settings{
Callbacks: server.CallbacksStruct{
OnConnectingFunc: func(request *http.Request) serverTypes.ConnectionResponse {
if fs.onConnectingFunc != nil {
fs.onConnectingFunc(request)
}
return serverTypes.ConnectionResponse{
Accept: true,
ConnectionCallbacks: server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if fs.onMessageFunc != nil {
fs.onMessageFunc(conn, message)
}

return &protobufs.ServerToAgent{}
},
},
}
},
},
Callbacks: fs,
},
ListenEndpoint: fs.endpoint,
}
}

func (fs flattenedSettings) OnConnecting(request *http.Request) serverTypes.ConnectionResponse {
if fs.onConnectingFunc != nil {
shouldConnect, rejectStatusCode := fs.onConnectingFunc(request)
if !shouldConnect {
return serverTypes.ConnectionResponse{
Accept: false,
HTTPStatusCode: rejectStatusCode,
}
}
}

return serverTypes.ConnectionResponse{
Accept: true,
ConnectionCallbacks: fs,
}
}

func (fs flattenedSettings) OnConnected(_ context.Context, _ serverTypes.Connection) {}

func (fs flattenedSettings) OnMessage(_ context.Context, conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if fs.onMessageFunc != nil {
fs.onMessageFunc(conn, message)
}

return &protobufs.ServerToAgent{}
}

func (fs flattenedSettings) OnConnectionClose(conn serverTypes.Connection) {
if fs.onConnectionCloseFunc != nil {
fs.onConnectionCloseFunc(conn)
}
}
Loading

0 comments on commit be85cc8

Please sign in to comment.