Skip to content

Commit

Permalink
Add supporting features to enable distributed tracing
Browse files Browse the repository at this point in the history
This includes new internal pipeline policies and other supporting types.
See the changelog for a full description.
Added some missing doc comments.
  • Loading branch information
jhendrixMSFT committed Mar 3, 2023
1 parent 265c1a6 commit 1fe803d
Show file tree
Hide file tree
Showing 18 changed files with 657 additions and 31 deletions.
8 changes: 8 additions & 0 deletions sdk/azcore/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
## 1.5.0-beta.2 (Unreleased)

### Features Added
* Added supporting features to enable distributed tracing.
* Added func `runtime.StartSpan()` for use by SDKs to start spans.
* Added method `WithContext()` to `runtime.Request` to support shallow cloning with a new context.
* Added field `TracingNamespace` to `runtime.PipelineOptions`.
* Added field `Tracer` to `runtime.NewPollerOptions` and `runtime.NewPollerFromResumeTokenOptions` types.
* Added field `SpanFromContext` to `tracing.TracerOptions`.
* Added methods `Enabled()`, `SetAttributes()`, and `SpanFromContext()` to `tracing.Tracer`.
* Added supporting pipeline policies to include HTTP spans when creating clients.

### Breaking Changes

Expand Down
3 changes: 2 additions & 1 deletion sdk/azcore/arm/runtime/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
armpolicy "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/exported"
azpolicy "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
)
Expand All @@ -34,7 +35,7 @@ func NewPipeline(module, version string, cred azcore.TokenCredential, plOpts azr
})
perRetry := make([]azpolicy.Policy, len(plOpts.PerRetry), len(plOpts.PerRetry)+1)
copy(perRetry, plOpts.PerRetry)
plOpts.PerRetry = append(perRetry, authPolicy)
plOpts.PerRetry = append(perRetry, authPolicy, exported.PolicyFunc(httpTraceNamespacePolicy))
if !options.DisableRPRegistration {
regRPOpts := armpolicy.RegistrationOptions{ClientOptions: options.ClientOptions}
regPolicy, err := NewRPRegistrationPolicy(cred, &regRPOpts)
Expand Down
31 changes: 31 additions & 0 deletions sdk/azcore/arm/runtime/policy_trace_namespace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//go:build go1.18
// +build go1.18

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package runtime

import (
"net/http"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm/internal/resource"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/shared"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/tracing"
)

// httpTraceNamespacePolicy is a policy that adds the az.namespace attribute to the current Span
func httpTraceNamespacePolicy(req *policy.Request) (resp *http.Response, err error) {
rawTracer := req.Raw().Context().Value(shared.CtxWithTracingTracer{})
if tracer, ok := rawTracer.(tracing.Tracer); ok {
rt, err := resource.ParseResourceType(req.Raw().URL.Path)
if err == nil {
// add the namespace attribute to the current span
if span, ok := tracer.SpanFromContext(req.Raw().Context()); ok {
span.SetAttributes(tracing.Attribute{Key: "az.namespace", Value: rt.Namespace})
}
}
}
return req.Next()
}
97 changes: 97 additions & 0 deletions sdk/azcore/arm/runtime/policy_trace_namespace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//go:build go1.18
// +build go1.18

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package runtime

import (
"context"
"net/http"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/shared"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/tracing"
"github.com/Azure/azure-sdk-for-go/sdk/internal/mock"
"github.com/stretchr/testify/require"
)

func TestHTTPTraceNamespacePolicy(t *testing.T) {
srv, close := mock.NewServer()
defer close()

pl := exported.NewPipeline(srv, exported.PolicyFunc(httpTraceNamespacePolicy))

// no tracer
req, err := exported.NewRequest(context.Background(), http.MethodGet, srv.URL())
require.NoError(t, err)
srv.AppendResponse()
_, err = pl.Do(req)
require.NoError(t, err)

// wrong tracer type
req, err = exported.NewRequest(context.WithValue(context.Background(), shared.CtxWithTracingTracer{}, 0), http.MethodGet, srv.URL())
require.NoError(t, err)
srv.AppendResponse()
_, err = pl.Do(req)
require.NoError(t, err)

// no SpanFromContext impl
tr := tracing.NewTracer(func(ctx context.Context, spanName string, options *tracing.SpanOptions) (context.Context, tracing.Span) {
return ctx, tracing.Span{}
}, nil)
req, err = exported.NewRequest(context.WithValue(context.Background(), shared.CtxWithTracingTracer{}, tr), http.MethodGet, srv.URL())
require.NoError(t, err)
srv.AppendResponse()
_, err = pl.Do(req)
require.NoError(t, err)

// failed to parse resource ID, shouldn't call SetAttributes
var attrString string
tr = tracing.NewTracer(func(ctx context.Context, spanName string, options *tracing.SpanOptions) (context.Context, tracing.Span) {
return ctx, tracing.Span{}
}, &tracing.TracerOptions{
SpanFromContext: func(ctx context.Context) (tracing.Span, bool) {
spanImpl := tracing.SpanImpl{
SetAttributes: func(a ...tracing.Attribute) {
require.Len(t, a, 1)
v, ok := a[0].Value.(string)
require.True(t, ok)
attrString = a[0].Key + ":" + v
},
}
return tracing.NewSpan(spanImpl), true
},
})
req, err = exported.NewRequest(context.WithValue(context.Background(), shared.CtxWithTracingTracer{}, tr), http.MethodGet, srv.URL())
require.NoError(t, err)
srv.AppendResponse()
_, err = pl.Do(req)
require.NoError(t, err)
require.Empty(t, attrString)

// success
tr = tracing.NewTracer(func(ctx context.Context, spanName string, options *tracing.SpanOptions) (context.Context, tracing.Span) {
return ctx, tracing.Span{}
}, &tracing.TracerOptions{
SpanFromContext: func(ctx context.Context) (tracing.Span, bool) {
spanImpl := tracing.SpanImpl{
SetAttributes: func(a ...tracing.Attribute) {
require.Len(t, a, 1)
v, ok := a[0].Value.(string)
require.True(t, ok)
attrString = a[0].Key + ":" + v
},
}
return tracing.NewSpan(spanImpl), true
},
})
req, err = exported.NewRequest(context.WithValue(context.Background(), shared.CtxWithTracingTracer{}, tr), http.MethodGet, srv.URL()+requestEndpoint)
require.NoError(t, err)
srv.AppendResponse()
_, err = pl.Do(req)
require.NoError(t, err)
require.EqualValues(t, "az.namespace:Microsoft.Storage", attrString)
}
3 changes: 3 additions & 0 deletions sdk/azcore/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ func NewClient(clientName, moduleVersion string, plOpts runtime.PipelineOptions,
pl := runtime.NewPipeline(pkg, moduleVersion, plOpts, options)

tr := options.TracingProvider.NewTracer(clientName, moduleVersion)
if tr.Enabled() && plOpts.TracingNamespace != "" {
tr.SetAttributes(tracing.Attribute{Key: "az.namespace", Value: plOpts.TracingNamespace})
}
return &Client{pl: pl, tr: tr}, nil
}

Expand Down
40 changes: 40 additions & 0 deletions sdk/azcore/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@
package azcore

import (
"context"
"net/http"
"reflect"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/shared"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/tracing"
"github.com/Azure/azure-sdk-for-go/sdk/internal/mock"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -131,3 +137,37 @@ func TestNewClientError(t *testing.T) {
require.Error(t, err)
require.Nil(t, client)
}

func TestNewClientTracingEnabled(t *testing.T) {
srv, close := mock.NewServer()
defer close()

var attrString string
client, err := NewClient("package.Client", "v1.0.0", runtime.PipelineOptions{TracingNamespace: "Widget.Factory"}, &policy.ClientOptions{
TracingProvider: tracing.NewProvider(func(name, version string) tracing.Tracer {
return tracing.NewTracer(func(ctx context.Context, spanName string, options *tracing.SpanOptions) (context.Context, tracing.Span) {
require.NotNil(t, options)
for _, attr := range options.Attributes {
if attr.Key == "az.namespace" {
v, ok := attr.Value.(string)
require.True(t, ok)
attrString = attr.Key + ":" + v
}
}
return ctx, tracing.Span{}
}, nil)
}, nil),
Transport: srv,
})
require.NoError(t, err)
require.NotNil(t, client)
require.NotZero(t, client.Pipeline())
require.NotZero(t, client.Tracer())

const requestEndpoint = "/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/fakeResourceGroupo/providers/Microsoft.Storage/storageAccounts/fakeAccountName"
req, err := exported.NewRequest(context.WithValue(context.Background(), shared.CtxWithTracingTracer{}, client.Tracer()), http.MethodGet, srv.URL()+requestEndpoint)
require.NoError(t, err)
srv.AppendResponse()
client.Pipeline().Do(req)
require.EqualValues(t, "az.namespace:Widget.Factory", attrString)
}
8 changes: 8 additions & 0 deletions sdk/azcore/internal/exported/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,14 @@ func (req *Request) Clone(ctx context.Context) *Request {
return &r2
}

// WithContext returns a shallow copy of the request with its context changed to ctx.
func (req *Request) WithContext(ctx context.Context) *Request {
r2 := new(Request)
*r2 = *req
r2.req = r2.req.WithContext(ctx)
return r2
}

// not exported but dependent on Request

// PolicyFunc is a type that implements the Policy interface.
Expand Down
17 changes: 17 additions & 0 deletions sdk/azcore/internal/exported/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,20 @@ func TestNewRequestFail(t *testing.T) {
t.Fatal("unexpected request")
}
}

func TestRequestWithContext(t *testing.T) {
type ctxKey1 struct{}
type ctxKey2 struct{}

req1, err := NewRequest(context.WithValue(context.Background(), ctxKey1{}, 1), http.MethodPost, testURL)
require.NoError(t, err)
require.NotNil(t, req1.Raw().Context().Value(ctxKey1{}))

req2 := req1.WithContext(context.WithValue(context.Background(), ctxKey2{}, 1))
require.Nil(t, req2.Raw().Context().Value(ctxKey1{}))
require.NotNil(t, req2.Raw().Context().Value(ctxKey2{}))

// shallow copy, so changing req2 affects req1
req2.Raw().Header.Add("added-req2", "value")
require.EqualValues(t, "value", req1.Raw().Header.Get("added-req2"))
}
1 change: 1 addition & 0 deletions sdk/azcore/internal/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
HeaderUserAgent = "User-Agent"
HeaderWWWAuthenticate = "WWW-Authenticate"
HeaderXMSClientRequestID = "x-ms-client-request-id"
HeaderXMSRequestID = "x-ms-request-id"
)

const BearerTokenPrefix = "Bearer "
Expand Down
3 changes: 3 additions & 0 deletions sdk/azcore/internal/shared/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type CtxWithRetryOptionsKey struct{}
// CtxIncludeResponseKey is used as a context key for retrieving the raw response.
type CtxIncludeResponseKey struct{}

// CtxWithTracingTracer is used as a context key for adding/retrieving tracing.Tracer.
type CtxWithTracingTracer struct{}

// Delay waits for the duration to elapse or the context to be cancelled.
func Delay(ctx context.Context, delay time.Duration) error {
select {
Expand Down
3 changes: 2 additions & 1 deletion sdk/azcore/policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type Request = exported.Request
// ClientOptions contains optional settings for a client's pipeline.
// All zero-value fields will be initialized with default values.
type ClientOptions struct {
// APIVersion overrides the default version requested of the service. Set with caution as this package version has not been tested with arbitrary service versions.
// APIVersion overrides the default version requested of the service.
// Set with caution as this package version has not been tested with arbitrary service versions.
APIVersion string

// Cloud specifies a cloud for the client. The default is Azure Public Cloud.
Expand Down
27 changes: 24 additions & 3 deletions sdk/azcore/runtime/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,29 @@ import (

// PipelineOptions contains Pipeline options for SDK developers
type PipelineOptions struct {
AllowedHeaders, AllowedQueryParameters []string
APIVersion APIVersionOptions
PerCall, PerRetry []policy.Policy
// AllowedHeaders is the slice of headers to log with their values intact.
// All headers not in the slice will have their values REDACTED.
// Applies to request and response headers.
AllowedHeaders []string

// AllowedQueryParameters is the slice of query parameters to log with their values intact.
// All query parameters not in the slice will have their values REDACTED.
AllowedQueryParameters []string

// APIVersion overrides the default version requested of the service.
// Set with caution as this package version has not been tested with arbitrary service versions.
APIVersion APIVersionOptions

// PerCall contains custom policies to inject into the pipeline.
// Each policy is executed once per request.
PerCall []policy.Policy

// PerRetry contains custom policies to inject into the pipeline.
// Each policy is executed once per request, and for each retry of that request.
PerRetry []policy.Policy

// TracingNamespace contains the value to use for the az.namespace span attribute.
TracingNamespace string
}

// Pipeline represents a primitive for sending HTTP requests and receiving responses.
Expand Down Expand Up @@ -58,6 +78,7 @@ func NewPipeline(module, version string, plOpts PipelineOptions, options *policy
policies = append(policies, cp.PerRetryPolicies...)
policies = append(policies, NewLogPolicy(&cp.Logging))
policies = append(policies, exported.PolicyFunc(httpHeaderPolicy), exported.PolicyFunc(bodyDownloadPolicy))
policies = append(policies, newHTTPTracePolicy(cp.Logging.AllowedQueryParams))
transport := cp.Transport
if transport == nil {
transport = defaultHTTPClient
Expand Down
Loading

0 comments on commit 1fe803d

Please sign in to comment.