Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fixed] leaf node subscription permission negotiation. #2091

Merged
merged 3 commits into from
Apr 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,18 @@ const (

// Some client state represented as flags
const (
connectReceived clientFlag = 1 << iota // The CONNECT proto has been received
infoReceived // The INFO protocol has been received
firstPongSent // The first PONG has been sent
handshakeComplete // For TLS clients, indicate that the handshake is complete
flushOutbound // Marks client as having a flushOutbound call in progress.
noReconnect // Indicate that on close, this connection should not attempt a reconnect
closeConnection // Marks that closeConnection has already been called.
connMarkedClosed // Marks that markConnAsClosed has already been called.
writeLoopStarted // Marks that the writeLoop has been started.
skipFlushOnClose // Marks that flushOutbound() should not be called on connection close.
expectConnect // Marks if this connection is expected to send a CONNECT
connectReceived clientFlag = 1 << iota // The CONNECT proto has been received
infoReceived // The INFO protocol has been received
firstPongSent // The first PONG has been sent
handshakeComplete // For TLS clients, indicate that the handshake is complete
flushOutbound // Marks client as having a flushOutbound call in progress.
noReconnect // Indicate that on close, this connection should not attempt a reconnect
closeConnection // Marks that closeConnection has already been called.
connMarkedClosed // Marks that markConnAsClosed has already been called.
writeLoopStarted // Marks that the writeLoop has been started.
skipFlushOnClose // Marks that flushOutbound() should not be called on connection close.
expectConnect // Marks if this connection is expected to send a CONNECT
connectProcessFinished // Marks if this connection has finished the connect process.
)

// set the flag (would be equivalent to set the boolean to true)
Expand Down Expand Up @@ -2953,6 +2954,7 @@ func (c *client) deliverMsg(sub *subscription, subject, reply, mh, msg []byte, g
if client.kind == LEAF && client.perms != nil {
if !client.pubAllowed(string(subject)) {
client.mu.Unlock()
client.Debugf("Not permitted to publish to %q", subject)
return false
}
}
Expand Down
105 changes: 79 additions & 26 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,17 +472,18 @@ func (s *Server) startLeafNodeAcceptLoop() {
tlsRequired := opts.LeafNode.TLSConfig != nil
tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
info := Info{
ID: s.info.ID,
Name: s.info.Name,
Version: s.info.Version,
GitCommit: gitCommit,
GoVersion: runtime.Version(),
AuthRequired: true,
TLSRequired: tlsRequired,
TLSVerify: tlsVerify,
MaxPayload: s.info.MaxPayload, // TODO(dlc) - Allow override?
Headers: s.supportsHeaders(),
Proto: 1, // Fixed for now.
ID: s.info.ID,
Name: s.info.Name,
Version: s.info.Version,
GitCommit: gitCommit,
GoVersion: runtime.Version(),
AuthRequired: true,
TLSRequired: tlsRequired,
TLSVerify: tlsVerify,
MaxPayload: s.info.MaxPayload, // TODO(dlc) - Allow override?
Headers: s.supportsHeaders(),
Proto: 1, // Fixed for now.
InfoOnConnect: true,
}
// If we have selected a random port...
if port == 0 {
Expand Down Expand Up @@ -902,20 +903,27 @@ func (c *client) processLeafnodeInfo(info *Info) {
c.setPermissions(perms)
}

var finishConnect bool
var resumeConnect bool
var s *Server

// If this is a remote connection and this is the first INFO protocol,
// then we need to finish the connect process by sending CONNECT, etc..
if firstINFO && c.leaf.remote != nil {
// Clear deadline that was set in createLeafNode while waiting for the INFO.
c.nc.SetDeadline(time.Time{})
finishConnect = true
s = c.srv
resumeConnect = true
}
s = c.srv
c.mu.Unlock()

if finishConnect && s != nil {
finishConnect := info.ConnectInfo
if resumeConnect && s != nil {
s.leafNodeResumeConnectProcess(c)
if !info.InfoOnConnect {
finishConnect = true
}
}
if finishConnect {
s.leafNodeFinishConnectProcess(c)
}
}
Expand Down Expand Up @@ -1177,15 +1185,13 @@ func (c *client) remoteCluster() string {
// Sends back an info block to the soliciting leafnode to let it know about
// its permission settings for local enforcement.
func (s *Server) sendPermsInfo(c *client) {
if c.perms == nil {
return
}
// Copy
info := s.copyLeafNodeInfo()
c.mu.Lock()
info.CID = c.cid
info.Import = c.opts.Import
info.Export = c.opts.Export
info.ConnectInfo = true
b, _ := json.Marshal(info)
pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)}
c.enqueueProto(bytes.Join(pcs, []byte(" ")))
Expand All @@ -1207,6 +1213,7 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
ims := []string{}
acc.mu.Lock()
accName := acc.Name
accNTag := acc.nameTag
// If we are solicited we only send interest for local clients.
if c.isSpokeLeafNode() {
acc.sl.localSubs(&subs)
Expand All @@ -1220,6 +1227,10 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
// Since leaf nodes only send on interest, if the bound
// account has import services we need to send those over.
for isubj := range acc.imports.services {
if !c.canSubscribe(isubj) {
c.Debugf("Not permitted to import service %s on behalf of %s/%s", isubj, accName, accNTag)
continue
}
ims = append(ims, isubj)
}
// Create a unique subject that will be used for loop detection.
Expand Down Expand Up @@ -1260,6 +1271,10 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
rc := c.leaf.remoteCluster
c.leaf.smap = make(map[string]int32)
for _, sub := range subs {
if !c.canSubscribe(string(sub.subject)) {
c.Debugf("Not permitted to subscribe to %s on behalf of %s/%s", string(sub.subject), accName, accNTag)
continue
}
// We ignore ourselves here.
// Also don't add the subscription if it has a origin cluster and the
// cluster name matches the one of the client we are sending to.
Expand Down Expand Up @@ -1348,6 +1363,8 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
// Check to make sure this sub does not have an origin cluster than matches the leafnode.
ln.mu.Lock()
skip := sub.origin != nil && string(sub.origin) == ln.remoteCluster()
// do not skip on !ln.canSubscribe(string(sub.subject))
// Given allow:foo, > would be rejected. For leaf nodes filtering is done on the (soliciting) end.
ln.mu.Unlock()
if skip {
continue
Expand Down Expand Up @@ -2107,10 +2124,11 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot
return preBuf, 0, nil
}

const connectProcessTimeout = 2 * time.Second

// This is invoked for remote LEAF remote connections after processing the INFO
// protocol. This will do the TLS handshake (if needed be), send the CONNECT protocol
// and register the leaf node.
func (s *Server) leafNodeFinishConnectProcess(c *client) {
// protocol. This will do the TLS handshake (if needed be)
func (s *Server) leafNodeResumeConnectProcess(c *client) {
clusterName := s.ClusterName()

c.mu.Lock()
Expand All @@ -2120,11 +2138,6 @@ func (s *Server) leafNodeFinishConnectProcess(c *client) {
}
remote := c.leaf.remote

// Check if we will need to send the system connect event.
remote.RLock()
sendSysConnectEvent := remote.Hub
remote.RUnlock()

var tlsRequired bool

// In case of websocket, the TLS handshake has been already done.
Expand Down Expand Up @@ -2164,10 +2177,50 @@ func (s *Server) leafNodeFinishConnectProcess(c *client) {
// Spin up the write loop.
s.startGoRoutine(func() { c.writeLoop() })

// timeout leafNodeFinishConnectProcess
c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
c.mu.Lock()
// check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
if !c.flags.setIfNotSet(connectProcessFinished) {
c.mu.Unlock()
return
}
clearTimer(&c.ping.tmr)
closed := c.isClosed()
c.mu.Unlock()
if !closed {
c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
c.closeConnection(StaleConnection)
}
})
c.mu.Unlock()
c.Debugf("Remote leafnode connect msg sent")
}

// This is invoked for remote LEAF remote connections after processing the INFO
// protocol and leafNodeResumeConnectProcess.
// This will send LS+ the CONNECT protocol and register the leaf node.
func (s *Server) leafNodeFinishConnectProcess(c *client) {
c.mu.Lock()
if !c.flags.setIfNotSet(connectProcessFinished) {
c.mu.Unlock()
return
}
if c.isClosed() {
c.mu.Unlock()
s.removeLeafNodeConnection(c)
return
}
remote := c.leaf.remote
// Check if we will need to send the system connect event.
remote.RLock()
sendSysConnectEvent := remote.Hub
remote.RUnlock()

// Capture account before releasing lock
acc := c.acc
// cancel connectProcessTimeout
clearTimer(&c.ping.tmr)
c.mu.Unlock()

// Make sure we register with the account here.
Expand Down
8 changes: 5 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,11 @@ type Info struct {
LameDuckMode bool `json:"ldm,omitempty"`

// Route Specific
Import *SubjectPermission `json:"import,omitempty"`
Export *SubjectPermission `json:"export,omitempty"`
LNOC bool `json:"lnoc,omitempty"`
Import *SubjectPermission `json:"import,omitempty"`
Export *SubjectPermission `json:"export,omitempty"`
LNOC bool `json:"lnoc,omitempty"`
InfoOnConnect bool `json:"info_on_connect,omitempty"` // When true the server will respond to CONNECT with an INFO
ConnectInfo bool `json:"connect_info,omitempty"` // When true this is the server INFO response to CONNECT

// Gateways Specific
Gateway string `json:"gateway,omitempty"` // Name of the origin Gateway (sent by gateway's INFO)
Expand Down
12 changes: 10 additions & 2 deletions test/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,22 @@ func setupGatewayConn(t testing.TB, c net.Conn, org, dst string) (sendFun, expec
return sendCommand(t, c), expectCommand(t, c)
}

func expectNumberOfProtos(t *testing.T, expFn expectFun, proto *regexp.Regexp, expected int) {
func expectNumberOfProtos(t *testing.T, expFn expectFun, proto *regexp.Regexp, expected int, ignore ...*regexp.Regexp) {
t.Helper()
buf := []byte(nil)
for count := 0; count != expected; {
buf := expFn(proto)
buf = append(buf, expFn(anyRe)...)
for _, skip := range ignore {
buf = skip.ReplaceAll(buf, []byte(``))
}
count += len(proto.FindAllSubmatch(buf, -1))
if count > expected {
t.Fatalf("Expected %v matches, got %v", expected, count)
}
buf = proto.ReplaceAll(buf, []byte(``))
}
if len(buf) != 0 {
t.Fatalf("did not consume everything, left with: %q", buf)
}
}

Expand Down
28 changes: 21 additions & 7 deletions test/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func setupLeaf(t *testing.T, lc net.Conn, expectedSubs int) (sendFun, expectFun)
send, expect := setupConn(t, lc)
// A loop detection subscription is sent, so consume this here, along
// with the ones that caller expect on setup.
expectNumberOfProtos(t, expect, lsubRe, expectedSubs)
expectNumberOfProtos(t, expect, lsubRe, expectedSubs, infoRe, pingRe)
return send, expect
}

Expand Down Expand Up @@ -871,17 +871,28 @@ func TestLeafNodeGatewayInterestPropagation(t *testing.T) {
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()
_, leafExpect := setupConn(t, lc)
var totalBuf []byte
buf := leafExpect(infoRe)
buf = infoRe.ReplaceAll(buf, []byte(nil))
foundFoo := false
for count := 0; count != 5; {
buf := leafExpect(lsubRe)
totalBuf = append(totalBuf, buf...)
// skip first time if we still have data (buf from above may already have some left)
if count != 0 || len(buf) == 0 {
buf = append(buf, leafExpect(anyRe)...)
}
count += len(lsubRe.FindAllSubmatch(buf, -1))
if count > 5 {
t.Fatalf("Expected %v matches, got %v (buf=%s)", 4, count, totalBuf)
t.Fatalf("Expected %v matches, got %v (buf=%s)", 4, count, buf)
}
if strings.Contains(string(buf), "foo") {
foundFoo = true
}
buf = lsubRe.ReplaceAll(buf, []byte(nil))
}
if len(buf) != 0 {
t.Fatalf("did not consume everything, left with: %q", buf)
}
if !strings.Contains(string(totalBuf), "foo") {
t.Fatalf("Expected interest for 'foo' as 'LS+ foo\\r\\n', got %q", totalBuf)
if !foundFoo {
t.Fatalf("Expected interest for 'foo' as 'LS+ foo\\r\\n', got %q", buf)
}
}

Expand Down Expand Up @@ -1156,6 +1167,7 @@ func TestLeafNodeBasicAuth(t *testing.T) {
lc = createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()
leafSend, leafExpect := setupConnWithUserPass(t, lc, "derek", "s3cr3t!")
leafExpect(infoRe)
leafExpect(lsubRe)
leafSend("PING\r\n")
leafExpect(pongRe)
Expand Down Expand Up @@ -1422,6 +1434,8 @@ func TestLeafNodeUserPermsForConnection(t *testing.T) {
nuc.Permissions.Pub.Allow.Add("foo.>")
nuc.Permissions.Pub.Allow.Add("baz.>")
nuc.Permissions.Sub.Allow.Add("foo.>")
// we would be immediately disconnected if that would not work
nuc.Permissions.Sub.Deny.Add("$SYS.>")
ujwt, err := nuc.Encode(akp)
if err != nil {
t.Fatalf("Error generating user JWT: %v", err)
Expand Down
Loading