Skip to content

Commit

Permalink
Introduce delta xds log scope to distinguish ads logs (istio#37341)
Browse files Browse the repository at this point in the history
* delta xds log scope

* revert shouldUseDelta
  • Loading branch information
hzxuzhonghu authored Feb 17, 2022
1 parent 131c7c5 commit 34f63bc
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 27 deletions.
3 changes: 2 additions & 1 deletion pilot/pkg/networking/core/v1alpha3/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
xdstype "github.com/envoyproxy/go-control-plane/envoy/type/v3"
structpb "google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/structpb"
wrappers "google.golang.org/protobuf/types/known/wrapperspb"

meshconfig "istio.io/api/mesh/v1alpha1"
Expand Down Expand Up @@ -97,6 +97,7 @@ func (configgen *ConfigGeneratorImpl) BuildDeltaClusters(proxy *model.Proxy, upd
cl, lg := configgen.BuildClusters(proxy, updates)
return cl, nil, lg, false
}

deletedClusters := make([]string, 0)
services := make([]*model.Service, 0)
// In delta, we only care about the services that have changed.
Expand Down
55 changes: 29 additions & 26 deletions pilot/pkg/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ import (
"istio.io/istio/pilot/pkg/networking/util"
"istio.io/istio/pilot/pkg/util/sets"
v3 "istio.io/istio/pilot/pkg/xds/v3"
istiolog "istio.io/pkg/log"
)

// deltaLog is the logging scope registered under the name "delta" with the
// description "delta xds debugging". The delta xDS code in this file logs
// through it so that its output can be told apart from other ADS logs.
// NOTE(review): the trailing 0 is presumably the caller-skip depth — confirm
// against the istio.io/pkg/log RegisterScope signature.
var deltaLog = istiolog.RegisterScope("delta", "delta xds debugging", 0)

func (s *DiscoveryServer) StreamDeltas(stream DeltaDiscoveryStream) error {
if knativeEnv != "" && firstRequest.Load() {
// How scaling works in knative is the first request is the "loading" request. During
Expand Down Expand Up @@ -61,7 +64,7 @@ func (s *DiscoveryServer) StreamDeltas(stream DeltaDiscoveryStream) error {
}

if err := s.WaitForRequestLimit(stream.Context()); err != nil {
log.Warnf("ADS: %q exceeded rate limit: %v", peerAddr, err)
deltaLog.Warnf("ADS: %q exceeded rate limit: %v", peerAddr, err)
return status.Errorf(codes.ResourceExhausted, "request rate limit exceeded: %v", err)
}

Expand All @@ -70,16 +73,16 @@ func (s *DiscoveryServer) StreamDeltas(stream DeltaDiscoveryStream) error {
return status.Error(codes.Unauthenticated, err.Error())
}
if ids != nil {
log.Debugf("Authenticated XDS: %v with identity %v", peerAddr, ids)
deltaLog.Debugf("Authenticated XDS: %v with identity %v", peerAddr, ids)
} else {
log.Debugf("Unauthenticated XDS: %v", peerAddr)
deltaLog.Debugf("Unauthenticated XDS: %v", peerAddr)
}

// InitContext returns immediately if the context was already initialized.
if err = s.globalPushContext().InitContext(s.Env, nil, nil); err != nil {
// Error accessing the data - log and close; maybe a different pilot replica
// will have more luck.
log.Warnf("Error reading config %v", err)
deltaLog.Warnf("Error reading config %v", err)
return status.Error(codes.Unavailable, "error reading config")
}
con := newDeltaConnection(peerAddr, stream)
Expand All @@ -104,7 +107,7 @@ func (s *DiscoveryServer) StreamDeltas(stream DeltaDiscoveryStream) error {
select {
case req, ok := <-con.deltaReqChan:
if ok {
log.Debugf("ADS: got Delta Request for: %s", req.TypeUrl)
deltaLog.Debugf("ADS: got Delta Request for: %s", req.TypeUrl)
if err := s.processDeltaRequest(req, con); err != nil {
return err
}
Expand Down Expand Up @@ -135,7 +138,7 @@ func (s *DiscoveryServer) pushConnectionDelta(con *Connection, pushEv *Event) er
}

if !s.ProxyNeedsPush(con.proxy, pushRequest) {
log.Debugf("Skipping push to %v, no updates required", con.ConID)
deltaLog.Debugf("Skipping push to %v, no updates required", con.ConID)
if pushRequest.Full {
// Only report for full versions, incremental pushes do not have a new version
reportAllEvents(s.StatusReporter, con.ConID, pushRequest.Push.LedgerVersion, nil)
Expand All @@ -161,7 +164,7 @@ func (s *DiscoveryServer) pushConnectionDelta(con *Connection, pushEv *Event) er
// avoid any scenario where this may deadlock.
// This can possibly be removed in the future if we find this never causes issues
totalDelayedPushes.With(typeTag.Value(v3.GetMetricType(w.TypeUrl))).Increment()
log.Warnf("%s: QUEUE TIMEOUT for node:%s", v3.GetShortType(w.TypeUrl), con.proxy.ID)
deltaLog.Warnf("%s: QUEUE TIMEOUT for node:%s", v3.GetShortType(w.TypeUrl), con.proxy.ID)
}
if synced || timeout {
// Send the push now
Expand All @@ -174,7 +177,7 @@ func (s *DiscoveryServer) pushConnectionDelta(con *Connection, pushEv *Event) er
// https://github.com/istio/istio/issues/25685 for details on the performance
// impact of sending pushes before Envoy ACKs.
totalDelayedPushes.With(typeTag.Value(v3.GetMetricType(w.TypeUrl))).Increment()
log.Debugf("%s: QUEUE for node:%s", v3.GetShortType(w.TypeUrl), con.proxy.ID)
deltaLog.Debugf("%s: QUEUE for node:%s", v3.GetShortType(w.TypeUrl), con.proxy.ID)
con.proxy.Lock()
con.blockedPushes[w.TypeUrl] = con.blockedPushes[w.TypeUrl].CopyMerge(pushEv.pushRequest)
con.proxy.Unlock()
Expand Down Expand Up @@ -205,11 +208,11 @@ func (s *DiscoveryServer) receiveDelta(con *Connection, identities []string) {
req, err := con.deltaStream.Recv()
if err != nil {
if istiogrpc.IsExpectedGRPCError(err) {
log.Infof("ADS: %q %s terminated %v", con.PeerAddr, con.ConID, err)
deltaLog.Infof("ADS: %q %s terminated %v", con.PeerAddr, con.ConID, err)
return
}
con.errorChan <- err
log.Errorf("ADS: %q %s terminated with error: %v", con.PeerAddr, con.ConID, err)
deltaLog.Errorf("ADS: %q %s terminated with error: %v", con.PeerAddr, con.ConID, err)
totalXDSInternalErrors.Increment()
return
}
Expand All @@ -225,13 +228,13 @@ func (s *DiscoveryServer) receiveDelta(con *Connection, identities []string) {
return
}
defer s.closeConnection(con)
log.Infof("ADS: new delta connection for node:%s", con.ConID)
deltaLog.Infof("ADS: new delta connection for node:%s", con.ConID)
}

select {
case con.deltaReqChan <- req:
case <-con.deltaStream.Context().Done():
log.Infof("ADS: %q %s terminated with stream closed", con.PeerAddr, con.ConID)
deltaLog.Infof("ADS: %q %s terminated with stream closed", con.PeerAddr, con.ConID)
return
}
}
Expand Down Expand Up @@ -260,7 +263,7 @@ func (conn *Connection) sendDelta(res *discovery.DeltaDiscoveryResponse) error {
conn.proxy.Unlock()
}
} else {
log.Infof("Timeout writing %s", conn.ConID)
deltaLog.Infof("Timeout writing %s", conn.ConID)
xdsResponseWriteTimeouts.Increment()
}
return err
Expand Down Expand Up @@ -298,7 +301,7 @@ func (s *DiscoveryServer) processDeltaRequest(req *discovery.DeltaDiscoveryReque
con.proxy.Unlock()
if haveBlockedPush {
// we have a blocked push which we will use
log.Debugf("%s: DEQUEUE for node:%s", v3.GetShortType(req.TypeUrl), con.proxy.ID)
deltaLog.Debugf("%s: DEQUEUE for node:%s", v3.GetShortType(req.TypeUrl), con.proxy.ID)
} else {
// This is an ACK, no delayed push
// Return immediately, no action needed
Expand Down Expand Up @@ -327,7 +330,7 @@ func (s *DiscoveryServer) shouldRespondDelta(con *Connection, request *discovery
// will be different from the version sent. But it is fragile to rely on that.
if request.ErrorDetail != nil {
errCode := codes.Code(request.ErrorDetail.Code)
log.Warnf("ADS:%s: ACK ERROR %s %s:%s", stype, con.ConID, errCode.String(), request.ErrorDetail.GetMessage())
deltaLog.Warnf("ADS:%s: ACK ERROR %s %s:%s", stype, con.ConID, errCode.String(), request.ErrorDetail.GetMessage())
incrementXDSRejects(request.TypeUrl, con.proxy.ID, errCode.String())
if s.StatusGen != nil {
s.StatusGen.OnNack(con.proxy, deltaToSotwRequest(request))
Expand All @@ -352,7 +355,7 @@ func (s *DiscoveryServer) shouldRespondDelta(con *Connection, request *discovery
// We should always respond with the current resource names.
if previousInfo == nil {
// TODO: can we distinguish init and reconnect? Do we care?
log.Debugf("ADS:%s: INIT/RECONNECT %s %s", stype, con.ConID, request.ResponseNonce)
deltaLog.Debugf("ADS:%s: INIT/RECONNECT %s %s", stype, con.ConID, request.ResponseNonce)
con.proxy.Lock()
con.proxy.WatchedResources[request.TypeUrl] = &model.WatchedResource{
TypeUrl: request.TypeUrl,
Expand All @@ -366,7 +369,7 @@ func (s *DiscoveryServer) shouldRespondDelta(con *Connection, request *discovery
// A nonce becomes stale following a newer nonce being sent to Envoy.
// TODO: due to concurrent unsubscribe, this probably doesn't make sense. Do we need any logic here?
if request.ResponseNonce != "" && request.ResponseNonce != previousInfo.NonceSent {
log.Debugf("ADS:%s: REQ %s Expired nonce received %s, sent %s", stype,
deltaLog.Debugf("ADS:%s: REQ %s Expired nonce received %s, sent %s", stype,
con.ConID, request.ResponseNonce, previousInfo.NonceSent)
xdsExpiredNonce.With(typeTag.Value(v3.GetMetricType(request.TypeUrl))).Increment()
con.proxy.Lock()
Expand All @@ -392,18 +395,18 @@ func (s *DiscoveryServer) shouldRespondDelta(con *Connection, request *discovery
newAck := request.ResponseNonce != ""
if newAck != oldAck {
// Not sure which is better; let's just log if they don't match for now and compare.
log.Errorf("ADS:%s: New ACK and old ACK check mismatch: %v vs %v", stype, newAck, oldAck)
deltaLog.Errorf("ADS:%s: New ACK and old ACK check mismatch: %v vs %v", stype, newAck, oldAck)
if features.EnableUnsafeAssertions {
panic(fmt.Sprintf("ADS:%s: New ACK and old ACK check mismatch: %v vs %v", stype, newAck, oldAck))
}
}
// Envoy can send two DiscoveryRequests with the same version and nonce
// when it detects a new resource. We should respond if they change.
if oldAck {
log.Debugf("ADS:%s: ACK %s %s", stype, con.ConID, request.ResponseNonce)
deltaLog.Debugf("ADS:%s: ACK %s %s", stype, con.ConID, request.ResponseNonce)
return false
}
log.Debugf("ADS:%s: RESOURCE CHANGE previous resources: %v, new resources: %v %s %s", stype,
deltaLog.Debugf("ADS:%s: RESOURCE CHANGE previous resources: %v, new resources: %v %s %s", stype,
previousResources, deltaResources, con.ConID, request.ResponseNonce)

return true
Expand Down Expand Up @@ -469,7 +472,7 @@ func (s *DiscoveryServer) pushDeltaXds(con *Connection, push *model.PushContext,
resp.RemovedResources = subscribed.SortedList()
}
if len(resp.RemovedResources) > 0 {
log.Debugf("ADS:%v %s REMOVE %v", v3.GetShortType(w.TypeUrl), con.ConID, resp.RemovedResources)
deltaLog.Debugf("ADS:%v %s REMOVE %v", v3.GetShortType(w.TypeUrl), con.ConID, resp.RemovedResources)
}
// Normally wildcard xds `subscribe` is always nil; this check is just in case some extended types are not handled correctly.
if subscribe == nil && isWildcardTypeURL(w.TypeUrl) {
Expand All @@ -493,25 +496,25 @@ func (s *DiscoveryServer) pushDeltaXds(con *Connection, push *model.PushContext,

if err := con.sendDelta(resp); err != nil {
if recordSendError(w.TypeUrl, err) {
log.Warnf("%s: Send failure for node:%s resources:%d size:%s%s: %v",
deltaLog.Warnf("%s: Send failure for node:%s resources:%d size:%s%s: %v",
v3.GetShortType(w.TypeUrl), con.proxy.ID, len(res), util.ByteCount(configSize), info, err)
}
return err
}

switch {
case logdata.Incremental:
if log.DebugEnabled() {
log.Debugf("%s: %s%s for node:%s resources:%d size:%s%s",
if deltaLog.DebugEnabled() {
deltaLog.Debugf("%s: %s%s for node:%s resources:%d size:%s%s",
v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res), util.ByteCount(configSize), info)
}
default:
debug := ""
if log.DebugEnabled() {
if deltaLog.DebugEnabled() {
// Add additional information to logs when debug mode enabled.
debug = " nonce:" + resp.Nonce + " version:" + resp.SystemVersionInfo
}
log.Infof("%s: %s%s for node:%s resources:%d size:%v%s%s", v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res),
deltaLog.Infof("%s: %s%s for node:%s resources:%d size:%v%s%s", v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res),
util.ByteCount(ResourceSize(res)), info, debug)
}

Expand Down

0 comments on commit 34f63bc

Please sign in to comment.