Skip to content

Commit

Permalink
A72: add event for name resolution delay
Browse files Browse the repository at this point in the history
  • Loading branch information
aranjans committed Jan 7, 2025
1 parent 6e06350 commit ed87b41
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 21 deletions.
29 changes: 23 additions & 6 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,12 +613,18 @@ type ClientConn struct {

// mu protects the following fields.
// TODO: split mu so the same mutex isn't used for everything.
mu sync.RWMutex
resolverWrapper *ccResolverWrapper // Always recreated whenever entering idle to simplify Close.
balancerWrapper *ccBalancerWrapper // Always recreated whenever entering idle to simplify Close.
sc *ServiceConfig // Latest service config received from the resolver.
conns map[*addrConn]struct{} // Set to nil on close.
mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway.
mu sync.RWMutex
// nameResolutionStartTime records the time at which name resolution started.
nameResolutionStartTime time.Time
// nameResolutionInProgress indicates whether name resolution is in progress.
nameResolutionInProgress bool
// nameResolutionDelay holds the duration for which name resolution was delayed.
nameResolutionDelay time.Duration
resolverWrapper *ccResolverWrapper // Always recreated whenever entering idle to simplify Close.
balancerWrapper *ccBalancerWrapper // Always recreated whenever entering idle to simplify Close.
sc *ServiceConfig // Latest service config received from the resolver.
conns map[*addrConn]struct{} // Set to nil on close.
mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway.
// firstResolveEvent is used to track whether the name resolver sent us at
// least one update. RPCs block on this event. May be accessed without mu
// if we know we cannot be asked to enter idle mode while accessing it (e.g.
Expand Down Expand Up @@ -674,13 +680,24 @@ func (cc *ClientConn) Connect() {
// context expires. Returns nil unless the context expires first; otherwise
// returns a status error based on the context.
func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
// Record the start time for name resolution if it is not already set.
cc.mu.Lock()
if !cc.nameResolutionInProgress {
cc.nameResolutionStartTime = time.Now()
cc.nameResolutionInProgress = true
}
cc.mu.Unlock()

// This is on the RPC path, so we use a fast path to avoid the
// more-expensive "select" below after the resolver has returned once.
if cc.firstResolveEvent.HasFired() {
return nil
}
select {
case <-cc.firstResolveEvent.Done():
cc.mu.Lock()
cc.nameResolutionDelay = time.Now().Sub(cc.nameResolutionStartTime)
cc.mu.Unlock()
return nil
case <-ctx.Done():
return status.FromContextError(ctx.Err()).Err()
Expand Down
3 changes: 3 additions & 0 deletions stats/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package stats
import (
"context"
"net"
"time"
)

// ConnTagInfo defines the relevant information needed by connection context tagger.
Expand All @@ -38,6 +39,8 @@ type RPCTagInfo struct {
// FailFast indicates if this RPC is failfast.
// This field is only valid on client side, it's always false on server side.
FailFast bool
// NameResolutionDelay is the duration for which name resolution was delayed.
NameResolutionDelay time.Duration
}

// Handler defines the interface for the related stats handling (e.g., RPCs, connections).
Expand Down
5 changes: 5 additions & 0 deletions stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
method: info.FullMethodName,
}
if h.options.isTracingEnabled() {
if info.NameResolutionDelay > 0 {
callSpan := trace.SpanFromContext(ctx)
callSpan.AddEvent("Delayed name resolution complete")
ctx = trace.ContextWithSpan(ctx, callSpan)
}
ctx, ai = h.traceTagRPC(ctx, info, ai)
}
return setRPCInfo(ctx, &rpcInfo{
Expand Down
36 changes: 21 additions & 15 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,21 +320,25 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
callHdr.Creds = c.creds
}

// Acquire mutex to access cc.nameResolutionDelay.
cc.mu.Lock()
cs := &clientStream{
callHdr: callHdr,
ctx: ctx,
methodConfig: &mc,
opts: opts,
callInfo: c,
cc: cc,
desc: desc,
codec: c.codec,
cp: cp,
comp: comp,
cancel: cancel,
firstAttempt: true,
onCommit: onCommit,
}
callHdr: callHdr,
ctx: ctx,
methodConfig: &mc,
opts: opts,
callInfo: c,
cc: cc,
desc: desc,
codec: c.codec,
cp: cp,
comp: comp,
cancel: cancel,
firstAttempt: true,
onCommit: onCommit,
nameResolutionDelay: cc.nameResolutionDelay,
}
cc.mu.Unlock()
if !cc.dopts.disableRetry {
cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
}
Expand Down Expand Up @@ -417,7 +421,7 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
var beginTime time.Time
shs := cs.cc.dopts.copts.StatsHandlers
for _, sh := range shs {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast, NameResolutionDelay: cs.nameResolutionDelay})
beginTime = time.Now()
begin := &stats.Begin{
Client: true,
Expand Down Expand Up @@ -532,6 +536,8 @@ type clientStream struct {
cc *ClientConn
desc *StreamDesc

nameResolutionDelay time.Duration

codec baseCodec
cp Compressor
comp encoding.Compressor
Expand Down

0 comments on commit ed87b41

Please sign in to comment.