[FIXED] Leafnode: incorrect loop detection in multi-cluster setup #2066

Merged · 1 commit · Apr 6, 2021
5 changes: 4 additions & 1 deletion server/leafnode.go
@@ -1256,10 +1256,13 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {

 	// Now walk the results and add them to our smap
 	c.mu.Lock()
+	rc := c.leaf.remoteCluster
 	c.leaf.smap = make(map[string]int32)
 	for _, sub := range subs {
 		// We ignore ourselves here.
-		if c != sub.client {
+		// Also don't add the subscription if it has an origin cluster and the
+		// cluster name matches the one of the client we are sending to.
+		if c != sub.client && (sub.origin == nil || (string(sub.origin) != rc)) {
 			c.leaf.smap[keyFromSub(sub)]++
 			if c.leaf.tsub == nil {
 				c.leaf.tsub = make(map[*subscription]struct{})
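The hunk above is the core of the loop fix: when a server builds the initial subscription map for a leafnode connection, it now skips any subscription whose origin cluster matches the cluster it is sending to, so interest is never echoed back to the cluster it was learned from. Below is a minimal, self-contained sketch of that rule; `subscription`, `leafDest`, and `buildSmap` are simplified stand-ins for illustration, not the server's real types:

```go
// Minimal sketch (not the server's actual types) of the filtering rule added
// above: when building the subscription map for a leafnode, skip any sub
// whose origin cluster equals the destination's remote cluster.
package main

import "fmt"

type subscription struct {
	subject string
	origin  []byte // cluster the sub was learned from; nil if local
}

type leafDest struct {
	remoteCluster string
}

// buildSmap mirrors the loop in initLeafNodeSmapAndSendSubs: count interest
// per subject, but never for subs originating from the destination cluster.
func buildSmap(dst leafDest, subs []*subscription) map[string]int32 {
	smap := make(map[string]int32)
	for _, sub := range subs {
		if sub.origin == nil || string(sub.origin) != dst.remoteCluster {
			smap[sub.subject]++
		}
	}
	return smap
}

func main() {
	subs := []*subscription{
		{subject: "foo"},                           // local sub: always sent
		{subject: "foo", origin: []byte("remote")}, // learned from "remote": skipped
		{subject: "bar", origin: []byte("other")},  // different cluster: sent
	}
	fmt.Println(buildSmap(leafDest{remoteCluster: "remote"}, subs))
	// map[bar:1 foo:1] — the echo back to "remote" is filtered out
}
```

Local subs (nil origin) and subs from other clusters still propagate, which keeps the multi-cluster topology connected while breaking the echo loop.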
219 changes: 219 additions & 0 deletions server/leafnode_test.go
@@ -3066,3 +3066,222 @@ func TestLeafNodeStreamImport(t *testing.T) {

	natsNexMsg(t, sub, time.Second)
}

func TestLeafNodeRouteSubWithOrigin(t *testing.T) {
	lo1 := DefaultOptions()
	lo1.LeafNode.Host = "127.0.0.1"
	lo1.LeafNode.Port = -1
	lo1.Cluster.Name = "local"
	lo1.Cluster.Host = "127.0.0.1"
	lo1.Cluster.Port = -1
	l1 := RunServer(lo1)
	defer l1.Shutdown()

	lo2 := DefaultOptions()
	lo2.LeafNode.Host = "127.0.0.1"
	lo2.LeafNode.Port = -1
	lo2.Cluster.Name = "local"
	lo2.Cluster.Host = "127.0.0.1"
	lo2.Cluster.Port = -1
	lo2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo1.Cluster.Port))
	l2 := RunServer(lo2)
	defer l2.Shutdown()

	checkClusterFormed(t, l1, l2)

	u1, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo1.LeafNode.Port))
	urls := []*url.URL{u1}

	ro1 := DefaultOptions()
	ro1.Cluster.Name = "remote"
	ro1.Cluster.Host = "127.0.0.1"
	ro1.Cluster.Port = -1
	ro1.LeafNode.ReconnectInterval = 50 * time.Millisecond
	ro1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
	r1 := RunServer(ro1)
	defer r1.Shutdown()

	checkLeafNodeConnected(t, r1)

	nc := natsConnect(t, r1.ClientURL(), nats.NoReconnect())
	defer nc.Close()
	natsSubSync(t, nc, "foo")
	natsQueueSubSync(t, nc, "bar", "baz")
	checkSubInterest(t, l2, globalAccountName, "foo", time.Second)
	checkSubInterest(t, l2, globalAccountName, "bar", time.Second)

	// Now shutdown the leafnode and check that any subscriptions for $G on l2 are gone.
	r1.Shutdown()
	checkFor(t, time.Second, 15*time.Millisecond, func() error {
		acc := l2.GlobalAccount()
		if n := acc.TotalSubs(); n != 0 {
			return fmt.Errorf("Account %q should have 0 subs, got %v", acc.GetName(), n)
		}
		return nil
	})
}

func TestLeafNodeLoopDetectionWithMultipleClusters(t *testing.T) {
	lo1 := DefaultOptions()
	lo1.LeafNode.Host = "127.0.0.1"
	lo1.LeafNode.Port = -1
	lo1.Cluster.Name = "local"
	lo1.Cluster.Host = "127.0.0.1"
	lo1.Cluster.Port = -1
	l1 := RunServer(lo1)
	defer l1.Shutdown()

	lo2 := DefaultOptions()
	lo2.LeafNode.Host = "127.0.0.1"
	lo2.LeafNode.Port = -1
	lo2.Cluster.Name = "local"
	lo2.Cluster.Host = "127.0.0.1"
	lo2.Cluster.Port = -1
	lo2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo1.Cluster.Port))
	l2 := RunServer(lo2)
	defer l2.Shutdown()

	checkClusterFormed(t, l1, l2)

	u1, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo1.LeafNode.Port))
	u2, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo2.LeafNode.Port))
	urls := []*url.URL{u1, u2}

	ro1 := DefaultOptions()
	ro1.Cluster.Name = "remote"
	ro1.Cluster.Host = "127.0.0.1"
	ro1.Cluster.Port = -1
	ro1.LeafNode.ReconnectInterval = 50 * time.Millisecond
	ro1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
	r1 := RunServer(ro1)
	defer r1.Shutdown()

	l := &captureErrorLogger{errCh: make(chan string, 100)}
	r1.SetLogger(l, false, false)

	ro2 := DefaultOptions()
	ro2.Cluster.Name = "remote"
	ro2.Cluster.Host = "127.0.0.1"
	ro2.Cluster.Port = -1
	ro2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", ro1.Cluster.Port))
	ro2.LeafNode.ReconnectInterval = 50 * time.Millisecond
	ro2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
	r2 := RunServer(ro2)
	defer r2.Shutdown()

	checkClusterFormed(t, r1, r2)
	checkLeafNodeConnected(t, r1)
	checkLeafNodeConnected(t, r2)

	l1.Shutdown()

	// Now wait for r1 and r2 to reconnect; they should not report a loop detection error.
	checkLeafNodeConnected(t, r1)
	checkLeafNodeConnected(t, r2)

	// Wait and make sure we don't get a loop error.
	timeout := time.NewTimer(500 * time.Millisecond)
	for {
		select {
		case err := <-l.errCh:
			if strings.Contains(err, "Loop detected") {
				t.Fatal(err)
			}
		case <-timeout.C:
			// OK, we are done.
			return
		}
	}
}

func TestLeafNodeUnsubOnRouteDisconnect(t *testing.T) {
	lo1 := DefaultOptions()
	lo1.LeafNode.Host = "127.0.0.1"
	lo1.LeafNode.Port = -1
	lo1.Cluster.Name = "local"
	lo1.Cluster.Host = "127.0.0.1"
	lo1.Cluster.Port = -1
	l1 := RunServer(lo1)
	defer l1.Shutdown()

	lo2 := DefaultOptions()
	lo2.LeafNode.Host = "127.0.0.1"
	lo2.LeafNode.Port = -1
	lo2.Cluster.Name = "local"
	lo2.Cluster.Host = "127.0.0.1"
	lo2.Cluster.Port = -1
	lo2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo1.Cluster.Port))
	l2 := RunServer(lo2)
	defer l2.Shutdown()

	checkClusterFormed(t, l1, l2)

	u1, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo1.LeafNode.Port))
	u2, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo2.LeafNode.Port))
	urls := []*url.URL{u1, u2}

	ro1 := DefaultOptions()
	// DefaultOptions sets a cluster name, so make sure they are different.
	// Also, we don't have r1 and r2 clustered in this test, so set port to 0.
	ro1.Cluster.Name = _EMPTY_
	ro1.Cluster.Port = 0
	ro1.LeafNode.ReconnectInterval = 50 * time.Millisecond
	ro1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
	r1 := RunServer(ro1)
	defer r1.Shutdown()

	ro2 := DefaultOptions()
	ro2.Cluster.Name = _EMPTY_
	ro2.Cluster.Port = 0
	ro2.LeafNode.ReconnectInterval = 50 * time.Millisecond
	// Have this one point only to l2.
	ro2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u2}}}
	r2 := RunServer(ro2)
	defer r2.Shutdown()

	checkLeafNodeConnected(t, r1)
	checkLeafNodeConnected(t, r2)

	// Create a subscription on r1.
	nc := natsConnect(t, r1.ClientURL())
	defer nc.Close()
	sub := natsSubSync(t, nc, "foo")
	natsFlush(t, nc)

	checkSubInterest(t, l2, globalAccountName, "foo", time.Second)
	checkSubInterest(t, r2, globalAccountName, "foo", time.Second)

	nc2 := natsConnect(t, r2.ClientURL())
	defer nc2.Close()
	natsPub(t, nc, "foo", []byte("msg1"))

	// Check that the message is received.
	natsNexMsg(t, sub, time.Second)

	// Now shutdown l1; l2 should update subscription interest to r2.
	// When r1 reconnects to l2, the subscription should be updated too.
	l1.Shutdown()

	// Wait a bit (so that the check of interest is not OK just because
	// the route would not have been yet detected as broken), and check
	// that interest is still present on r2 and l2.
	time.Sleep(100 * time.Millisecond)
	checkSubInterest(t, l2, globalAccountName, "foo", time.Second)
	checkSubInterest(t, r2, globalAccountName, "foo", time.Second)

	// Check again that the message is received OK.
	natsPub(t, nc, "foo", []byte("msg2"))
	natsNexMsg(t, sub, time.Second)

	// Now close the client. Interest should disappear on r2. Due to a bug,
	// it was not removed.
	nc.Close()

	checkFor(t, time.Second, 15*time.Millisecond, func() error {
		acc := r2.GlobalAccount()
		if n := acc.Interest("foo"); n != 0 {
			return fmt.Errorf("Still interest on subject: %v", n)
		}
		return nil
	})
}
8 changes: 5 additions & 3 deletions server/route.go
@@ -891,7 +891,8 @@ func (c *client) removeRemoteSubs() {
 		ase := as[accountName]
 		if ase == nil {
 			if v, ok := srv.accounts.Load(accountName); ok {
-				as[accountName] = &asubs{acc: v.(*Account), subs: []*subscription{sub}}
+				ase = &asubs{acc: v.(*Account), subs: []*subscription{sub}}
+				as[accountName] = ase
 			} else {
 				continue
 			}
@@ -901,6 +902,7 @@
 		if srv.gateway.enabled {
 			srv.gatewayUpdateSubInterest(accountName, sub, -1)
 		}
+		srv.updateLeafNodes(ase.acc, sub, -1)
 	}

 	// Now remove the subs by batch for each account sublist.
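Two things happen in the hunks above: the first stores the freshly created `asubs` entry in `ase` so it can be referenced after the lookup, and the second uses it to propagate the interest decrement to leafnodes, which `removeRemoteSubs` previously did only for gateways. A rough sketch of that pattern, with invented types and names (`interestTable` and `withdraw` are not server APIs):

```go
// Rough sketch (invented types/names) of the propagation pattern the added
// call completes: when a route connection drops, every subscription it had
// registered must decrement interest on leafnode peers as well as gateways.
package main

import "fmt"

type interestTable map[string]int

// withdraw decrements per-subject interest and announces an unsub once the
// count for that subject reaches zero.
func (t interestTable) withdraw(subject string, announce func(string)) {
	t[subject]--
	if t[subject] <= 0 {
		delete(t, subject)
		announce(subject)
	}
}

func main() {
	gateways := interestTable{"foo": 1}
	leafnodes := interestTable{"foo": 1}
	// A route that carried one sub on "foo" disconnects: both sides see -1.
	gateways.withdraw("foo", func(s string) { fmt.Println("gateway unsub:", s) })
	// Before this PR, removeRemoteSubs skipped this second step.
	leafnodes.withdraw("foo", func(s string) { fmt.Println("leafnode unsub:", s) })
}
```

TestLeafNodeUnsubOnRouteDisconnect above exercises exactly this path: without the added call, interest on "foo" lingered on r2 after the subscribing client closed.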
@@ -1077,9 +1079,9 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
 	// We store local subs by account and subject and optionally queue name.
 	// If we have a queue it will have a trailing weight which we do not want.
 	if sub.queue != nil {
-		sub.sid = arg[:len(arg)-len(args[3+off])-1]
+		sub.sid = arg[len(sub.origin)+off : len(arg)-len(args[3+off])-1]
 	} else {
-		sub.sid = arg
+		sub.sid = arg[len(sub.origin)+off:]
 	}
 	key := string(sub.sid)

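The sid change in the last hunk slices the optional origin token (plus its separating space) off the stored sid, so a remote sub is keyed the same way whether or not an origin prefix was present on the wire. A sketch of that slicing with simplified parsing; `sidFromArg` is an invented helper, and the real code works on the raw protocol argument rather than re-splitting it:

```go
// Sketch (simplified parsing, invented helper) of the sid fix above: the
// stored sid must exclude the optional origin token so the same remote sub
// hashes to the same key regardless of which form was received.
package main

import (
	"fmt"
	"strings"
)

// sidFromArg mimics the slicing in processRemoteSub: arg is either
// "<account> <subject> [<queue> <weight>]" or, with an origin,
// "<origin> <account> <subject> [<queue> <weight>]". As in the diff, off
// serves both as the args index offset and as the +1 for the space that
// follows the origin token.
func sidFromArg(arg string, hasOrigin bool) string {
	args := strings.Fields(arg)
	var origin string
	off := 0
	if hasOrigin {
		origin = args[0]
		off = 1
	}
	if len(args) == 4+off { // queue sub: also drop the trailing weight
		return arg[len(origin)+off : len(arg)-len(args[3+off])-1]
	}
	return arg[len(origin)+off:] // strip "<origin> " when present
}

func main() {
	fmt.Printf("%q\n", sidFromArg("$G foo", false))             // "$G foo"
	fmt.Printf("%q\n", sidFromArg("remote $G foo", true))       // "$G foo"
	fmt.Printf("%q\n", sidFromArg("remote $G bar baz 1", true)) // "$G bar baz"
}
```

With the old slicing, the second call would have kept the "remote " prefix in the sid, so the sub registered under one key could be missed when later looked up without the origin, leaving stale entries behind.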