Skip to content

Commit

Permalink
Instrument proctelemetry.ProcessMetrics (open-telemetry#6886)
Browse files Browse the repository at this point in the history
* Instrument proctelemetry.ProcessMetrics (open-telemetry#67)

* Instrument proctelemetry.ProcessMetrics

* add chloggen

* fix lint and remove unneeded context

* catch potential case where some metricValues are 0

* address review feedback

* rebase and remove unneeded registry

* only run assertMetrics for OC

* address review comments

* rebase and update featuregate function in use

* rebase and update to use new otel metrics api

* fix lint err

* address feedback

* newline

* use multierr for errors
  • Loading branch information
moh-osman3 authored Feb 2, 2023
1 parent 84c3d00 commit 297e391
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 20 deletions.
12 changes: 12 additions & 0 deletions .chloggen/proctelemetry-processmetrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: proctelemetry

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Instrument `proctelemetry.ProcessMetrics` metrics with otel-go"

# One or more tracking issues or pull requests related to the change
issues: [6886]

110 changes: 102 additions & 8 deletions service/internal/proctelemetry/process_telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package proctelemetry // import "go.opentelemetry.io/collector/service/internal/proctelemetry"

import (
"context"
"os"
"runtime"
"sync"
Expand All @@ -23,6 +24,15 @@ import (
"github.com/shirou/gopsutil/v3/process"
"go.opencensus.io/metric"
"go.opencensus.io/stats"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/unit"
"go.uber.org/multierr"
)

const (
scopeName = "go.opentelemetry.io/collector/service/process_telemetry"
processNameKey = "process_name"
)

// processMetrics is a struct that contains views related to process metrics (cpu, mem, etc)
Expand All @@ -38,6 +48,14 @@ type processMetrics struct {
cpuSeconds *metric.Float64DerivedCumulative
rssMemory *metric.Int64DerivedGauge

// otel metrics
otelProcessUptime instrument.Float64ObservableCounter
otelAllocMem instrument.Int64ObservableGauge
otelTotalAllocMem instrument.Int64ObservableCounter
otelSysMem instrument.Int64ObservableGauge
otelCPUSeconds instrument.Float64ObservableCounter
otelRSSMemory instrument.Int64ObservableGauge

// mu protects everything bellow.
mu sync.Mutex
lastMsRead time.Time
Expand All @@ -46,19 +64,29 @@ type processMetrics struct {

// RegisterProcessMetrics creates a new set of processMetrics (mem, cpu) that can be used to measure
// basic information about this process.
func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64) error {
func RegisterProcessMetrics(ocRegistry *metric.Registry, mp otelmetric.MeterProvider, useOtel bool, ballastSizeBytes uint64) error {
var err error
pm := &processMetrics{
startTimeUnixNano: time.Now().UnixNano(),
ballastSizeBytes: ballastSizeBytes,
ms: &runtime.MemStats{},
}
var err error

pm.proc, err = process.NewProcess(int32(os.Getpid()))
if err != nil {
return err
}

pm.processUptime, err = registry.AddFloat64DerivedCumulative(
if useOtel {
return pm.recordWithOtel(mp.Meter(scopeName))
}
return pm.recordWithOC(ocRegistry)
}

func (pm *processMetrics) recordWithOC(ocRegistry *metric.Registry) error {
var err error

pm.processUptime, err = ocRegistry.AddFloat64DerivedCumulative(
"process/uptime",
metric.WithDescription("Uptime of the process"),
metric.WithUnit(stats.UnitSeconds))
Expand All @@ -69,7 +97,7 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64)
return err
}

pm.allocMem, err = registry.AddInt64DerivedGauge(
pm.allocMem, err = ocRegistry.AddInt64DerivedGauge(
"process/runtime/heap_alloc_bytes",
metric.WithDescription("Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc')"),
metric.WithUnit(stats.UnitBytes))
Expand All @@ -80,7 +108,7 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64)
return err
}

pm.totalAllocMem, err = registry.AddInt64DerivedCumulative(
pm.totalAllocMem, err = ocRegistry.AddInt64DerivedCumulative(
"process/runtime/total_alloc_bytes",
metric.WithDescription("Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')"),
metric.WithUnit(stats.UnitBytes))
Expand All @@ -91,7 +119,7 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64)
return err
}

pm.sysMem, err = registry.AddInt64DerivedGauge(
pm.sysMem, err = ocRegistry.AddInt64DerivedGauge(
"process/runtime/total_sys_memory_bytes",
metric.WithDescription("Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys')"),
metric.WithUnit(stats.UnitBytes))
Expand All @@ -102,7 +130,7 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64)
return err
}

pm.cpuSeconds, err = registry.AddFloat64DerivedCumulative(
pm.cpuSeconds, err = ocRegistry.AddFloat64DerivedCumulative(
"process/cpu_seconds",
metric.WithDescription("Total CPU user and system time in seconds"),
metric.WithUnit(stats.UnitSeconds))
Expand All @@ -113,7 +141,7 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64)
return err
}

pm.rssMemory, err = registry.AddInt64DerivedGauge(
pm.rssMemory, err = ocRegistry.AddInt64DerivedGauge(
"process/memory/rss",
metric.WithDescription("Total physical memory (resident set size)"),
metric.WithUnit(stats.UnitBytes))
Expand All @@ -127,6 +155,72 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64)
return nil
}

func (pm *processMetrics) recordWithOtel(meter otelmetric.Meter) error {
var errs, err error

pm.otelProcessUptime, err = meter.Float64ObservableCounter(
"process_uptime",
instrument.WithDescription("Uptime of the process"),
instrument.WithUnit(unit.Unit("s")),
instrument.WithFloat64Callback(func(_ context.Context, o instrument.Float64Observer) error {
o.Observe(pm.updateProcessUptime())
return nil
}))
errs = multierr.Append(errs, err)

pm.otelAllocMem, err = meter.Int64ObservableGauge(
"process_runtime_heap_alloc_bytes",
instrument.WithDescription("Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc')"),
instrument.WithUnit(unit.Bytes),
instrument.WithInt64Callback(func(_ context.Context, o instrument.Int64Observer) error {
o.Observe(pm.updateAllocMem())
return nil
}))
errs = multierr.Append(errs, err)

pm.otelTotalAllocMem, err = meter.Int64ObservableCounter(
"process_runtime_total_alloc_bytes",
instrument.WithDescription("Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')"),
instrument.WithUnit(unit.Bytes),
instrument.WithInt64Callback(func(_ context.Context, o instrument.Int64Observer) error {
o.Observe(pm.updateTotalAllocMem())
return nil
}))
errs = multierr.Append(errs, err)

pm.otelSysMem, err = meter.Int64ObservableGauge(
"process_runtime_total_sys_memory_bytes",
instrument.WithDescription("Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys')"),
instrument.WithUnit(unit.Bytes),
instrument.WithInt64Callback(func(_ context.Context, o instrument.Int64Observer) error {
o.Observe(pm.updateSysMem())
return nil
}))
errs = multierr.Append(errs, err)

pm.otelCPUSeconds, err = meter.Float64ObservableCounter(
"process_cpu_seconds",
instrument.WithDescription("Total CPU user and system time in seconds"),
instrument.WithUnit(unit.Unit("s")),
instrument.WithFloat64Callback(func(_ context.Context, o instrument.Float64Observer) error {
o.Observe(pm.updateCPUSeconds())
return nil
}))
errs = multierr.Append(errs, err)

pm.otelRSSMemory, err = meter.Int64ObservableGauge(
"process_memory_rss",
instrument.WithDescription("Total physical memory (resident set size)"),
instrument.WithUnit(unit.Bytes),
instrument.WithInt64Callback(func(_ context.Context, o instrument.Int64Observer) error {
o.Observe(pm.updateRSSMemory())
return nil
}))
errs = multierr.Append(errs, err)

return errs
}

func (pm *processMetrics) updateProcessUptime() float64 {
now := time.Now().UnixNano()
return float64(now-pm.startTimeUnixNano) / 1e9
Expand Down
129 changes: 119 additions & 10 deletions service/internal/proctelemetry/process_telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,41 @@
package proctelemetry

import (
"context"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opencensus.io/metric"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/stats/view"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
otelmetric "go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/internal/obsreportconfig"
)

type testTelemetry struct {
component.TelemetrySettings
views []*view.View
promHandler http.Handler
meterProvider *sdkmetric.MeterProvider
expectedMetrics []string
}

var expectedMetrics = []string{
// Changing a metric name is a breaking change.
// Adding new metrics is ok as long it follows the conventions described at
Expand All @@ -36,14 +62,98 @@ var expectedMetrics = []string{
"process/memory/rss",
}

func TestProcessTelemetry(t *testing.T) {
registry := metric.NewRegistry()
require.NoError(t, RegisterProcessMetrics(registry, 0))
var otelExpectedMetrics = []string{
// OTel Go adds `_total` suffix
"process_uptime",
"process_runtime_heap_alloc_bytes",
"process_runtime_total_alloc_bytes",
"process_runtime_total_sys_memory_bytes",
"process_cpu_seconds",
"process_memory_rss",
}

func setupTelemetry(t *testing.T) testTelemetry {
settings := testTelemetry{
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
expectedMetrics: otelExpectedMetrics,
}
settings.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal

settings.views = obsreportconfig.AllViews(configtelemetry.LevelNormal)
err := view.Register(settings.views...)
require.NoError(t, err)

promReg := prometheus.NewRegistry()
exporter, err := otelprom.New(otelprom.WithRegisterer(promReg), otelprom.WithoutUnits())
require.NoError(t, err)

settings.meterProvider = sdkmetric.NewMeterProvider(
sdkmetric.WithResource(resource.Empty()),
sdkmetric.WithReader(exporter),
)
settings.TelemetrySettings.MeterProvider = settings.meterProvider

settings.promHandler = promhttp.HandlerFor(promReg, promhttp.HandlerOpts{})

t.Cleanup(func() { assert.NoError(t, settings.meterProvider.Shutdown(context.Background())) })

return settings
}

func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_client.MetricFamily, error) {
req, err := http.NewRequest("GET", "/metrics", nil)
if err != nil {
return nil, err
}

rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)

var parser expfmt.TextParser
return parser.TextToMetricFamilies(rr.Body)
}

func TestOtelProcessTelemetry(t *testing.T) {
tel := setupTelemetry(t)

require.NoError(t, RegisterProcessMetrics(nil, tel.MeterProvider, true, 0))

mp, err := fetchPrometheusMetrics(tel.promHandler)
require.NoError(t, err)

for _, metricName := range tel.expectedMetrics {
metric, ok := mp[metricName]
if !ok {
withSuffix := metricName + "_total"
metric, ok = mp[withSuffix]
}
require.True(t, ok)
require.True(t, len(metric.Metric) == 1)
var metricValue float64
if metric.GetType() == io_prometheus_client.MetricType_COUNTER {
metricValue = metric.Metric[0].GetCounter().GetValue()
} else {
metricValue = metric.Metric[0].GetGauge().GetValue()
}
if strings.HasPrefix(metricName, "process_uptime") || strings.HasPrefix(metricName, "process_cpu_seconds") {
// This likely will still be zero when running the test.
assert.GreaterOrEqual(t, metricValue, float64(0), metricName)
continue
}

assert.Greater(t, metricValue, float64(0), metricName)
}
}

func TestOCProcessTelemetry(t *testing.T) {
ocRegistry := metric.NewRegistry()

require.NoError(t, RegisterProcessMetrics(ocRegistry, otelmetric.NewNoopMeterProvider(), false, 0))

// Check that the metrics are actually filled.
<-time.After(200 * time.Millisecond)

metrics := registry.Read()
metrics := ocRegistry.Read()

for _, metricName := range expectedMetrics {
m := findMetric(metrics, metricName)
Expand All @@ -62,22 +172,21 @@ func TestProcessTelemetry(t *testing.T) {

if metricName == "process/uptime" || metricName == "process/cpu_seconds" {
// This likely will still be zero when running the test.
assert.True(t, value >= 0, metricName)
assert.GreaterOrEqual(t, value, float64(0), metricName)
continue
}

assert.True(t, value > 0, metricName)
assert.Greater(t, value, float64(0), metricName)
}
}

func TestProcessTelemetryFailToRegister(t *testing.T) {

for _, metricName := range expectedMetrics {
t.Run(metricName, func(t *testing.T) {
registry := metric.NewRegistry()
_, err := registry.AddFloat64Gauge(metricName)
ocRegistry := metric.NewRegistry()
_, err := ocRegistry.AddFloat64Gauge(metricName)
require.NoError(t, err)
assert.Error(t, RegisterProcessMetrics(registry, 0))
assert.Error(t, RegisterProcessMetrics(ocRegistry, otelmetric.NewNoopMeterProvider(), false, 0))
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings,

if cfg.Telemetry.Metrics.Level != configtelemetry.LevelNone && cfg.Telemetry.Metrics.Address != "" {
// The process telemetry initialization requires the ballast size, which is available after the extensions are initialized.
if err = proctelemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil {
if err = proctelemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, srv.telemetryInitializer.mp, obsreportconfig.UseOtelForInternalMetricsfeatureGate.IsEnabled(), getBallastSize(srv.host)); err != nil {
return fmt.Errorf("failed to register process metrics: %w", err)
}
}
Expand Down
4 changes: 3 additions & 1 deletion service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,9 @@ func testCollectorStartHelper(t *testing.T, useOtel bool, tc ownMetricsTestCase)
// Sleep for 1 second to ensure the http server is started.
time.Sleep(1 * time.Second)
assert.True(t, loggingHookCalled)
assertMetrics(t, metricsAddr, tc.expectedLabels)
if !useOtel {
assertMetrics(t, metricsAddr, tc.expectedLabels)
}
assertZPages(t, zpagesAddr)
require.NoError(t, srv.Shutdown(context.Background()))
}
Expand Down

0 comments on commit 297e391

Please sign in to comment.