Skip to content

Commit

Permalink
Add basic Agent example (open-telemetry#39)
Browse files Browse the repository at this point in the history
The example demonstrates how to use OpAMPClient to create
a simple Agent that works with OpAMP Server.
  • Loading branch information
tigrannajaryan authored Dec 8, 2021
1 parent f37f19e commit f9217c9
Show file tree
Hide file tree
Showing 10 changed files with 484 additions and 8 deletions.
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,20 @@ go 1.17
require (
github.com/cenkalti/backoff/v4 v4.1.2
github.com/gorilla/websocket v1.4.2
github.com/knadh/koanf v1.3.3
github.com/oklog/ulid/v2 v2.0.2
github.com/stretchr/testify v1.7.0
google.golang.org/protobuf v1.27.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
138 changes: 137 additions & 1 deletion go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions internal/examples/agent/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/bin/
280 changes: 280 additions & 0 deletions internal/examples/agent/agent/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
package agent

import (
"context"
"crypto/sha256"
"fmt"
"math/rand"
"os"
"runtime"
"sort"
"time"

"github.com/knadh/koanf"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/oklog/ulid/v2"
"github.com/open-telemetry/opamp-go/client/types"

"github.com/open-telemetry/opamp-go/client"
"github.com/open-telemetry/opamp-go/protobufs"
)

const localConfig = `
exporters:
otlp:
endpoint: localhost:1111
receivers:
otlp:
protocols:
grpc: {}
http: {}
service:
pipelines:
traces:
receivers: [otlp]
processors: []
exporters: [otlp]
`

type Agent struct {
logger types.Logger

agentType string
agentVersion string

effectiveConfig string
effectiveConfigHash []byte

instanceId ulid.ULID

agentDescription *protobufs.AgentDescription

opampClient client.OpAMPClient

remoteConfigHash []byte
}

func NewAgent(logger types.Logger, agentType string, agentVersion string) *Agent {
agent := &Agent{
effectiveConfig: localConfig,
logger: logger,
agentType: agentType,
agentVersion: agentVersion,
}

agent.createAgentIdentity()
agent.logger.Debugf("Agent starting, id=%v, type=%s, version=%s.",
agent.instanceId.String(), agentType, agentVersion)

agent.loadLocalConfig()
if err := agent.start(); err != nil {
agent.logger.Errorf("Cannot start OpAMP client: %v", err)
return nil
}

return agent
}

func (agent *Agent) start() error {
agent.opampClient = client.New(agent.logger)

settings := client.StartSettings{
OpAMPServerURL: "ws://127.0.0.1:4320/v1/opamp",
InstanceUid: agent.instanceId.String(),
AgentType: agent.agentType,
AgentVersion: agent.agentVersion,
Callbacks: client.CallbacksStruct{
OnConnectFunc: func() {
agent.logger.Debugf("Connected to the server.")
},
OnConnectFailedFunc: func(err error) {
agent.logger.Errorf("Failed to connect to the server: %v", err)
},
OnErrorFunc: func(err *protobufs.ServerErrorResponse) {
agent.logger.Errorf("Server returned an error response: %v", err.ErrorMessage)
},
OnRemoteConfigFunc: agent.onRemoteConfig,
},
LastRemoteConfigHash: agent.remoteConfigHash,
LastEffectiveConfig: agent.composeEffectiveConfig(),
}

agent.logger.Debugf("Starting OpAMP client...")

err := agent.opampClient.Start(settings)
if err != nil {
return err
}

agent.logger.Debugf("OpAMP Client started.")

return nil
}

func (agent *Agent) createAgentIdentity() {
// Generate instance id.
entropy := ulid.Monotonic(rand.New(rand.NewSource(0)), 0)
agent.instanceId = ulid.MustNew(ulid.Timestamp(time.Now()), entropy)

hostname, _ := os.Hostname()

// Create agent description.
agent.agentDescription = &protobufs.AgentDescription{
AgentType: agent.agentType,
AgentVersion: agent.agentVersion,
AgentAttributes: []*protobufs.KeyValue{
{
Key: "os.family",
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{
StringValue: runtime.GOOS,
},
},
},
{
Key: "host.name",
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{
StringValue: hostname,
},
},
},
},
}
}

func (agent *Agent) loadLocalConfig() {
var k = koanf.New(".")
k.Load(rawbytes.Provider([]byte(localConfig)), yaml.Parser())

effectiveConfigBytes, err := k.Marshal(yaml.Parser())
if err != nil {
panic(err)
}

agent.effectiveConfig = string(effectiveConfigBytes)
hash := sha256.Sum256(effectiveConfigBytes)
agent.effectiveConfigHash = hash[:]
}

func (agent *Agent) composeEffectiveConfig() *protobufs.EffectiveConfig {
return &protobufs.EffectiveConfig{
Hash: agent.effectiveConfigHash,
ConfigMap: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: []byte(agent.effectiveConfig)},
},
},
}
}

func (agent *Agent) onRemoteConfig(
ctx context.Context,
config *protobufs.AgentRemoteConfig,
) (*protobufs.EffectiveConfig, error) {
err := agent.applyRemoteConfig(config)
if err != nil {
return nil, err
}
return agent.composeEffectiveConfig(), nil
}

type agentConfigFileItem struct {
name string
file *protobufs.AgentConfigFile
}

type agentConfigFileSlice []agentConfigFileItem

func (a agentConfigFileSlice) Less(i, j int) bool {
return a[i].name < a[j].name
}

func (a agentConfigFileSlice) Swap(i, j int) {
t := a[i]
a[i] = a[j]
a[j] = t
}

func (a agentConfigFileSlice) Len() int {
return len(a)
}

func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) error {
if config == nil {
return nil
}

agent.logger.Debugf("Received remote config from server, hash=%x.", config.ConfigHash)

// Begin with local config. We will later merge received configs on top of it.
var k = koanf.New(".")
if err := k.Load(rawbytes.Provider([]byte(localConfig)), yaml.Parser()); err != nil {
return err
}

orderedConfigs := agentConfigFileSlice{}
for name, file := range config.Config.ConfigMap {
if name == "" {
// skip instance config
continue
}
orderedConfigs = append(orderedConfigs, agentConfigFileItem{
name: name,
file: file,
})
}

// Sort to make sure the order of merging is stable.
sort.Sort(orderedConfigs)

// Append instance config as the last item.
instanceConfig := config.Config.ConfigMap[""]
if instanceConfig != nil {
orderedConfigs = append(orderedConfigs, agentConfigFileItem{
name: "",
file: instanceConfig,
})
}

// Merge received configs.
for _, item := range orderedConfigs {
var k2 = koanf.New(".")
err := k2.Load(rawbytes.Provider(item.file.Body), yaml.Parser())
if err != nil {
return fmt.Errorf("cannot parse config named %s: %v", item.name, err)
}
err = k.Merge(k2)
if err != nil {
return fmt.Errorf("cannot merge config named %s: %v", item.name, err)
}
}

// The merged final result is our effective config.
effectiveConfigBytes, err := k.Marshal(yaml.Parser())
if err != nil {
panic(err)
}

newEffectiveConfig := string(effectiveConfigBytes)
if agent.effectiveConfig != newEffectiveConfig {
agent.logger.Debugf("Effective config changed. Need to report to server.")
agent.effectiveConfig = newEffectiveConfig
hash := sha256.Sum256(effectiveConfigBytes)
agent.effectiveConfigHash = hash[:]
}

agent.remoteConfigHash = config.ConfigHash

return nil
}

func (agent *Agent) Shutdown() {
agent.logger.Debugf("Agent shutting down...")
if agent.opampClient != nil {
agent.opampClient.Stop(context.Background())
}
}
15 changes: 15 additions & 0 deletions internal/examples/agent/agent/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package agent

import "log"

type Logger struct {
Logger *log.Logger
}

func (l *Logger) Debugf(format string, v ...interface{}) {
l.Logger.Printf(format, v...)
}

func (l *Logger) Errorf(format string, v ...interface{}) {
l.Logger.Printf(format, v...)
}
27 changes: 27 additions & 0 deletions internal/examples/agent/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"flag"
"log"
"os"
"os/signal"

"github.com/open-telemetry/opamp-go/internal/examples/agent/agent"
)

func main() {
var agentType string
flag.StringVar(&agentType, "t", "io.opentelemetry.collector", "Agent Type String")

var agentVersion string
flag.StringVar(&agentVersion, "v", "1.0.0", "Agent Version String")

flag.Parse()

agent := agent.NewAgent(&agent.Logger{log.Default()}, agentType, agentVersion)

interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
<-interrupt
agent.Shutdown()
}
5 changes: 3 additions & 2 deletions internal/examples/server/data/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (agents *Agents) FindOrCreateAgent(agentId InstanceId, conn types.Connectio
// Ensure the agent is in the agentsById map.
agent := agents.agentsById[agentId]
if agent == nil {
agent := NewAgent(agentId, conn)
agent = NewAgent(agentId, conn)
agents.agentsById[agentId] = agent

// Ensure the agent's instance id is associated with the connection.
Expand Down Expand Up @@ -113,5 +113,6 @@ func (agents *Agents) GetAllAgentsReadonlyClone() map[InstanceId]*Agent {
}

var AllAgents = Agents{
agentsById: map[InstanceId]*Agent{},
agentsById: map[InstanceId]*Agent{},
connections: map[types.Connection]map[InstanceId]bool{},
}
2 changes: 1 addition & 1 deletion internal/examples/server/uisrv/html/agent.html
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ <h3>Configuration</h3>
Additional Configuration:<br/>
<form action="/save_config" method="post">
<input type="hidden" name="instanceid" value="{{ .InstanceId }}"/>
<textarea cols="40" rows="20" name="config">{{ .CustomConfigForInstance }}</textarea><br/>
<textarea cols="40" rows="20" name="config">{{ .CustomInstanceConfig }}</textarea><br/>
{{if .Status.RemoteConfigStatus }}
{{if .Status.RemoteConfigStatus.ErrorMessage }}
<span style="color:red">Failed: {{ .Status.RemoteConfigStatus.ErrorMessage }}</span><br/>
Expand Down
Loading

0 comments on commit f9217c9

Please sign in to comment.