Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve JS when a leafnode cluster extends and shares a system account. #2108

Merged
merged 1 commit into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3779,9 +3779,10 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
ci = &cis
ci.Service = c.acc.Name
}
} else {
} else if c.kind != LEAF || c.pa.hdr < 0 || len(getHeader(ClientInfoHdr, msg[:c.pa.hdr])) == 0 {
ci = c.getClientInfo(share)
}

if ci != nil {
if b, _ := json.Marshal(ci); b != nil {
msg = c.setHeader(ClientInfoHdr, string(b), msg)
Expand Down
13 changes: 13 additions & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,19 @@ func (js *jetStream) setupMetaGroup() error {

cfg := &RaftConfig{Name: defaultMetaGroupName, Store: storeDir, Log: fs}

// If we are soliciting leafnode connections and we are sharing a system account
// we want to move to observer mode so that we extend the solicited cluster or supercluster
// but do not form our own.
if ln := s.getOpts().LeafNode; len(ln.Remotes) > 0 {
sys := s.SystemAccount().GetName()
for _, r := range ln.Remotes {
if r.LocalAccount == sys {
cfg.Observer = true
break
}
}
}

var bootstrap bool
if _, err := readPeerState(storeDir); err != nil {
s.Noticef("JetStream cluster bootstrapping")
Expand Down
111 changes: 110 additions & 1 deletion server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5519,6 +5519,56 @@ func TestJetStreamClusterMixedMode(t *testing.T) {
})
}

// TestJetStreamClusterSuperClusterAndLeafNodesWithSharedSystemAccount checks
// that a leafnode cluster sharing the system account extends the supercluster's
// JetStream meta group rather than electing its own leader, and that stream
// placement works both by default (leafnode cluster) and explicitly (hub).
func TestJetStreamClusterSuperClusterAndLeafNodesWithSharedSystemAccount(t *testing.T) {
	sc := createJetStreamSuperCluster(t, 3, 2)
	defer sc.shutdown()

	lnc := sc.createLeafNodes("LNC", 2)
	defer lnc.shutdown()

	// There must be exactly one meta-leader and it has to live in the
	// supercluster, never in the extending leafnode cluster.
	sc.waitOnLeader()

	if ml := lnc.leader(); ml != nil {
		t.Fatalf("Detected a meta-leader in the leafnode cluster: %s", ml)
	}

	// The leafnode servers should be counted as peers of the meta group
	// (3 servers x 2 clusters in the supercluster + 2 leafnodes).
	sc.waitOnPeerCount(8)

	// Create a stream through the leafnode cluster and verify that the
	// default placement lands in that cluster.
	// Client based API
	nc, js := jsClientConnect(t, lnc.randomServer())
	defer nc.Close()

	info, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo", "bar"},
		Replicas: 2,
	})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if info.Cluster.Name != "LNC" {
		t.Fatalf("Expected default placement to be %q, got %q", "LNC", info.Cluster.Name)
	}

	// Explicit placement into a cluster of the supercluster should work too.
	target := "C2"
	info, err = js.AddStream(&nats.StreamConfig{
		Name:      "TEST2",
		Subjects:  []string{"baz"},
		Replicas:  2,
		Placement: &nats.Placement{Cluster: target},
	})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if info.Cluster.Name != target {
		t.Fatalf("Expected default placement to be %q, got %q", target, info.Cluster.Name)
	}
}

// Support functions

// Used to setup superclusters for tests.
Expand All @@ -5541,6 +5591,10 @@ var jsClusterTempl = `
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"}

leaf {
listen: 127.0.0.1:-1
}

cluster {
name: %s
listen: 127.0.0.1:%d
Expand All @@ -5559,6 +5613,8 @@ var jsSuperClusterTempl = `
gateways = [%s
]
}

system_account: "$SYS"
`

var jsClusterLimitsTempl = `
Expand Down Expand Up @@ -5678,6 +5734,53 @@ func createJetStreamSuperCluster(t *testing.T, numServersPer, numClusters int) *
return sc
}

// jsClusterTemplWithLeafNode is a per-server config template for a JetStream
// cluster whose servers also solicit leafnode connections. The {{leaf}}
// placeholder is replaced with a rendered jsLeafFrag by createLeafNodes; the
// remaining verbs are filled by createJetStreamCluster (server name, store
// dir, cluster name, cluster port, routes). The $SYS account carries explicit
// credentials so remotes can bind the shared system account.
var jsClusterTemplWithLeafNode = `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: "%s"}

{{leaf}}

cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}

# For access to system account.
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`

// jsLeafFrag is the leafnode remotes fragment spliced into
// jsClusterTemplWithLeafNode in place of {{leaf}}. It solicits two remotes
// into the hub cluster: one on the default account with $JS.API.> exports
// denied (presumably so JS API requests are served by the local cluster —
// matches the default-placement assertion in the shared-system-account test),
// and one binding the shared $SYS system account.
var jsLeafFrag = `
leaf {
remotes [
{ urls: [ %s ], deny_exports: ["$JS.API.>"] }
{ urls: [ %s ], account: "$SYS" }
]
}
`

// createLeafNodes creates a JetStream cluster of numServers servers, named
// clusterName, whose members solicit leafnode connections into a randomly
// chosen cluster of the supercluster — one plain remote and one carrying
// system-account credentials. It waits for both connections on every server.
func (sc *supercluster) createLeafNodes(clusterName string, numServers int) *cluster {
	// Pick the hub cluster and collect its leafnode endpoints, once plain
	// and once with the $SYS admin credentials embedded.
	hub := sc.randomCluster()
	var plainURLs, sysURLs []string
	for _, srv := range hub.servers {
		o := srv.getOpts().LeafNode
		plainURLs = append(plainURLs, fmt.Sprintf("nats://%s:%d", o.Host, o.Port))
		sysURLs = append(sysURLs, fmt.Sprintf("nats://admin:s3cr3t!@%s:%d", o.Host, o.Port))
	}

	// Render the remotes fragment and splice it into the cluster template.
	frag := fmt.Sprintf(jsLeafFrag, strings.Join(plainURLs, ", "), strings.Join(sysURLs, ", "))
	conf := strings.Replace(jsClusterTemplWithLeafNode, "{{leaf}}", frag, 1)

	// Do not wait on cluster-ready here: the leafnode cluster extends the
	// hub's meta group instead of forming its own.
	lnc := createJetStreamCluster(sc.t, conf, clusterName, numServers, false)
	for _, srv := range lnc.servers {
		// Each server should hold two leafnode connections: plain + system.
		checkLeafNodeConnectedCount(sc.t, srv, 2)
	}
	return lnc
}

func (sc *supercluster) leader() *Server {
for _, c := range sc.clusters {
if leader := c.leader(); leader != nil {
Expand Down Expand Up @@ -5856,6 +5959,10 @@ func createJetStreamClusterExplicit(t *testing.T, clusterName string, numServers
}

// createJetStreamClusterWithTemplate creates a JetStream cluster from the
// given config template and waits for it to be fully formed and ready
// (convenience wrapper over createJetStreamCluster with waitOnReady=true).
func createJetStreamClusterWithTemplate(t *testing.T, tmpl string, clusterName string, numServers int) *cluster {
	return createJetStreamCluster(t, tmpl, clusterName, numServers, true)
}

func createJetStreamCluster(t *testing.T, tmpl string, clusterName string, numServers int, waitOnReady bool) *cluster {
t.Helper()
if clusterName == "" || numServers < 1 {
t.Fatalf("Bad params")
Expand Down Expand Up @@ -5884,7 +5991,9 @@ func createJetStreamClusterWithTemplate(t *testing.T, tmpl string, clusterName s

// Wait til we are formed and have a leader.
c.checkClusterFormed()
c.waitOnClusterReady()
if waitOnReady {
c.waitOnClusterReady()
}

return c
}
Expand Down
20 changes: 16 additions & 4 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type raft struct {
js *jetStream
dflag bool
pleader bool
observer bool

// Subjects for votes, updates, replays.
psubj string
Expand Down Expand Up @@ -225,10 +226,11 @@ const (
)

// RaftConfig holds the parameters used by startRaftNode to create a Raft group member.
type RaftConfig struct {
	Name  string // Name of the Raft group (e.g. defaultMetaGroupName for the JS meta group).
	Store string // Directory for persistent Raft state (peer state is read from here).
	Log   WAL    // Write-ahead log implementation backing the group.
	Track bool   // NOTE(review): purpose not visible in this chunk — confirm against startRaftNode.
	// Observer makes the node follow the group without ever campaigning for
	// leadership; runAsFollower resets the election timer instead of
	// switching to candidate when this is set.
	Observer bool
}

var (
Expand Down Expand Up @@ -365,6 +367,7 @@ func (s *Server) startRaftNode(cfg *RaftConfig) (RaftNode, error) {
applyc: make(chan *CommittedEntry, 8192),
leadc: make(chan bool, 8),
stepdown: make(chan string, 8),
observer: cfg.Observer,
}
n.c.registerWithAccount(sacc)

Expand Down Expand Up @@ -1419,6 +1422,12 @@ func (n *raft) electTimer() *time.Timer {
return n.elect
}

// isObserver reports whether this node is running in observer mode, meaning
// it follows the group but never campaigns for leadership.
func (n *raft) isObserver() bool {
	n.RLock()
	obs := n.observer
	n.RUnlock()
	return obs
}

func (n *raft) runAsFollower() {
for {
elect := n.electTimer()
Expand All @@ -1435,6 +1444,9 @@ func (n *raft) runAsFollower() {
if n.outOfResources() {
n.resetElectionTimeout()
n.debug("Not switching to candidate, no resources")
} else if n.isObserver() {
n.resetElect(48 * time.Hour)
n.debug("Not switching to candidate, observer only")
} else {
n.switchToCandidate()
return
Expand Down