Skip to content

Commit

Permalink
Foundation to report application processes (#892)
Browse files Browse the repository at this point in the history
* start porting process sampler

* refactored harvester_linux.go

* refactored harvester_linux_test.go

* removed unneeded darwin files

* first refactor version without any integration

* renamings

* Collector provider

* connecting pipelines

* basic integration

* some process metrics verification

* WIP generifying expirer

* Generified OTEL metrics expirer

* fix some unit tests

* procolly metrics exporter merges app+net

* a process exporter per service

* process_cpu_utilization_ratio now reported for OTEL

* Basic process sampling working

* fix lint and unit tests

* merging otel exporters into single package

* fix harvest test

* Reorganized code

* Fixed compilation and unit tests

* Fixed and improved process harvesting

* Process integration tests

* Fix unit test

* fix integration test

* Added extra comment
  • Loading branch information
mariomac authored Jun 6, 2024
1 parent 294a8d9 commit 65007e1
Show file tree
Hide file tree
Showing 37 changed files with 1,807 additions and 203 deletions.
4 changes: 4 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ The Initial Developer of some parts of the product, which are copied from, deriv
inspired by the DataDog Agent (https://github.com/DataDog/datadog-agent).
Copyright DataDog.

The Initial Developer of some parts of the product, which are copied from, derived from, or
inspired by the New Relic Infrastructure Agent (https://github.com/newrelic/infrastructure-agent).
Copyright New Relic.

Grafana Beyla uses third-party libraries or other resources that may be
distributed under licenses different than the Grafana Beyla software. The licenses for
these third-party libraries are listed in the attached third_party_licenses.csv file
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/mariomac/guara v0.0.0-20230621100729-42bd7716e524
github.com/mariomac/pipes v0.10.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/client_model v0.6.0
github.com/prometheus/common v0.48.0
Expand All @@ -34,6 +35,7 @@ require (
go.opentelemetry.io/collector/config/configgrpc v0.97.0
go.opentelemetry.io/collector/config/confighttp v0.97.0
go.opentelemetry.io/collector/config/configopaque v1.4.0
go.opentelemetry.io/collector/config/configretry v0.97.0
go.opentelemetry.io/collector/config/configtelemetry v0.97.0
go.opentelemetry.io/collector/config/configtls v0.97.0
go.opentelemetry.io/collector/consumer v0.97.0
Expand Down Expand Up @@ -124,7 +126,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/rs/cors v1.10.1 // indirect
Expand All @@ -140,7 +141,6 @@ require (
go.opentelemetry.io/collector/config/configauth v0.97.0 // indirect
go.opentelemetry.io/collector/config/configcompression v1.4.0 // indirect
go.opentelemetry.io/collector/config/confignet v0.97.0 // indirect
go.opentelemetry.io/collector/config/configretry v0.97.0 // indirect
go.opentelemetry.io/collector/config/internal v0.97.0 // indirect
go.opentelemetry.io/collector/confmap v0.97.0 // indirect
go.opentelemetry.io/collector/extension v0.97.0 // indirect
Expand Down
9 changes: 9 additions & 0 deletions pkg/beyla/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/grafana/beyla/pkg/internal/export/prom"
"github.com/grafana/beyla/pkg/internal/filter"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/infraolly/process"
"github.com/grafana/beyla/pkg/internal/traces"
"github.com/grafana/beyla/pkg/services"
"github.com/grafana/beyla/pkg/transform"
Expand Down Expand Up @@ -99,6 +100,10 @@ var DefaultConfig = Config{
},
Routes: &transform.RoutesConfig{},
NetworkFlows: defaultNetworkConfig,
Processes: process.CollectConfig{
RunMode: process.RunModePrivileged,
Interval: 5 * time.Second,
},
}

type Config struct {
Expand Down Expand Up @@ -149,6 +154,10 @@ type Config struct {
ProfilePort int `yaml:"profile_port" env:"BEYLA_PROFILE_PORT"`
InternalMetrics imetrics.Config `yaml:"internal_metrics"`

// Processes metrics for application. They will be only enabled if there is a metrics exporter enabled,
// and both the "application" and "application_process" features are enabled
Processes process.CollectConfig `yaml:"processes"`

// Grafana Agent specific configuration
TracesReceiver TracesReceiverConfig `yaml:"-"`
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/beyla/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/grafana/beyla/pkg/internal/export/otel"
"github.com/grafana/beyla/pkg/internal/export/prom"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/infraolly/process"
"github.com/grafana/beyla/pkg/internal/netolly/transform/cidr"
"github.com/grafana/beyla/pkg/internal/traces"
"github.com/grafana/beyla/pkg/transform"
Expand Down Expand Up @@ -165,6 +166,10 @@ network:
CacheLen: 1024,
CacheTTL: 5 * time.Minute,
},
Processes: process.CollectConfig{
RunMode: process.RunModePrivileged,
Interval: 5 * time.Second,
},
}, cfg)
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/internal/discover/typer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ func (t *typer) FilterClassify(evs []Event[ProcessMatch]) []Event[Instrumentable
ev := &evs[i]
switch evs[i].Type {
case EventCreated:
svcID := svc.ID{Name: ev.Obj.Criteria.Name, Namespace: ev.Obj.Criteria.Namespace}
svcID := svc.ID{
Name: ev.Obj.Criteria.Name,
Namespace: ev.Obj.Criteria.Namespace,
ProcPID: ev.Obj.Process.Pid,
}
if elfFile, err := exec.FindExecELF(ev.Obj.Process, svcID); err != nil {
t.log.Warn("error finding process ELF. Ignoring", "error", err)
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/ebpf/common/pids.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func serviceInfo(pid uint32) svc.ID {

name := commName(pid)
lang := exec.FindProcLanguage(int32(pid), nil)
result := svc.ID{Name: name, SDKLanguage: lang}
result := svc.ID{Name: name, SDKLanguage: lang, ProcPID: int32(pid)}

activePids.Add(pid, result)

Expand Down
17 changes: 17 additions & 0 deletions pkg/internal/export/attributes/attr_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,20 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup {
},
}

var processAttributes = AttrReportGroup{
Attributes: map[attr.Name]Default{
attr.ProcCommand: true,
attr.ProcCPUState: true,
attr.ProcOwner: true,
attr.ProcParentPid: true,
attr.ProcPid: true,
attr.ProcCommandLine: false,
attr.ProcCommandArgs: false,
attr.ProcExecName: false,
attr.ProcExecPath: false,
},
}

return map[Section]AttrReportGroup{
BeylaNetworkFlow.Section: {
SubGroups: []*AttrReportGroup{&networkCIDR, &networkKubeAttributes},
Expand Down Expand Up @@ -223,6 +237,9 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup {
attr.DBQueryText: false,
},
},
ProcessCPUUtilization.Section: {
SubGroups: []*AttrReportGroup{&processAttributes},
},
}
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/internal/export/attributes/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ var (
Prom: "db_client_operation_duration_seconds",
OTEL: "db.client.operation.duration",
}
ProcessCPUUtilization = Name{
Section: "process.cpu.utilization",
Prom: "process_cpu_utilization_ratio",
OTEL: "process.cpu.utilization",
}
MessagingPublishDuration = Name{
Section: "messaging.publish.duration",
Prom: "messaging_publish_duration_seconds",
Expand All @@ -79,7 +84,7 @@ var (
// as long as the metric name is recorgnisable.
func normalizeMetric(name Section) Section {
nameStr := strings.ReplaceAll(string(name), "_", ".")
for _, suffix := range []string{".bucket", ".sum", ".count", ".total"} {
for _, suffix := range []string{".ratio", ".bucket", ".sum", ".count", ".total"} {
if strings.HasSuffix(nameStr, suffix) {
nameStr = nameStr[:len(nameStr)-len(suffix)]
break
Expand Down
16 changes: 16 additions & 0 deletions pkg/internal/export/attributes/names/attrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,22 @@ var (
K8sDstNodeName = Name("k8s.dst.node.name")
)

// Process Metrics following OTEL 1.26 experimental conventions
// https://opentelemetry.io/docs/specs/semconv/resource/process/
// https://opentelemetry.io/docs/specs/semconv/system/process-metrics/

const (
ProcCommand = Name(semconv.ProcessCommandKey)
ProcCommandLine = Name(semconv.ProcessCommandLineKey)
ProcCPUState = Name("process.cpu.state")
ProcOwner = Name(semconv.ProcessOwnerKey)
ProcParentPid = Name(semconv.ProcessParentPIDKey)
ProcPid = Name(semconv.ProcessPIDKey)
ProcCommandArgs = Name(semconv.ProcessCommandArgsKey)
ProcExecName = Name(semconv.ProcessExecutableNameKey)
ProcExecPath = Name(semconv.ProcessExecutablePathKey)
)

// other beyla-specific attributes
var (
// TargetInstance is a Prometheus-only attribute.
Expand Down
33 changes: 28 additions & 5 deletions pkg/internal/export/otel/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var DefaultBuckets = Buckets{
RequestSizeHistogram: []float64{0, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192},
}

func getResourceAttrs(service svc.ID) *resource.Resource {
func getResourceAttrs(service *svc.ID) *resource.Resource {
attrs := []attribute.KeyValue{
semconv.ServiceName(service.Name),
semconv.ServiceInstanceID(service.Instance),
Expand Down Expand Up @@ -87,7 +87,10 @@ func getResourceAttrs(service svc.ID) *resource.Resource {
type ReporterPool[T any] struct {
pool *simplelru.LRU[svc.UID, T]

itemConstructor func(svc.ID) (T, error)
itemConstructor func(*svc.ID) (T, error)

lastReporter T
lastService *svc.ID
}

// NewReporterPool creates a ReporterPool instance given a cache length,
Expand All @@ -97,22 +100,42 @@ type ReporterPool[T any] struct {
func NewReporterPool[T any](
cacheLen int,
callback simplelru.EvictCallback[svc.UID, T],
itemConstructor func(id svc.ID) (T, error),
itemConstructor func(id *svc.ID) (T, error),
) ReporterPool[T] {
pool, _ := simplelru.NewLRU[svc.UID, T](cacheLen, callback)
return ReporterPool[T]{pool: pool, itemConstructor: itemConstructor}
}

// For retrieves the associated item for the given service name, or
// creates a new one if it does not exist
func (rp *ReporterPool[T]) For(service svc.ID) (T, error) {
func (rp *ReporterPool[T]) For(service *svc.ID) (T, error) {
// optimization: do not query the resources' cache if the
// previously processed span belongs to the same service name
// as the current.
// This will save querying OTEL resource reporters when there is
// only a single instrumented process.
// In multi-process tracing, this is likely to happen as most
// tracers group traces belonging to the same service in the same slice.
if rp.lastService == nil || service.UID != rp.lastService.UID {
lm, err := rp.get(service)
if err != nil {
var t T
return t, err
}
rp.lastService = service
rp.lastReporter = lm
}
return rp.lastReporter, nil
}

func (rp *ReporterPool[T]) get(service *svc.ID) (T, error) {
if m, ok := rp.pool.Get(service.UID); ok {
return m, nil
}
m, err := rp.itemConstructor(service)
if err != nil {
var t T
return t, fmt.Errorf("creating resource for service %q: %w", &service, err)
return t, fmt.Errorf("creating resource for service %q: %w", service, err)
}
rp.pool.Add(service.UID, m)
return m, nil
Expand Down
Loading

0 comments on commit 65007e1

Please sign in to comment.