Skip to content

Commit

Permalink
Rename svc.ID to svc.Attrs (#1393)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariomac authored Nov 23, 2024
1 parent 65111df commit fb6dbef
Show file tree
Hide file tree
Showing 49 changed files with 426 additions and 337 deletions.
4 changes: 2 additions & 2 deletions pkg/export/alloy/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type tracesReceiver struct {
}

func (tr *tracesReceiver) spanDiscarded(span *request.Span) bool {
return span.IgnoreTraces() || span.ServiceID.ExportsOTelTraces()
return span.IgnoreTraces() || span.Service.ExportsOTelTraces()
}

func (tr *tracesReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], error) {
Expand All @@ -51,7 +51,7 @@ func (tr *tracesReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], error)
if tr.spanDiscarded(span) {
continue
}
envResourceAttrs := otel.ResourceAttrsFromEnv(&span.ServiceID)
envResourceAttrs := otel.ResourceAttrsFromEnv(&span.Service)

for _, tc := range tr.cfg.Traces {
traces := otel.GenerateTraces(span, tr.hostID, traceAttrs, envResourceAttrs)
Expand Down
12 changes: 6 additions & 6 deletions pkg/export/alloy/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
)

func TestTracesSkipsInstrumented(t *testing.T) {
svcNoExport := svc.ID{}
svcNoExport := svc.Attrs{}

svcNoExportTraces := svc.ID{}
svcNoExportTraces := svc.Attrs{}
svcNoExportTraces.SetExportsOTelMetrics()

svcExportTraces := svc.ID{}
svcExportTraces := svc.Attrs{}
svcExportTraces.SetExportsOTelTraces()

tests := []struct {
Expand All @@ -31,17 +31,17 @@ func TestTracesSkipsInstrumented(t *testing.T) {
}{
{
name: "Foo span is not filtered",
spans: []request.Span{{ServiceID: svcNoExport, Type: request.EventTypeHTTPClient, Method: "GET", Route: "/foo", RequestStart: 100, End: 200}},
spans: []request.Span{{Service: svcNoExport, Type: request.EventTypeHTTPClient, Method: "GET", Route: "/foo", RequestStart: 100, End: 200}},
filtered: false,
},
{
name: "/v1/metrics span is not filtered",
spans: []request.Span{{ServiceID: svcNoExportTraces, Type: request.EventTypeHTTPClient, Method: "GET", Route: "/v1/metrics", RequestStart: 100, End: 200}},
spans: []request.Span{{Service: svcNoExportTraces, Type: request.EventTypeHTTPClient, Method: "GET", Route: "/v1/metrics", RequestStart: 100, End: 200}},
filtered: false,
},
{
name: "/v1/traces span is filtered",
spans: []request.Span{{ServiceID: svcExportTraces, Type: request.EventTypeHTTPClient, Method: "GET", Route: "/v1/traces", RequestStart: 100, End: 200}},
spans: []request.Span{{Service: svcExportTraces, Type: request.EventTypeHTTPClient, Method: "GET", Route: "/v1/traces", RequestStart: 100, End: 200}},
filtered: true,
},
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/export/debug/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func textPrinter(input <-chan []request.Span) {
hn := ""

if spans[i].IsClientSpan() {
if spans[i].ServiceID.UID.Namespace != "" {
pn = "." + spans[i].ServiceID.UID.Namespace
if spans[i].Service.UID.Namespace != "" {
pn = "." + spans[i].Service.UID.Namespace
}
if spans[i].OtherNamespace != "" {
hn = "." + spans[i].OtherNamespace
Expand All @@ -93,8 +93,8 @@ func textPrinter(input <-chan []request.Span) {
if spans[i].OtherNamespace != "" {
pn = "." + spans[i].OtherNamespace
}
if spans[i].ServiceID.UID.Namespace != "" {
hn = "." + spans[i].ServiceID.UID.Namespace
if spans[i].Service.UID.Namespace != "" {
hn = "." + spans[i].Service.UID.Namespace
}
}

Expand All @@ -111,8 +111,8 @@ func textPrinter(input <-chan []request.Span) {
spans[i].Host+" as "+request.SpanHost(&spans[i])+hn,
spans[i].HostPort,
spans[i].ContentLength,
&spans[i].ServiceID,
spans[i].ServiceID.SDKLanguage.String(),
&spans[i].Service,
spans[i].Service.SDKLanguage.String(),
traceparent(&spans[i]),
)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/export/debug/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestTracePrinterValidEnabled(t *testing.T) {

func traceFuncHelper(t *testing.T, tracePrinter TracePrinter) string {
fakeSpan := request.Span{
ServiceID: svc.ID{UID: svc.UID{Name: "bar", Namespace: "foo"}, SDKLanguage: svc.InstrumentableGolang},
Service: svc.Attrs{UID: svc.UID{Name: "bar", Namespace: "foo"}, SDKLanguage: svc.InstrumentableGolang},
Type: request.EventTypeHTTP,
Method: "method",
Path: "path",
Expand Down
8 changes: 4 additions & 4 deletions pkg/export/otel/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ var DefaultBuckets = Buckets{
RequestSizeHistogram: []float64{0, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192},
}

func getAppResourceAttrs(hostID string, service *svc.ID) []attribute.KeyValue {
func getAppResourceAttrs(hostID string, service *svc.Attrs) []attribute.KeyValue {
return append(getResourceAttrs(hostID, service),
semconv.ServiceInstanceID(service.UID.Instance),
)
}

func getResourceAttrs(hostID string, service *svc.ID) []attribute.KeyValue {
func getResourceAttrs(hostID string, service *svc.Attrs) []attribute.KeyValue {
attrs := []attribute.KeyValue{
semconv.ServiceName(service.UID.Name),
// SpanMetrics requires an extra attribute besides service name
Expand Down Expand Up @@ -352,7 +352,7 @@ func headersFromEnv(varName string) map[string]string {
// OTEL_RESOURCE_ATTRIBUTES, i.e. a comma-separated list of
// key=values. For example: api-key=key,other-config-value=value
// The values are passed as parameters to the handler function
func parseOTELEnvVar(svc *svc.ID, varName string, handler attributes.VarHandler) {
func parseOTELEnvVar(svc *svc.Attrs, varName string, handler attributes.VarHandler) {
var envVar string
ok := false

Expand All @@ -371,7 +371,7 @@ func parseOTELEnvVar(svc *svc.ID, varName string, handler attributes.VarHandler)
attributes.ParseOTELResourceVariable(envVar, handler)
}

func ResourceAttrsFromEnv(svc *svc.ID) []attribute.KeyValue {
func ResourceAttrsFromEnv(svc *svc.Attrs) []attribute.KeyValue {
var otelResourceAttrs []attribute.KeyValue
apply := func(k string, v string) {
otelResourceAttrs = append(otelResourceAttrs, attribute.String(k, v))
Expand Down
2 changes: 1 addition & 1 deletion pkg/export/otel/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestParseOTELEnvVarPerService(t *testing.T) {
actual[k] = v
}

parseOTELEnvVar(&svc.ID{EnvVars: map[string]string{dummyVar: tc.envVar}}, dummyVar, apply)
parseOTELEnvVar(&svc.Attrs{EnvVars: map[string]string{dummyVar: tc.envVar}}, dummyVar, apply)

assert.True(t, reflect.DeepEqual(actual, tc.expected))
})
Expand Down
28 changes: 14 additions & 14 deletions pkg/export/otel/expirer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ func TestNetMetricsExpiration(t *testing.T) {
}

// the expiration logic is held at two levels:
// (1) by group of attributes within the same service ID,
// (2) by metric set of a given service ID
// (1) by group of attributes within the same service Attrs,
// (2) by metric set of a given service Attrs
// this test verifies case 1
func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) {
defer restoreEnvAfterExecution()()
Expand Down Expand Up @@ -165,8 +165,8 @@ func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) {

// WHEN it receives metrics
metrics <- []request.Span{
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 150, End: 175},
{Service: svc.Attrs{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200},
{Service: svc.Attrs{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 150, End: 175},
}

// THEN the metrics are exported
Expand All @@ -189,7 +189,7 @@ func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) {
// AND WHEN it keeps receiving a subset of the initial metrics during the TTL
now.Advance(2 * time.Minute)
metrics <- []request.Span{
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 250, End: 280},
{Service: svc.Attrs{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 250, End: 280},
}

// THEN THE metrics that have been received during the TTL period are still visible
Expand All @@ -203,7 +203,7 @@ func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) {

now.Advance(2 * time.Minute)
metrics <- []request.Span{
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 300, End: 310},
{Service: svc.Attrs{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 300, End: 310},
}

// makes sure that the records channel is emptied and any remaining
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) {
// AND WHEN the metrics labels that disappeared are received again
now.Advance(2 * time.Minute)
metrics <- []request.Span{
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 450, End: 520},
{Service: svc.Attrs{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 450, End: 520},
}

// THEN they are reported again, starting from zero in the case of counters
Expand All @@ -245,8 +245,8 @@ func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) {
}

// the expiration logic is held at two levels:
// (1) by group of attributes within the same service ID,
// (2) by metric set of a given service ID
// (1) by group of attributes within the same service Attrs,
// (2) by metric set of a given service Attrs
// this test verifies case 2
func TestAppMetricsExpiration_BySvcID(t *testing.T) {
defer restoreEnvAfterExecution()()
Expand Down Expand Up @@ -284,8 +284,8 @@ func TestAppMetricsExpiration_BySvcID(t *testing.T) {

// WHEN it receives metrics
metrics <- []request.Span{
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200},
{ServiceID: svc.ID{UID: svc.UID{Instance: "bar"}}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 150, End: 175},
{Service: svc.Attrs{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200},
{Service: svc.Attrs{UID: svc.UID{Instance: "bar"}}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 150, End: 175},
}

// THEN the metrics are exported
Expand All @@ -308,7 +308,7 @@ func TestAppMetricsExpiration_BySvcID(t *testing.T) {
// AND WHEN it keeps receiving a subset of the initial metrics during the TTL
now.Advance(2 * time.Minute)
metrics <- []request.Span{
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 250, End: 280},
{Service: svc.Attrs{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 250, End: 280},
}

// THEN THE metrics that have been received during the TTL period are still visible
Expand All @@ -322,7 +322,7 @@ func TestAppMetricsExpiration_BySvcID(t *testing.T) {

now.Advance(2 * time.Minute)
metrics <- []request.Span{
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 300, End: 310},
{Service: svc.Attrs{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 300, End: 310},
}

// BUT not the metrics that haven't been received during that time.
Expand Down Expand Up @@ -351,7 +351,7 @@ func TestAppMetricsExpiration_BySvcID(t *testing.T) {
// AND WHEN the metrics labels that disappeared are received again
now.Advance(2 * time.Minute)
metrics <- []request.Span{
{ServiceID: svc.ID{UID: svc.UID{Instance: "bar"}}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 450, End: 520},
{Service: svc.Attrs{UID: svc.UID{Instance: "bar"}}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 450, End: 520},
}

// THEN they are reported again, starting from zero in the case of counters
Expand Down
16 changes: 8 additions & 8 deletions pkg/export/otel/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ type MetricsReporter struct {
hostID string
attributes *attributes.AttrSelector
exporter metric.Exporter
reporters ReporterPool[*svc.ID, *Metrics]
reporters ReporterPool[*svc.Attrs, *Metrics]
is instrumentations.InstrumentationSelection

// user-selected fields for each of the reported metrics
Expand All @@ -190,7 +190,7 @@ type MetricsReporter struct {
// There is a Metrics instance for each service/process instrumented by Beyla.
type Metrics struct {
ctx context.Context
service *svc.ID
service *svc.Attrs
provider *metric.MeterProvider

// IMPORTANT! Don't forget to clean each Expirer in cleanupAllMetricsInstances method
Expand Down Expand Up @@ -286,7 +286,7 @@ func newMetricsReporter(
request.SpanOTELGetters, mr.attributes.For(attributes.MessagingProcessDuration))
}

mr.reporters = NewReporterPool[*svc.ID, *Metrics](cfg.ReportersCacheLen, cfg.TTL, timeNow,
mr.reporters = NewReporterPool[*svc.Attrs, *Metrics](cfg.ReportersCacheLen, cfg.TTL, timeNow,
func(id svc.UID, v *expirable[*Metrics]) {
if mr.cfg.SpanMetricsEnabled() {
attrOpt := instrument.WithAttributeSet(mr.metricResourceAttributes(v.value.service))
Expand Down Expand Up @@ -542,7 +542,7 @@ func (mr *MetricsReporter) setupGraphMeters(m *Metrics, meter instrument.Meter)
return nil
}

func (mr *MetricsReporter) newMetricSet(service *svc.ID) (*Metrics, error) {
func (mr *MetricsReporter) newMetricSet(service *svc.Attrs) (*Metrics, error) {
mlog := mlog().With("service", service)
mlog.Debug("creating new Metrics reporter")
resourceAttributes := append(getAppResourceAttrs(mr.hostID, service), ResourceAttrsFromEnv(service)...)
Expand Down Expand Up @@ -707,7 +707,7 @@ func otelHistogramConfig(metricName string, buckets []float64, useExponentialHis

}

func (mr *MetricsReporter) metricResourceAttributes(service *svc.ID) attribute.Set {
func (mr *MetricsReporter) metricResourceAttributes(service *svc.Attrs) attribute.Set {
attrs := []attribute.KeyValue{
request.ServiceMetric(service.UID.Name),
semconv.ServiceInstanceID(service.UID.Instance),
Expand Down Expand Up @@ -759,7 +759,7 @@ func (mr *MetricsReporter) serviceGraphAttributes() []attributes.Field[*request.
}

func otelSpanAccepted(span *request.Span, mr *MetricsReporter) bool {
return mr.cfg.OTelMetricsEnabled() && !span.ServiceID.ExportsOTelMetrics()
return mr.cfg.OTelMetricsEnabled() && !span.Service.ExportsOTelMetrics()
}

// nolint:cyclop
Expand Down Expand Up @@ -855,10 +855,10 @@ func (mr *MetricsReporter) reportMetrics(input <-chan []request.Span) {
if s.IgnoreMetrics() {
continue
}
reporter, err := mr.reporters.For(&s.ServiceID)
reporter, err := mr.reporters.For(&s.Service)
if err != nil {
mlog().Error("unexpected error creating OTEL resource. Ignoring metric",
"error", err, "service", s.ServiceID)
"error", err, "service", s.Service)
continue
}
reporter.record(s, mr)
Expand Down
8 changes: 4 additions & 4 deletions pkg/export/otel/metrics_proc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ func TestProcMetrics_Aggregated(t *testing.T) {

// WHEN it receives process metrics
metrics <- []*process.Status{
{ID: process.ID{Command: "foo", Service: &svc.ID{}, UID: svc.UID{Instance: "foo"}},
{ID: process.ID{Command: "foo", Service: &svc.Attrs{}, UID: svc.UID{Instance: "foo"}},
CPUUtilisationWait: 3, CPUUtilisationSystem: 2, CPUUtilisationUser: 1,
CPUTimeUserDelta: 30, CPUTimeWaitDelta: 20, CPUTimeSystemDelta: 10,
IOReadBytesDelta: 123, IOWriteBytesDelta: 456,
NetRcvBytesDelta: 11, NetTxBytesDelta: 22,
},
{ID: process.ID{Command: "bar", Service: &svc.ID{}, UID: svc.UID{Instance: "bar"}},
{ID: process.ID{Command: "bar", Service: &svc.Attrs{}, UID: svc.UID{Instance: "bar"}},
CPUUtilisationWait: 31, CPUUtilisationSystem: 21, CPUUtilisationUser: 11,
CPUTimeUserDelta: 301, CPUTimeWaitDelta: 201, CPUTimeSystemDelta: 101,
IOReadBytesDelta: 321, IOWriteBytesDelta: 654,
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestProcMetrics_Aggregated(t *testing.T) {

// AND WHEN new metrics are received
metrics <- []*process.Status{
{ID: process.ID{Command: "foo", Service: &svc.ID{}, UID: svc.UID{Instance: "foo"}},
{ID: process.ID{Command: "foo", Service: &svc.Attrs{}, UID: svc.UID{Instance: "foo"}},
CPUUtilisationWait: 4, CPUUtilisationSystem: 1, CPUUtilisationUser: 2,
CPUTimeUserDelta: 3, CPUTimeWaitDelta: 2, CPUTimeSystemDelta: 1,
IOReadBytesDelta: 1, IOWriteBytesDelta: 2,
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestProcMetrics_Disaggregated(t *testing.T) {

// WHEN it receives process metrics
metrics <- []*process.Status{
{ID: process.ID{Command: "foo", Service: &svc.ID{}, UID: svc.UID{Instance: "foo"}},
{ID: process.ID{Command: "foo", Service: &svc.Attrs{}, UID: svc.UID{Instance: "foo"}},
CPUUtilisationWait: 3, CPUUtilisationSystem: 2, CPUUtilisationUser: 1,
CPUTimeUserDelta: 30, CPUTimeWaitDelta: 20, CPUTimeSystemDelta: 10,
IOReadBytesDelta: 123, IOWriteBytesDelta: 456,
Expand Down
Loading

0 comments on commit fb6dbef

Please sign in to comment.