Skip to content

Commit

Permalink
fix map locking issue in ads properly (istio#37320)
Browse files Browse the repository at this point in the history
* fix locking issue in ads properly

Signed-off-by: Rama Chavali <rama.rao@salesforce.com>

* fix

Signed-off-by: Rama Chavali <rama.rao@salesforce.com>

* fix it

Signed-off-by: Rama Chavali <rama.rao@salesforce.com>

* more refactor

Signed-off-by: Rama Chavali <rama.rao@salesforce.com>

* remove watchedresourcelist from proxy

Signed-off-by: Rama Chavali <rama.rao@salesforce.com>
  • Loading branch information
ramaraochavali committed Feb 16, 2022
1 parent f437a0c commit c90a0c4
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 33 deletions.
63 changes: 37 additions & 26 deletions pilot/pkg/xds/ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pilot/pkg/networking/util"
labelutil "istio.io/istio/pilot/pkg/serviceregistry/util/label"
"istio.io/istio/pilot/pkg/util/sets"
v3 "istio.io/istio/pilot/pkg/xds/v3"
"istio.io/istio/pkg/cluster"
"istio.io/istio/pkg/config/schema/gvk"
Expand Down Expand Up @@ -696,13 +697,10 @@ func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {
return nil
}

con.proxy.RLock()
wr := con.proxy.WatchedResources
con.proxy.RUnlock()

// Send pushes to all generators
// Each Generator is responsible for determining if the push event requires a push
for _, w := range orderWatchedResources(wr) {
wrl, ignoreEvents := con.pushDetails()
for _, w := range wrl {
if !features.EnableFlowControl {
// Always send the push if flow control disabled
if err := s.pushXds(con, pushRequest.Push, w, pushRequest); err != nil {
Expand Down Expand Up @@ -738,7 +736,7 @@ func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {
}
if pushRequest.Full {
// Report all events for unwatched resources. Watched resources will be reported in pushXds or on ack.
reportAllEvents(s.StatusReporter, con.ConID, pushRequest.Push.LedgerVersion, wr)
reportAllEvents(s.StatusReporter, con.ConID, pushRequest.Push.LedgerVersion, ignoreEvents)
}

proxiesConvergeDelay.Record(time.Since(pushRequest.Start).Seconds())
Expand All @@ -758,32 +756,14 @@ var KnownOrderedTypeUrls = map[string]struct{}{
v3.SecretType: {},
}

// orderWatchedResources orders the resources in accordance with known push order.
func orderWatchedResources(resources map[string]*model.WatchedResource) []*model.WatchedResource {
wr := make([]*model.WatchedResource, 0, len(resources))
// first add all known types, in order
for _, tp := range PushOrder {
if w, f := resources[tp]; f {
wr = append(wr, w)
}
}
// Then add any undeclared types
for tp, w := range resources {
if _, f := KnownOrderedTypeUrls[tp]; !f {
wr = append(wr, w)
}
}
return wr
}

func reportAllEvents(s DistributionStatusCache, id, version string, ignored map[string]*model.WatchedResource) {
func reportAllEvents(s DistributionStatusCache, id, version string, ignored sets.Set) {
if s == nil {
return
}
// this version of the config will never be distributed to this envoy because it is not a relevant diff.
// inform distribution status reporter that this connection has been updated, because it effectively has
for distributionType := range AllEventTypes {
if _, f := ignored[distributionType]; f {
if ignored.Contains(distributionType) {
// Skip this type
continue
}
Expand Down Expand Up @@ -992,6 +972,37 @@ func (conn *Connection) Watched(typeUrl string) *model.WatchedResource {
return nil
}

// pushDetails returns the details needed for current push. It returns ordered list of
// watched resources for the proxy, ordered in accordance with known push order.
// It also returns the lis of typeUrls.
// nolint
func (conn *Connection) pushDetails() ([]*model.WatchedResource, sets.Set) {
conn.proxy.RLock()
defer conn.proxy.RUnlock()
typeUrls := sets.NewSet()
for k := range conn.proxy.WatchedResources {
typeUrls.Insert(k)
}
return orderWatchedResources(conn.proxy.WatchedResources), typeUrls
}

func orderWatchedResources(resources map[string]*model.WatchedResource) []*model.WatchedResource {
wr := make([]*model.WatchedResource, 0, len(resources))
// first add all known types, in order
for _, tp := range PushOrder {
if w, f := resources[tp]; f {
wr = append(wr, w)
}
}
// Then add any undeclared types
for tp, w := range resources {
if _, f := KnownOrderedTypeUrls[tp]; !f {
wr = append(wr, w)
}
}
return wr
}

func (conn *Connection) Stop() {
close(conn.stop)
}
8 changes: 3 additions & 5 deletions pilot/pkg/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,8 @@ func (s *DiscoveryServer) pushConnectionDelta(con *Connection, pushEv *Event) er

// Send pushes to all generators
// Each Generator is responsible for determining if the push event requires a push
con.proxy.RLock()
wr := con.proxy.WatchedResources
con.proxy.RUnlock()
for _, w := range orderWatchedResources(wr) {
wrl, ignoreEvents := con.pushDetails()
for _, w := range wrl {
if !features.EnableFlowControl {
// Always send the push if flow control disabled
if err := s.pushDeltaXds(con, pushRequest.Push, w, nil, pushRequest); err != nil {
Expand Down Expand Up @@ -184,7 +182,7 @@ func (s *DiscoveryServer) pushConnectionDelta(con *Connection, pushEv *Event) er
}
if pushRequest.Full {
// Report all events for unwatched resources. Watched resources will be reported in pushXds or on ack.
reportAllEvents(s.StatusReporter, con.ConID, pushRequest.Push.LedgerVersion, wr)
reportAllEvents(s.StatusReporter, con.ConID, pushRequest.Push.LedgerVersion, ignoreEvents)
}

proxiesConvergeDelay.Record(time.Since(pushRequest.Start).Seconds())
Expand Down
4 changes: 2 additions & 2 deletions security/pkg/nodeagent/sds/sdsservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ func NewXdsServer(stop chan struct{}, gen model.XdsResourceGenerator) *xds.Disco
return true
}
proxy.RLock()
wr := proxy.WatchedResources[v3.SecretType]
wr := proxy.WatchedResources[v3.SecretType].ResourceNames
proxy.RUnlock()

if wr == nil {
return false
}

names := sets.NewSet(wr.ResourceNames...)
names := sets.NewSet(wr...)
found := false
for name := range model.ConfigsOfKind(req.ConfigsUpdated, gvk.Secret) {
if names.Contains(name.Name) {
Expand Down

0 comments on commit c90a0c4

Please sign in to comment.