Skip to content

Commit

Permalink
runtimes/go: support trace sampling (encoredev#1649)
Browse files Browse the repository at this point in the history
  • Loading branch information
eandre authored Dec 12, 2024
1 parent 2b0336c commit 7c8ad25
Show file tree
Hide file tree
Showing 18 changed files with 316 additions and 205 deletions.
11 changes: 10 additions & 1 deletion cli/daemon/run/runtime_config2.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"fmt"
"net"
"net/netip"
"os"
"slices"
"sort"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -122,10 +124,17 @@ func (g *RuntimeConfigGenerator) initialize() error {
})

if traceEndpoint, ok := g.TraceEndpoint.Get(); ok {
sampleRate := 1.0
if val, err := strconv.ParseFloat(os.Getenv("ENCORE_TRACE_SAMPLING_RATE"), 64); err == nil {
sampleRate = min(max(val, 0), 1)
}
g.conf.TracingProvider(&runtimev1.TracingProvider{
Rid: newRid(),
Provider: &runtimev1.TracingProvider_Encore{
Encore: &runtimev1.TracingProvider_EncoreTracingProvider{TraceEndpoint: traceEndpoint},
Encore: &runtimev1.TracingProvider_EncoreTracingProvider{
TraceEndpoint: traceEndpoint,
SamplingRate: &sampleRate,
},
},
})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/rtconfgen/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func (c *legacyConverter) Convert() (*config.Runtime, error) {
for _, prov := range obs.Tracing {
if enc := prov.GetEncore(); enc != nil {
cfg.TraceEndpoint = enc.TraceEndpoint
cfg.TraceSamplingRate = enc.SamplingRate
break
}
}
Expand Down
315 changes: 165 additions & 150 deletions proto/encore/runtime/v1/runtime.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions proto/encore/runtime/v1/runtime.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ message TracingProvider {

message EncoreTracingProvider {
string trace_endpoint = 1;
// The sampling rate to use for traces, in the range [0, 1].
// If unset, it defaults to 1 (meaning all requests are traced).
optional double sampling_rate = 2;
}
}

Expand Down
11 changes: 6 additions & 5 deletions runtimes/go/appruntime/apisdk/api/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ func (d *AuthHandlerDesc[Params]) Authenticate(c IncomingContext) (model.AuthInf
go func() {
defer close(done)
_, authErr = c.server.beginRequest(c.req.Context(), &beginRequestParams{
TraceID: c.callMeta.TraceID,
ParentSpanID: c.callMeta.ParentSpanID,
SpanID: call.SpanID,
DefLoc: d.DefLoc,
Type: model.AuthHandler,
TraceID: c.callMeta.TraceID,
ParentSpanID: c.callMeta.ParentSpanID,
ParentSampled: c.callMeta.TraceSampled,
SpanID: call.SpanID,
DefLoc: d.DefLoc,
Type: model.AuthHandler,
Data: &model.RPCData{
Desc: d.rpcDesc(),
NonRawPayload: d.marshalParams(c.server.json, param),
Expand Down
28 changes: 21 additions & 7 deletions runtimes/go/appruntime/apisdk/api/call_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type CallMeta struct {
ParentSpanID model.SpanID // The span ID of the calling request (zero if there's no parent)
ParentEventID model.TraceEventID // The event ID which started the RPC call (zero if there's no parent)
CorrelationID string // The correlation ID of the calling request
TraceSampled bool // Whether the caller sampled trace info

// Internal meta data which gets populated by Encore on service to service calls
//
Expand Down Expand Up @@ -85,6 +86,7 @@ func (s *Server) metaFromAPICall(call *model.APICall) (meta CallMeta, err error)
meta.ParentSpanID = call.Source.SpanID
meta.ParentEventID = call.StartEventID
meta.CorrelationID = call.Source.ExtCorrelationID
meta.TraceSampled = call.Source.Traced

if call.Source.RPCData != nil && call.Source.RPCData.Desc != nil {
// If we're processing an API call, let's update the caller
Expand Down Expand Up @@ -120,7 +122,11 @@ func (meta CallMeta) AddToRequest(server *Server, targetService config.Service,
// If we're tracing, pass the trace ID, span ID and event ID to the downstream service
if !meta.TraceID.IsZero() {
// Encode Encore's trace ID and span ID as the traceparent header
req.SetMeta(transport.TraceParentKey, fmt.Sprintf("00-%x-%x-01", meta.TraceID[:], meta.ParentSpanID[:]))
sampled := "00"
if meta.TraceSampled {
sampled = "01"
}
req.SetMeta(transport.TraceParentKey, fmt.Sprintf("00-%x-%x-%s", meta.TraceID[:], meta.ParentSpanID[:], sampled))

if !meta.ParentSpanID.IsZero() {
// Because Encore does not count an RPC call as a span, but rather a set of events within a span
Expand Down Expand Up @@ -233,7 +239,7 @@ func (s *Server) MetaFromRequest(req transport.Transport) (meta CallMeta, err er
// to interopt with other tracing systems.
meta.Internal != nil {

meta.TraceID, meta.ParentSpanID, _ = parseTraceParent(traceParent)
meta.TraceID, meta.ParentSpanID, meta.TraceSampled, _ = parseTraceParent(traceParent)

// If the caller is a gateway, ignore the parent span id as gateways don't currently record a span.
// If we include it the root request won't be tagged as such.
Expand Down Expand Up @@ -271,7 +277,7 @@ func (s *Server) MetaFromRequest(req transport.Transport) (meta CallMeta, err er
// parseTraceParent parses the trace and span ids from s, which is assumed
// to be in the format of the traceparent header (see https://www.w3.org/TR/trace-context/).
// If it's not a valid traceparent header it returns zero ids and ok == false.
func parseTraceParent(s string) (traceID model.TraceID, spanID model.SpanID, ok bool) {
func parseTraceParent(s string) (traceID model.TraceID, spanID model.SpanID, sampled, ok bool) {
const (
version = "00"
traceIDLen = 32
Expand All @@ -293,20 +299,28 @@ func parseTraceParent(s string) (traceID model.TraceID, spanID model.SpanID, ok
)

if len(s) != totalLen || s[verStart:verEnd] != version || s[verSep] != '-' || s[traceIDSep] != '-' || s[spanIDSep] != '-' {
return model.TraceID{}, model.SpanID{}, false
return model.TraceID{}, model.SpanID{}, false, false
}

_, err := hex.Decode(traceID[:], []byte(s[traceIDStart:traceIDEnd]))
if err != nil {
return model.TraceID{}, model.SpanID{}, false
return model.TraceID{}, model.SpanID{}, false, false
}

_, err = hex.Decode(spanID[:], []byte(s[spanIDStart:spanIDEnd]))
if err != nil {
return model.TraceID{}, model.SpanID{}, false
return model.TraceID{}, model.SpanID{}, false, false
}

return traceID, spanID, true
// The traceparent trace-flags field is two hex characters, which decode to
// a single byte. hex.Decode writes that byte to flags[0]; bit 0 of it is
// the W3C "sampled" flag.
var flags [1]byte
_, err = hex.Decode(flags[:], []byte(s[flagsStart:flagsEnd]))
if err != nil {
return model.TraceID{}, model.SpanID{}, false, false
}

// Test the decoded byte itself. Checking a second, never-written byte
// would always report "not sampled".
sampled = flags[0]&1 == 1

return traceID, spanID, sampled, true
}

// parseTraceState parses the trace event id from the tracestate header (see https://www.w3.org/TR/trace-context/).
Expand Down
12 changes: 11 additions & 1 deletion runtimes/go/appruntime/apisdk/api/reqtrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type beginRequestParams struct {
// It is copied from the parent request if it is empty.
ParentSpanID model.SpanID

// ParentSampled indicates whether the parent span sampled trace information.
ParentSampled bool

// CallerEventID is the event ID in the parent span that triggered this request.
// It's used to correlate the request with the originating call.
CallerEventID model.TraceEventID
Expand Down Expand Up @@ -81,6 +84,13 @@ func (s *Server) beginRequest(ctx context.Context, p *beginRequestParams) (*mode
spanID = id
}

var traced bool
if p.ParentSpanID.IsZero() {
traced = s.rt.SampleTrace()
} else {
traced = p.ParentSampled
}

req := &model.Request{
Type: p.Type,
TraceID: traceID,
Expand All @@ -92,7 +102,7 @@ func (s *Server) beginRequest(ctx context.Context, p *beginRequestParams) (*mode
DefLoc: p.DefLoc,
SvcNum: p.Data.Desc.SvcNum,
Start: s.clock.Now(),
Traced: s.tracingEnabled,
Traced: traced,
RPCData: p.Data,
}

Expand Down
27 changes: 14 additions & 13 deletions runtimes/go/appruntime/exported/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,20 @@ type Static struct {
}

type Runtime struct {
AppID string `json:"app_id"`
AppSlug string `json:"app_slug"`
APIBaseURL string `json:"api_base_url"`
EnvID string `json:"env_id"`
EnvName string `json:"env_name"`
EnvType string `json:"env_type"`
EnvCloud string `json:"env_cloud"`
DeployID string `json:"deploy_id"` // Overridden by ENCORE_DEPLOY_ID env var if set
DeployedAt time.Time `json:"deploy_time"`
TraceEndpoint string `json:"trace_endpoint,omitempty"`
AuthKeys []EncoreAuthKey `json:"auth_keys,omitempty"`
CORS *CORS `json:"cors,omitempty"`
EncoreCloudAPI *EncoreCloudAPI `json:"ec_api,omitempty"` // If nil, the app is not running in Encore Cloud
AppID string `json:"app_id"`
AppSlug string `json:"app_slug"`
APIBaseURL string `json:"api_base_url"`
EnvID string `json:"env_id"`
EnvName string `json:"env_name"`
EnvType string `json:"env_type"`
EnvCloud string `json:"env_cloud"`
DeployID string `json:"deploy_id"` // Overridden by ENCORE_DEPLOY_ID env var if set
DeployedAt time.Time `json:"deploy_time"`
TraceEndpoint string `json:"trace_endpoint,omitempty"`
TraceSamplingRate *float64 `json:"trace_sampling_rate,omitempty"`
AuthKeys []EncoreAuthKey `json:"auth_keys,omitempty"`
CORS *CORS `json:"cors,omitempty"`
EncoreCloudAPI *EncoreCloudAPI `json:"ec_api,omitempty"` // If nil, the app is not running in Encore Cloud

SQLDatabases []*SQLDatabase `json:"sql_databases,omitempty"`
SQLServers []*SQLServer `json:"sql_servers,omitempty"`
Expand Down
29 changes: 21 additions & 8 deletions runtimes/go/appruntime/shared/reqtrack/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type encoreOp struct {
start int64 // start time of trace from nanotime()

// trace is the trace log; it is nil if the op is not traced.
trace *lazyTraceInit
trace atomic.Pointer[lazyTraceInit]

// refs is the op refcount. It is 1 + number of requests
// that reference this op (see doc comment above).
Expand All @@ -48,6 +48,10 @@ type encoreOp struct {
goidCtr uint32
}

// beginTracing attaches a new lazyTraceInit to the op unless one is already
// present. The compare-and-swap only succeeds while op.trace is nil, so an
// existing trace is never replaced.
func (op *encoreOp) beginTracing() {
	fresh := newLazyTrace(op.t)
	op.trace.CompareAndSwap(nil, fresh)
}

// encoreReq represents an Encore API request.
type encoreReq struct {
// spanID is the request span id.
Expand Down Expand Up @@ -75,7 +79,7 @@ func (t *RequestTracker) newOp(trace bool) *encoreOp {
refs: 1,
}
if trace && t.trace != nil {
op.trace = newLazyTrace(t)
op.beginTracing()
}
return op
}
Expand Down Expand Up @@ -119,11 +123,13 @@ func (op *encoreOp) incRef() int32 {
// If it reaches zero and the op is traced, it sends off the trace.
func (op *encoreOp) decRef(blockOnTraceSend bool) int32 {
n := atomic.AddInt32(&op.refs, -1)
if n == 0 && op.trace != nil {
op.trace.MarkDone()
if n == 0 {
if trace := op.trace.Load(); trace != nil {
trace.MarkDone()

if blockOnTraceSend {
op.trace.WaitForStreamSent()
if blockOnTraceSend {
trace.WaitForStreamSent()
}
}
}
return n
Expand All @@ -147,6 +153,11 @@ func (t *RequestTracker) beginReq(data *model2.Request, trace bool) {
}
e.op.incRef()
e.req = req

// Begin tracing if we haven't, already.
if trace && e.op.trace.Load() == nil {
e.op.beginTracing()
}
}
}

Expand All @@ -167,8 +178,10 @@ func (t *RequestTracker) finishReq(blockOnTraceSend bool) {
func (t *RequestTracker) currentReq() (req *model2.Request, tr trace2.Logger, goctr uint32, svcNum uint16) {
if g := t.impl.get(); g != nil {
var tr trace2.Logger
if g.op != nil && g.op.trace != nil {
tr = g.op.trace.Logger()
if g.op != nil {
if trace := g.op.trace.Load(); trace != nil {
tr = trace.Logger()
}
}
if g.req != nil {
req = g.req.data
Expand Down
12 changes: 9 additions & 3 deletions runtimes/go/appruntime/shared/reqtrack/impl_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,18 @@ func beginHTTPRoundTrip(req *http.Request) (context.Context, error) {
return nil, fmt.Errorf("http: nil Request.URL")
}

return g.op.trace.Logger().HTTPBeginRoundTrip(req, g.req.data, g.goctr)
if trace := g.op.trace.Load(); trace != nil {
return trace.Logger().HTTPBeginRoundTrip(req, g.req.data, g.goctr)
}

return req.Context(), nil
}

//go:linkname finishHTTPRoundTrip net/http.encoreFinishRoundTrip
func finishHTTPRoundTrip(req *http.Request, resp *http.Response, err error) {
if g := getEncoreG(); g != nil && g.req != nil && g.op.trace != nil {
g.op.trace.Logger().HTTPCompleteRoundTrip(req, resp, g.goctr, err)
if g := getEncoreG(); g != nil && g.req != nil {
if trace := g.op.trace.Load(); trace != nil {
trace.Logger().HTTPCompleteRoundTrip(req, resp, g.goctr, err)
}
}
}
10 changes: 6 additions & 4 deletions runtimes/go/appruntime/shared/reqtrack/reqtrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type RequestTracker struct {
}

func (t *RequestTracker) BeginOperation() {
t.beginOp(true /* always trace by default */)
t.beginOp(false)
}

func (t *RequestTracker) FinishOperation() {
Expand Down Expand Up @@ -79,12 +79,10 @@ func copyReqInfoFromParent(next, prev *model.Request) {
if next.ExtCorrelationID == "" {
next.ExtCorrelationID = prev.ExtCorrelationID
}
if !next.Traced {
next.Traced = prev.Traced
}
if next.Test == nil {
next.Test = prev.Test
}
next.Traced = prev.Traced
}

func (t *RequestTracker) FinishRequest(blockOnTraceSend bool) {
Expand Down Expand Up @@ -113,3 +111,7 @@ func (t *RequestTracker) Logger() *zerolog.Logger {
// TracingEnabled reports whether t.trace is set (non-nil).
func (t *RequestTracker) TracingEnabled() bool {
return t.trace != nil
}

// SampleTrace reports whether a trace should be sampled. It returns false
// when t.trace is nil; otherwise it returns the result of t.trace.SampleTrace().
func (t *RequestTracker) SampleTrace() bool {
	if t.trace == nil {
		return false
	}
	return t.trace.SampleTrace()
}
4 changes: 3 additions & 1 deletion runtimes/go/appruntime/shared/reqtrack/singleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ func init() {
var traceFactory traceprovider.Factory
tracingEnabled := appconf.Runtime.TraceEndpoint != "" && len(appconf.Runtime.AuthKeys) > 0
if tracingEnabled {
traceFactory = &traceprovider.DefaultFactory{}
traceFactory = &traceprovider.DefaultFactory{
SampleRate: appconf.Runtime.TraceSamplingRate,
}
}

Singleton = New(logging.RootLogger, platform.Singleton, traceFactory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ type mockFactory struct {
// NewLogger returns the logger stored in the mock's log field.
func (f *mockFactory) NewLogger() trace2.Logger {
return f.log
}

// SampleTrace always reports true, so the mock samples every trace.
func (f *mockFactory) SampleTrace() bool {
return true
}
22 changes: 20 additions & 2 deletions runtimes/go/appruntime/shared/traceprovider/traceprovider.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
package traceprovider

import (
"math/rand/v2"

"encore.dev/appruntime/exported/trace2"
)

// Factory provides trace loggers and decides whether a trace should be sampled.
type Factory interface {
// NewLogger returns a trace2.Logger.
NewLogger() trace2.Logger
// SampleTrace reports whether a trace should be sampled.
SampleTrace() bool
}

type DefaultFactory struct{}
// DefaultFactory is a Factory whose sampling decision is governed by SampleRate.
type DefaultFactory struct {
// SampleRate is the rate at which to sample traces, in the range [0, 1].
// If nil, 100% of traces are sampled.
SampleRate *float64
}

func (*DefaultFactory) NewLogger() trace2.Logger { return trace2.NewLog() }
// NewLogger returns the result of trace2.NewLog().
func (f *DefaultFactory) NewLogger() trace2.Logger {
return trace2.NewLog()
}

// SampleTrace reports whether a trace should be sampled.
//
// A nil SampleRate means every trace is sampled. Otherwise the trace is
// sampled when a uniform random draw from [0, 1) falls below *SampleRate, so
// a rate of 0 never samples and a rate of 1 or more always samples.
func (f *DefaultFactory) SampleTrace() bool {
	if f.SampleRate == nil {
		return true
	}
	return rand.Float64() < *f.SampleRate
}
Loading

0 comments on commit 7c8ad25

Please sign in to comment.