Skip to content

Commit

Permalink
Merge branch 'main' into gfast-add-server-middleware-hook
Browse files Browse the repository at this point in the history
  • Loading branch information
gdfast committed Mar 20, 2024
2 parents 213856b + d1d6081 commit a098362
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 182 deletions.
90 changes: 41 additions & 49 deletions internal/examples/agent/agent/metricreporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,11 @@ import (

"github.com/oklog/ulid/v2"
"github.com/shirou/gopsutil/process"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
otelresource "go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"

Expand All @@ -32,16 +29,16 @@ type MetricReporter struct {
logger types.Logger

meter metric.Meter
meterShutdowner func()
meterShutdowner func(ctx context.Context) error
done chan struct{}

// The Agent's process.
process *process.Process

// Some example metrics to report.
processMemoryPhysical metric.Int64GaugeObserver
processMemoryPhysical metric.Int64ObservableGauge
counter metric.Int64Counter
processCpuTime metric.Float64CounterObserver
processCpuTime metric.Float64ObservableCounter
}

func NewMetricReporter(
Expand Down Expand Up @@ -75,9 +72,7 @@ func NewMetricReporter(
opts = append(opts, otlpmetrichttp.WithInsecure())
}

client := otlpmetrichttp.NewClient(opts...)

metricExporter, err := otlpmetric.New(context.Background(), client)
metricExporter, err := otlpmetrichttp.New(context.Background())
if err != nil {
err := fmt.Errorf("failed to initialize stdoutmetric export pipeline: %v", err)
return nil, err
Expand All @@ -94,81 +89,82 @@ func NewMetricReporter(
),
)

// Wire up the Resource and the exporter together.
cont := controller.New(
processor.NewFactory(
simple.NewWithInexpensiveDistribution(),
metricExporter,
),
controller.WithExporter(metricExporter),
controller.WithCollectPeriod(5*time.Second),
controller.WithResource(resource),
)

err = cont.Start(context.Background())
if err != nil {
err := fmt.Errorf("failed to initialize metric controller: %v", err)
return nil, err
}
// Wire up the Resource and the exporter together into a meter provider
meterProvider := sdkmetric.NewMeterProvider(
sdkmetric.WithResource(resource),
sdkmetric.WithReader(
sdkmetric.NewPeriodicReader(metricExporter, sdkmetric.WithInterval(5*time.Second)),
))

global.SetMeterProvider(cont)
otel.SetMeterProvider(meterProvider)

reporter := &MetricReporter{
logger: logger,
}

reporter.done = make(chan struct{})

reporter.meter = global.Meter("opamp")
reporter.meter = otel.Meter("opamp")

reporter.process, err = process.NewProcess(int32(os.Getpid()))
if err != nil {
err := fmt.Errorf("cannot query own process: %v", err)
return nil, err
return nil, fmt.Errorf("cannot query own process: %v", err)
}

// Create some metrics that will be reported according to OpenTelemetry semantic
// conventions for process metrics (conventions are TBD for now).
reporter.processCpuTime = metric.Must(reporter.meter).NewFloat64CounterObserver(
reporter.processCpuTime, err = reporter.meter.Float64ObservableCounter(
"process.cpu.time",
reporter.processCpuTimeFunc,
metric.WithFloat64Callback(reporter.processCpuTimeFunc),
)
if err != nil {
return nil, fmt.Errorf("could not initiatilize 'process.cpu.time' instrument: %v", err)
}

reporter.processMemoryPhysical = metric.Must(reporter.meter).NewInt64GaugeObserver(
reporter.processMemoryPhysical, err = reporter.meter.Int64ObservableGauge(
"process.memory.physical_usage",
reporter.processMemoryPhysicalFunc,
metric.WithInt64Callback(reporter.processMemoryPhysicalFunc),
)
if err != nil {
return nil, fmt.Errorf("could not initiatilize 'process.memory.physical_usage' instrument: %v", err)
}

reporter.counter = metric.Must(reporter.meter).NewInt64Counter("custom_metric_ticks")
reporter.counter, err = reporter.meter.Int64Counter("custom_metric_ticks")
if err != nil {
return nil, fmt.Errorf("could not initiatilize 'custom_metric_ticks' instrument: %v", err)
}

reporter.meterShutdowner = func() { _ = cont.Stop(context.Background()) }
reporter.meterShutdowner = meterProvider.Shutdown

go reporter.sendMetrics()

return reporter, nil
}

func (reporter *MetricReporter) processCpuTimeFunc(ctx context.Context, result metric.Float64ObserverResult) {
func (reporter *MetricReporter) processCpuTimeFunc(ctx context.Context, result metric.Float64Observer) error {
times, err := reporter.process.Times()
if err != nil {
reporter.logger.Errorf(ctx, "Cannot get process CPU times: %v", err)
return err
}

// Report process CPU times, but also add some randomness to make it interesting for demo.
result.Observe(math.Min(times.User+rand.Float64(), 1), attribute.String("state", "user"))
result.Observe(math.Min(times.System+rand.Float64(), 1), attribute.String("state", "system"))
result.Observe(math.Min(times.Iowait+rand.Float64(), 1), attribute.String("state", "wait"))
result.Observe(math.Min(times.User+rand.Float64(), 1), metric.WithAttributes(attribute.String("state", "user")))
result.Observe(math.Min(times.System+rand.Float64(), 1), metric.WithAttributes(attribute.String("state", "system")))
result.Observe(math.Min(times.Iowait+rand.Float64(), 1), metric.WithAttributes(attribute.String("state", "wait")))
return nil
}

func (reporter *MetricReporter) processMemoryPhysicalFunc(ctx context.Context, result metric.Int64ObserverResult) {
func (reporter *MetricReporter) processMemoryPhysicalFunc(ctx context.Context, result metric.Int64Observer) error {
memory, err := reporter.process.MemoryInfo()
if err != nil {
reporter.logger.Errorf(ctx, "Cannot get process memory information: %v", err)
return
return err
}

// Report the RSS, but also add some randomness to make it interesting for demo.
result.Observe(int64(memory.RSS) + rand.Int63n(10000000))
return nil
}

func (reporter *MetricReporter) sendMetrics() {
Expand All @@ -184,11 +180,7 @@ func (reporter *MetricReporter) sendMetrics() {

case <-t.C:
ctx := context.Background()
reporter.meter.RecordBatch(
ctx,
[]attribute.KeyValue{},
reporter.counter.Measurement(ticks),
)
reporter.counter.Add(ctx, ticks)
ticks++
}
}
Expand All @@ -200,6 +192,6 @@ func (reporter *MetricReporter) Shutdown() {
}

if reporter.meterShutdowner != nil {
reporter.meterShutdowner()
reporter.meterShutdowner(context.Background())
}
}
39 changes: 18 additions & 21 deletions internal/examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,38 @@ require (
github.com/open-telemetry/opamp-go v0.1.0
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/otel v1.3.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.26.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.26.0
go.opentelemetry.io/otel/metric v0.26.0
go.opentelemetry.io/otel/sdk v1.3.0
go.opentelemetry.io/otel/sdk/metric v0.26.0
google.golang.org/protobuf v1.32.0
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.24.0
go.opentelemetry.io/otel/metric v1.24.0
go.opentelemetry.io/otel/sdk v1.24.0
go.opentelemetry.io/otel/sdk/metric v1.24.0
google.golang.org/protobuf v1.33.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-logr/logr v1.2.1 // indirect
github.com/go-logr/stdr v1.2.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/tklauser/go-sysconf v0.3.9 // indirect
github.com/tklauser/numcpus v0.3.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.3.0 // indirect
go.opentelemetry.io/otel/internal/metric v0.26.0 // indirect
go.opentelemetry.io/otel/sdk/export/metric v0.26.0 // indirect
go.opentelemetry.io/otel/trace v1.3.0 // indirect
go.opentelemetry.io/proto/otlp v0.11.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/grpc v1.42.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/grpc v1.62.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
Loading

0 comments on commit a098362

Please sign in to comment.