Skip to content

Commit

Permalink
Added the ability to add Resource Attributes for all the scrappers (o…
Browse files Browse the repository at this point in the history
…pen-telemetry#2)

* Process scrapper

* update tommyers to ishleenk17

* mod change

* Updated processor scrapper

* Update system.process.memory.size metric

* Add the processes dataset

* Update dataset name and add a metric in process

* Update go.mod and go.sum

* Add more metrics to process.go

* Add metrics to pocess scrapper

* Address review comments

* Add starttimestamp to process.cpu.time

* Address review comments

* Undo changes of go.mod and go.sum changes

* Update paths

* Add the cpu pct metric

* Add Network Scrapper mapping

* Test for Resource attributefor proces scrapper

* Trying out another way of adding resourceattr

* Add support for ResourceMetric in all scrappers

* Add resource attribute to process scrapper

* change rm to resource

* Update go.mod

* Move addition of Process attribute to process file
  • Loading branch information
ishleenk17 committed Mar 11, 2024
1 parent 98e6638 commit 80c19e9
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 21 deletions.
2 changes: 1 addition & 1 deletion processor/elasticprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
go.opentelemetry.io/collector/consumer v0.92.0
go.opentelemetry.io/collector/pdata v1.0.1
go.opentelemetry.io/collector/processor v0.92.0
go.uber.org/zap v1.26.0
)

require (
Expand All @@ -31,7 +32,6 @@ require (
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions processor/elasticprocessor/internal/hostmetrics/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
)

func addCPUMetrics(metrics pmetric.MetricSlice, dataset string) error {
func addCPUMetrics(metrics pmetric.MetricSlice, resource pcommon.Resource, dataset string) error {
var timestamp pcommon.Timestamp
var numCores int64
var totalPercent, idlePercent, systemPercent, userPercent, stealPercent,
Expand Down Expand Up @@ -71,7 +71,7 @@ func addCPUMetrics(metrics pmetric.MetricSlice, dataset string) error {
irqNorm := irqPercent / float64(numCores)
softirqNorm := softirqPercent / float64(numCores)

addMetrics(metrics, dataset,
addMetrics(metrics, resource, dataset,
metric{
dataType: Sum,
name: "system.cpu.cores",
Expand Down
13 changes: 7 additions & 6 deletions processor/elasticprocessor/internal/hostmetrics/hostmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"path"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

Expand All @@ -23,7 +24,7 @@ var scraperToElasticDataset = map[string]string{
// AddElasticSystemMetrics computes additional metrics for compatibility with the Elastic system integration.
// The `scopeMetrics` input should be metrics generated by a specific hostmetrics scraper.
// `scopeMetrics` are modified in place.
func AddElasticSystemMetrics(scopeMetrics pmetric.ScopeMetrics, storage map[string]any) error {
func AddElasticSystemMetrics(scopeMetrics pmetric.ScopeMetrics, resource pcommon.Resource, storage map[string]any) error {
scope := scopeMetrics.Scope()
scraper := path.Base(scope.Name())

Expand All @@ -41,15 +42,15 @@ func AddElasticSystemMetrics(scopeMetrics pmetric.ScopeMetrics, storage map[stri

switch scraper {
case "cpu":
return addCPUMetrics(scopeMetrics.Metrics(), dataset)
return addCPUMetrics(scopeMetrics.Metrics(), resource, dataset)
case "memory":
return addMemoryMetrics(scopeMetrics.Metrics(), dataset)
return addMemoryMetrics(scopeMetrics.Metrics(), resource, dataset)
case "load":
return addLoadMetrics(scopeMetrics.Metrics(), dataset)
return addLoadMetrics(scopeMetrics.Metrics(), resource, dataset)
case "process":
return addProcessMetrics(scopeMetrics.Metrics(), dataset)
return addProcessMetrics(scopeMetrics.Metrics(), resource, dataset)
case "processes":
return addProcessSummaryMetrics(scopeMetrics.Metrics(), dataset)
return addProcessSummaryMetrics(scopeMetrics.Metrics(), resource, dataset)
default:
return fmt.Errorf("no matching transform function found for scope '%s'", scope.Name())
}
Expand Down
4 changes: 2 additions & 2 deletions processor/elasticprocessor/internal/hostmetrics/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
)

func addLoadMetrics(metrics pmetric.MetricSlice, dataset string) error {
func addLoadMetrics(metrics pmetric.MetricSlice, resource pcommon.Resource, dataset string) error {
var timestamp pcommon.Timestamp
var l1, l5, l15 float64

Expand All @@ -32,7 +32,7 @@ func addLoadMetrics(metrics pmetric.MetricSlice, dataset string) error {
}
}

addMetrics(metrics, dataset,
addMetrics(metrics, resource, dataset,
metric{
dataType: Gauge,
name: "system.load.1",
Expand Down
4 changes: 2 additions & 2 deletions processor/elasticprocessor/internal/hostmetrics/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
)

func addMemoryMetrics(metrics pmetric.MetricSlice, dataset string) error {
func addMemoryMetrics(metrics pmetric.MetricSlice, resource pcommon.Resource, dataset string) error {
var timestamp pcommon.Timestamp
var total, free, cached, usedBytes, actualFree, actualUsedBytes int64
var usedPercent, actualUsedPercent float64
Expand Down Expand Up @@ -74,7 +74,7 @@ func addMemoryMetrics(metrics pmetric.MetricSlice, dataset string) error {
usedBytes += total
actualFree = total - actualUsedBytes

addMetrics(metrics, dataset,
addMetrics(metrics, resource, dataset,
metric{
dataType: Sum,
name: "system.memory.total",
Expand Down
6 changes: 5 additions & 1 deletion processor/elasticprocessor/internal/hostmetrics/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type metric struct {
doubleValue *float64
}

func addMetrics(ms pmetric.MetricSlice, dataset string, metrics ...metric) {
func addMetrics(ms pmetric.MetricSlice, resource pcommon.Resource, dataset string, metrics ...metric) {
ms.EnsureCapacity(ms.Len() + len(metrics))

for _, metric := range metrics {
Expand All @@ -47,6 +47,10 @@ func addMetrics(ms pmetric.MetricSlice, dataset string, metrics ...metric) {
dp.SetStartTimestamp(metric.startTimestamp)
}

if dataset == "system.process" {
addProcessAttributes(resource, dp)
}

dp.Attributes().PutStr("data_stream.dataset", dataset)
}
}
35 changes: 31 additions & 4 deletions processor/elasticprocessor/internal/hostmetrics/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
)

func addProcessMetrics(metrics pmetric.MetricSlice, dataset string) error {
func addProcessMetrics(metrics pmetric.MetricSlice, resource pcommon.Resource, dataset string) error {
var timestamp pcommon.Timestamp
var startTime, threads, memUsage, memVirtual, fdOpen, ioReadBytes, ioWriteBytes, ioReadOperations, ioWriteOperations int64
var memUtil, memUtilPct, total, cpuTimeValue, systemCpuTime, userCpuTime float64
var startTime, processRuntime, threads, memUsage, memVirtual, fdOpen, ioReadBytes, ioWriteBytes, ioReadOperations, ioWriteOperations int64
var memUtil, memUtilPct, total, cpuTimeValue, systemCpuTime, userCpuTime, cpuPct float64

for i := 0; i < metrics.Len(); i++ {
metric := metrics.At(i)
Expand Down Expand Up @@ -129,8 +129,10 @@ func addProcessMetrics(metrics pmetric.MetricSlice, dataset string) error {
cpuTimeValue = total * 1000
systemCpuTime = systemCpuTime * 1000
userCpuTime = userCpuTime * 1000
processRuntime = timestamp.AsTime().UnixMilli() - startTime
cpuPct = cpuTimeValue / float64(processRuntime)

addMetrics(metrics, dataset,
addMetrics(metrics, resource, dataset,
metric{
dataType: Sum,
name: "process.cpu.start_time",
Expand Down Expand Up @@ -222,7 +224,32 @@ func addProcessMetrics(metrics pmetric.MetricSlice, dataset string) error {
timestamp: timestamp,
intValue: &ioWriteOperations,
},
metric{
dataType: Gauge,
name: "system.process.cpu.total.pct",
timestamp: timestamp,
doubleValue: &cpuPct,
},
)

return nil
}

func addProcessAttributes(resource pcommon.Resource, dp pmetric.NumberDataPoint) {
process_ppid, _ := resource.Attributes().Get("process.parent_pid")
if process_ppid.Int() != 0 {
dp.Attributes().PutInt("process.parent.pid", process_ppid.Int())
}
process_owner, _ := resource.Attributes().Get("process.owner")
if process_owner.Str() != "" {
dp.Attributes().PutStr("user.name", process_owner.Str())
}
process_executable, _ := resource.Attributes().Get("process.executable.path")
if process_executable.Str() != "" {
dp.Attributes().PutStr("process.executable", process_executable.Str())
}
process_name, _ := resource.Attributes().Get("process.executable.name")
if process_name.Str() != "" {
dp.Attributes().PutStr("process.name", process_name.Str())
}
}
4 changes: 2 additions & 2 deletions processor/elasticprocessor/internal/hostmetrics/processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
)

func addProcessSummaryMetrics(metrics pmetric.MetricSlice, dataset string) error {
func addProcessSummaryMetrics(metrics pmetric.MetricSlice, resource pcommon.Resource, dataset string) error {
var timestamp pcommon.Timestamp
var idleProcesses, sleepingProcesses, stoppedProcesses, zombieProcesses, totalProcesses int64

Expand Down Expand Up @@ -42,7 +42,7 @@ func addProcessSummaryMetrics(metrics pmetric.MetricSlice, dataset string) error

}

addMetrics(metrics, dataset,
addMetrics(metrics, resource, dataset,
metric{
dataType: Sum,
name: "system.process.summary.idle",
Expand Down
3 changes: 2 additions & 1 deletion processor/elasticprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ func newProcessor(set processor.CreateSettings, cfg *Config) *ElasticProcessor {
func (p *ElasticProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
for i := 0; i < md.ResourceMetrics().Len(); i++ {
resourceMetric := md.ResourceMetrics().At(i)
rm := resourceMetric.Resource()

for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ {
scopeMetric := resourceMetric.ScopeMetrics().At(j)

if p.cfg.AddSystemMetrics {
if strings.HasPrefix(scopeMetric.Scope().Name(), "otelcol/hostmetricsreceiver") {
if err := hostmetrics.AddElasticSystemMetrics(scopeMetric, p.storage); err != nil {
if err := hostmetrics.AddElasticSystemMetrics(scopeMetric, rm, p.storage); err != nil {
p.logger.Error("error adding hostmetrics data", zap.Error(err))
}
}
Expand Down

0 comments on commit 80c19e9

Please sign in to comment.