Skip to content

stats: Add optional locality label in cluster_impl picker #7434

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,12 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
var labels *istats.Labels
if labels = istats.GetLabels(ctx); labels == nil {
labels = &istats.Labels{
TelemetryLabels: make(map[string]string),
// The defaults for all the per call labels from a plugin that
// executes on the callpath that this OpenTelemetry component
// currently supports.
TelemetryLabels: map[string]string{
"grpc.lb.locality": "",
},
}
ctx = istats.SetLabels(ctx, labels)
}
Expand Down Expand Up @@ -232,6 +237,8 @@ func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo,
}

for _, o := range h.options.MetricsOptions.OptionalLabels {
// TODO: Add a filter for converting to unknown if not present in the
// CSM Plugin Option layer by adding an optional labels API.
if val, ok := ai.xdsLabels[o]; ok {
attributes = append(attributes, otelattribute.String(o, val))
}
Expand Down
4 changes: 4 additions & 0 deletions stats/opentelemetry/csm/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (o *perTargetDialOption) DialOptionForTarget(parsedTarget url.URL) grpc.Dia

// dialOptionWithCSMPluginOption builds the dial option for the provided
// OpenTelemetry options with the CSM plugin option po applied. The optional
// labels in options are overwritten with the two "csm." xDS optional labels so
// that this component does not filter them out.
func dialOptionWithCSMPluginOption(options opentelemetry.Options, po otelinternal.PluginOption) grpc.DialOption {
	xdsOptionalLabels := []string{"csm.service_name", "csm.service_namespace_name"}
	options.MetricsOptions.OptionalLabels = xdsOptionalLabels
	return dialOptionSetCSM(options, po)
}

// dialOptionSetCSM applies the plugin option po to options through the
// otelinternal.SetPluginOption hook and returns the OpenTelemetry dial option
// built from the resulting options. This function does not itself assign the
// optional labels; whatever the caller configured in options is passed along.
func dialOptionSetCSM(options opentelemetry.Options, po otelinternal.PluginOption) grpc.DialOption {
	// The hook is asserted to its concrete function signature before use; the
	// assertion panics if the hook holds a value of any other type.
	setPluginOption := otelinternal.SetPluginOption.(func(*opentelemetry.Options, otelinternal.PluginOption))
	setPluginOption(&options, po)
	return opentelemetry.DialOption(options)
}
Expand Down
16 changes: 11 additions & 5 deletions stats/opentelemetry/csm/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,12 @@ func (s) TestCSMPluginOptionStreaming(t *testing.T) {
func unaryInterceptorAttachXDSLabels(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = istats.SetLabels(ctx, &istats.Labels{
TelemetryLabels: map[string]string{
// mock what the cluster impl would write here ("csm." xDS Labels)
// mock what the cluster impl would write here ("csm." xDS Labels
// and locality label)
"csm.service_name": "service_name_val",
"csm.service_namespace_name": "service_namespace_val",

"grpc.lb.locality": "grpc.lb.locality_val",
},
})

Expand All @@ -441,8 +444,9 @@ func unaryInterceptorAttachXDSLabels(ctx context.Context, method string, req, re
// Optional Labels turned on. It then configures an interceptor to attach
// labels, representing the cluster_impl picker. It then makes a unary RPC, and
// expects xDS Labels to be attached to emitted relevant metrics. Full
// xDS System alongside OpenTelemetry will be tested with interop. (there is
// a test for xDS -> Stats handler and this tests -> OTel -> emission).
// xDS System alongside OpenTelemetry will be tested with interop. (there is a
// test for xDS -> Stats handler and this tests -> OTel -> emission). It also
// tests the optional per call locality label in the same manner.
func (s) TestXDSLabels(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand All @@ -457,11 +461,11 @@ func (s) TestXDSLabels(t *testing.T) {
}

po := newPluginOption(ctx)
dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{
dopts := []grpc.DialOption{dialOptionSetCSM(opentelemetry.Options{
MetricsOptions: opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics(),
OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"},
OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name", "grpc.lb.locality"},
},
}, po), grpc.WithUnaryInterceptor(unaryInterceptorAttachXDSLabels)}
if err := ss.Start(nil, dopts...); err != nil {
Expand Down Expand Up @@ -489,6 +493,7 @@ func (s) TestXDSLabels(t *testing.T) {

serviceNameAttr := attribute.String("csm.service_name", "service_name_val")
serviceNamespaceAttr := attribute.String("csm.service_namespace_name", "service_namespace_val")
localityAttr := attribute.String("grpc.lb.locality", "grpc.lb.locality_val")
meshIDAttr := attribute.String("csm.mesh_id", "unknown")
workloadCanonicalServiceAttr := attribute.String("csm.workload_canonical_service", "unknown")
remoteWorkloadTypeAttr := attribute.String("csm.remote_workload_type", "unknown")
Expand All @@ -500,6 +505,7 @@ func (s) TestXDSLabels(t *testing.T) {
unaryStatusAttr,
serviceNameAttr,
serviceNamespaceAttr,
localityAttr,
meshIDAttr,
workloadCanonicalServiceAttr,
remoteWorkloadTypeAttr,
Expand Down
19 changes: 12 additions & 7 deletions test/xds/xds_telemetry_labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ import (
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/stats"

v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/types/known/structpb"
)

Expand All @@ -43,6 +44,9 @@ const serviceNamespaceKeyCSM = "csm.service_namespace_name"
const serviceNameValue = "grpc-service"
const serviceNamespaceValue = "grpc-service-namespace"

const localityKey = "grpc.lb.locality"
const localityValue = `{"region":"region-1","zone":"zone-1","subZone":"subzone-1"}`

// TestTelemetryLabels tests that telemetry labels from CDS make their way to
// the stats handler. The stats handler sets the mutable context value that the
// cluster impl picker will write telemetry labels to, and then the stats
Expand Down Expand Up @@ -126,13 +130,14 @@ func (fsh *fakeStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
// aren't started. All of these should have access to the desired telemetry
// labels.
case *stats.OutPayload, *stats.InPayload, *stats.End:
if label, ok := fsh.labels.TelemetryLabels[serviceNameKeyCSM]; !ok || label != serviceNameValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNameKeyCSM, serviceNameValue, label)
want := map[string]string{
serviceNameKeyCSM: serviceNameValue,
serviceNamespaceKeyCSM: serviceNamespaceValue,
localityKey: localityValue,
}
if label, ok := fsh.labels.TelemetryLabels[serviceNamespaceKeyCSM]; !ok || label != serviceNamespaceValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNamespaceKeyCSM, serviceNamespaceValue, label)
if diff := cmp.Diff(fsh.labels.TelemetryLabels, want); diff != "" {
fsh.t.Fatalf("fsh.labels.TelemetryLabels (-got +want): %v", diff)
}

default:
// Nothing to assert for the other stats.Handler callouts.
}
Expand Down
25 changes: 20 additions & 5 deletions xds/internal/balancer/clusterimpl/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package clusterimpl

import (
"context"

v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -96,14 +98,23 @@ func (b *clusterImplBalancer) newPicker(config *dropConfigs) *picker {
}
}

// telemetryLabels returns the TelemetryLabels map of the labels that
// stats.GetLabels finds in ctx. It returns nil when ctx is nil or when
// stats.GetLabels returns nil for ctx. The map is returned as-is (not copied),
// so writes by the caller go to the same map held by the labels value.
func telemetryLabels(ctx context.Context) map[string]string {
	if ctx == nil {
		return nil
	}
	if l := stats.GetLabels(ctx); l != nil {
		return l.TelemetryLabels
	}
	return nil
}

func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// Unconditionally set labels if present, even dropped or queued RPCs can
// use these labels.
if info.Ctx != nil {
if labels := stats.GetLabels(info.Ctx); labels != nil && labels.TelemetryLabels != nil {
for key, value := range d.telemetryLabels {
labels.TelemetryLabels[key] = value
}
if labels := telemetryLabels(info.Ctx); labels != nil {
for key, value := range d.telemetryLabels {
labels[key] = value
}
}

Expand Down Expand Up @@ -156,6 +167,10 @@ func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
return pr, err
}

if labels := telemetryLabels(info.Ctx); labels != nil {
labels["grpc.lb.locality"] = lIDStr
}

if d.loadStore != nil {
d.loadStore.CallStarted(lIDStr)
oldDone := pr.Done
Expand Down