Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Possible panic due to concurrent access to unlocked map #2136

Merged
merged 2 commits into from
Apr 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 34 additions & 18 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,13 @@ type perm struct {
}

type permissions struct {
// Have these 2 first for memory alignment due to the use of atomic.
pcsz int32
prun int32
sub perm
pub perm
resp *ResponsePermission
pcache map[string]bool
pcache sync.Map
}

// This is used to dynamically track responses and reply subjects
Expand Down Expand Up @@ -838,7 +841,6 @@ func (c *client) setPermissions(perms *Permissions) {
return
}
c.perms = &permissions{}
c.perms.pcache = make(map[string]bool)

// Loop over publish permissions
if perms.Publish != nil {
Expand Down Expand Up @@ -914,7 +916,6 @@ func (c *client) mergePubDenyPermissions(denyPubs []string) {
}
if c.perms == nil {
c.perms = &permissions{}
c.perms.pcache = make(map[string]bool)
}
if c.perms.pub.deny == nil {
c.perms.pub.deny = NewSublistWithCache()
Expand Down Expand Up @@ -2981,7 +2982,7 @@ func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, g

// Check if we are a leafnode and have perms to check.
if client.kind == LEAF && client.perms != nil {
if !client.pubAllowed(string(subject)) {
if !client.pubAllowedFullCheck(string(subject), true, true) {
client.mu.Unlock()
client.Debugf("Not permitted to publish to %q", subject)
return false
Expand Down Expand Up @@ -3269,32 +3270,44 @@ func (c *client) pruneDenyCache() {
// prunePubPermsCache will prune the cache via randomly
// deleting items. Doing so pruneSize items at a time.
func (c *client) prunePubPermsCache() {
// There is a case where we can invoke this from multiple go routines,
// (in deliverMsg() if sub.client is a LEAF), so we make sure to prune
// from only one go routine at a time.
if !atomic.CompareAndSwapInt32(&c.perms.prun, 0, 1) {
return
}
const maxPruneAtOnce = 1000
r := 0
for subject := range c.perms.pcache {
delete(c.perms.pcache, subject)
if r++; r > pruneSize {
break
c.perms.pcache.Range(func(k, _ interface{}) bool {
c.perms.pcache.Delete(k)
if r++; (r > pruneSize && atomic.LoadInt32(&c.perms.pcsz) < int32(maxPermCacheSize)) ||
(r > maxPruneAtOnce) {
return false
}
}
return true
})
atomic.AddInt32(&c.perms.pcsz, -int32(r))
atomic.StoreInt32(&c.perms.prun, 0)
}

// pubAllowed checks on publish permissioning.
// Lock should not be held.
func (c *client) pubAllowed(subject string) bool {
return c.pubAllowedFullCheck(subject, true)
return c.pubAllowedFullCheck(subject, true, false)
}

// pubAllowedFullCheck checks on all publish permissioning depending
// on the flag for dynamic reply permissions.
func (c *client) pubAllowedFullCheck(subject string, fullCheck bool) bool {
func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bool {
if c.perms == nil || (c.perms.pub.allow == nil && c.perms.pub.deny == nil) {
return true
}
// Check if published subject is allowed if we have permissions in place.
allowed, ok := c.perms.pcache[subject]
v, ok := c.perms.pcache.Load(subject)
if ok {
return allowed
return v.(bool)
}
var allowed bool
// Cache miss, check allow then deny as needed.
if c.perms.pub.allow != nil {
r := c.perms.pub.allow.Match(subject)
Expand All @@ -3313,7 +3326,9 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck bool) bool {
// dynamically, check to see if we are allowed here but avoid pcache.
// We need to acquire the lock though.
if !allowed && fullCheck && c.perms.resp != nil {
c.mu.Lock()
if !hasLock {
c.mu.Lock()
}
if resp := c.replies[subject]; resp != nil {
resp.n++
// Check if we have sent too many responses.
Expand All @@ -3325,12 +3340,13 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck bool) bool {
allowed = true
}
}
c.mu.Unlock()
if !hasLock {
c.mu.Unlock()
}
} else {
// Update our cache here.
c.perms.pcache[string(subject)] = allowed
// Prune if needed.
if len(c.perms.pcache) > maxPermCacheSize {
c.perms.pcache.Store(string(subject), allowed)
if n := atomic.AddInt32(&c.perms.pcsz, 1); n > maxPermCacheSize {
c.prunePubPermsCache()
}
}
Expand Down
87 changes: 87 additions & 0 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1293,6 +1293,93 @@ func TestLeafNodePermissions(t *testing.T) {
}
}

func TestLeafNodePermissionsConcurrentAccess(t *testing.T) {
lo1 := DefaultOptions()
lo1.LeafNode.Host = "127.0.0.1"
lo1.LeafNode.Port = -1
ln1 := RunServer(lo1)
defer ln1.Shutdown()

nc1 := natsConnect(t, ln1.ClientURL())
defer nc1.Close()

natsSub(t, nc1, "_INBOX.>", func(_ *nats.Msg) {})
natsFlush(t, nc1)

ch := make(chan struct{}, 1)
wg := sync.WaitGroup{}
wg.Add(2)

publish := func(nc *nats.Conn) {
defer wg.Done()

for {
select {
case <-ch:
return
default:
nc.Publish(nats.NewInbox(), []byte("hello"))
}
}
}

go publish(nc1)

u, _ := url.Parse(fmt.Sprintf("nats://%s:%d", lo1.LeafNode.Host, lo1.LeafNode.Port))
lo2 := DefaultOptions()
lo2.LeafNode.ReconnectInterval = 5 * time.Millisecond
lo2.LeafNode.connDelay = 500 * time.Millisecond
lo2.LeafNode.Remotes = []*RemoteLeafOpts{
{
URLs: []*url.URL{u},
DenyExports: []string{"foo"},
DenyImports: []string{"bar"},
},
}
ln2 := RunServer(lo2)
defer ln2.Shutdown()

nc2 := natsConnect(t, ln2.ClientURL())
defer nc2.Close()

natsSub(t, nc2, "_INBOX.>", func(_ *nats.Msg) {})
natsFlush(t, nc2)

go publish(nc2)

checkLeafNodeConnected(t, ln1)
checkLeafNodeConnected(t, ln2)

time.Sleep(50 * time.Millisecond)
close(ch)
wg.Wait()
}

func TestLeafNodePubAllowedPruning(t *testing.T) {
c := &client{}
c.setPermissions(&Permissions{Publish: &SubjectPermission{Allow: []string{"foo"}}})

gr := 100
wg := sync.WaitGroup{}
wg.Add(gr)
for i := 0; i < gr; i++ {
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
c.pubAllowed(nats.NewInbox())
}
}()
}

wg.Wait()
if n := int(atomic.LoadInt32(&c.perms.pcsz)); n > maxPermCacheSize {
t.Fatalf("Expected size to be less than %v, got %v", maxPermCacheSize, n)
}
if n := atomic.LoadInt32(&c.perms.prun); n != 0 {
t.Fatalf("c.perms.prun should be 0, was %v", n)
}
}

func TestLeafNodeExportPermissionsNotForSpecialSubs(t *testing.T) {
lo1 := DefaultOptions()
lo1.Accounts = []*Account{NewAccount("SYS")}
Expand Down
2 changes: 1 addition & 1 deletion server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) {
func (c *client) canImport(subject string) bool {
// Use pubAllowed() since this checks Publish permissions which
// is what Import maps to.
return c.pubAllowedFullCheck(subject, false)
return c.pubAllowedFullCheck(subject, false, true)
}

// canExport is whether or not we will accept a SUB from the remote for a given subject.
Expand Down