Skip to content

Commit

Permalink
Cherry-picks for 2.10.26-RC.3 (#6504)
Browse files Browse the repository at this point in the history
Includes the following:

- #6492
- #6494
- #6498
- #6500
- #6499
- #6502

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander authored Feb 12, 2025
2 parents 68d3668 + 0f86aed commit 7321c8a
Show file tree
Hide file tree
Showing 16 changed files with 1,675 additions and 59 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/minio/highwayhash v1.0.3
github.com/nats-io/jwt/v2 v2.7.3
github.com/nats-io/nats.go v1.39.0
github.com/nats-io/nkeys v0.4.9
github.com/nats-io/nkeys v0.4.10
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.6.0
golang.org/x/crypto v0.33.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
github.com/nats-io/nats.go v1.39.0 h1:2/yg2JQjiYYKLwDuBzV0FbB2sIV+eFNkEevlRi4n9lI=
github.com/nats-io/nats.go v1.39.0/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM=
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=
github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE=
github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc=
github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
9 changes: 7 additions & 2 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,9 +858,14 @@ func (a *Account) Interest(subject string) int {
func (a *Account) addClient(c *client) int {
a.mu.Lock()
n := len(a.clients)
if a.clients != nil {
a.clients[c] = struct{}{}

// Could come here earlier than the account is registered with the server.
// Make sure we can still track clients.
if a.clients == nil {
a.clients = make(map[*client]struct{})
}
a.clients[c] = struct{}{}

// If we did not add it, we are done
if n == len(a.clients) {
a.mu.Unlock()
Expand Down
230 changes: 226 additions & 4 deletions server/auth_callout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,16 +490,26 @@ func createAuthServiceUser(t *testing.T, accKp nkeys.KeyPair) (pub, creds string
}

func createBasicAccountUser(t *testing.T, accKp nkeys.KeyPair) (creds string) {
return createBasicAccount(t, "auth-client", accKp, true)
}

func createBasicAccountLeaf(t *testing.T, accKp nkeys.KeyPair) (creds string) {
return createBasicAccount(t, "auth-leaf", accKp, false)
}

func createBasicAccount(t *testing.T, name string, accKp nkeys.KeyPair, addDeny bool) (creds string) {
t.Helper()
ukp, _ := nkeys.CreateUser()
seed, _ := ukp.Seed()
upub, _ := ukp.PublicKey()
uclaim := newJWTTestUserClaims()
uclaim.Subject = upub
uclaim.Name = "auth-client"
// For these deny all permission
uclaim.Permissions.Pub.Deny.Add(">")
uclaim.Permissions.Sub.Deny.Add(">")
uclaim.Name = name
if addDeny {
// For these deny all permission
uclaim.Permissions.Pub.Deny.Add(">")
uclaim.Permissions.Sub.Deny.Add(">")
}
vr := jwt.ValidationResults{}
uclaim.Validate(&vr)
require_Len(t, len(vr.Errors()), 0)
Expand Down Expand Up @@ -2019,3 +2029,215 @@ func updateAccount(t *testing.T, sys *nats.Conn, jwtToken string) {
require_NotNil(t, response.Data)
require_Equal(t, response.Data.Code, int(200))
}

func TestAuthCalloutLeafNodeAndOperatorMode(t *testing.T) {
_, spub := createKey(t)
sysClaim := jwt.NewAccountClaims(spub)
sysClaim.Name = "$SYS"
sysJwt, err := sysClaim.Encode(oKp)
require_NoError(t, err)

// A account.
akp, apk := createKey(t)
aClaim := jwt.NewAccountClaims(apk)
aClaim.Name = "A"
aJwt, err := aClaim.Encode(oKp)
require_NoError(t, err)

// AUTH callout service account.
ckp, err := nkeys.FromSeed([]byte(authCalloutIssuerSeed))
require_NoError(t, err)

cpk, err := ckp.PublicKey()
require_NoError(t, err)

// The authorized user for the service.
upub, creds := createAuthServiceUser(t, ckp)
defer removeFile(t, creds)

authClaim := jwt.NewAccountClaims(cpk)
authClaim.Name = "AUTH"
authClaim.EnableExternalAuthorization(upub)
authClaim.Authorization.AllowedAccounts.Add("*")
authJwt, err := authClaim.Encode(oKp)
require_NoError(t, err)

conf := fmt.Sprintf(`
listen: 127.0.0.1:-1
operator: %s
system_account: %s
resolver: MEM
resolver_preload: {
%s: %s
%s: %s
%s: %s
}
leafnodes {
listen: "127.0.0.1:-1"
}
`, ojwt, spub, cpk, authJwt, apk, aJwt, spub, sysJwt)

handler := func(m *nats.Msg) {
user, si, _, opts, _ := decodeAuthRequest(t, m.Data)
if (opts.Username == "leaf" && opts.Password == "pwd") || (opts.Token == "token") {
ujwt := createAuthUser(t, user, "user_a", apk, "", akp, 0, nil)
m.Respond(serviceResponse(t, user, si.ID, ujwt, "", 0))
} else {
m.Respond(nil)
}
}

at := NewAuthTest(t, conf, handler, nats.UserCredentials(creds))
defer at.Cleanup()

ucreds := createBasicAccountUser(t, ckp)
defer removeFile(t, ucreds)

// This should switch us to the A account.
nc := at.Connect(nats.UserCredentials(ucreds), nats.Token("token"))
defer nc.Close()

natsSub(t, nc, "foo", func(m *nats.Msg) {
m.Respond([]byte("here"))
})
natsFlush(t, nc)

// Create creds for the leaf account.
lcreds := createBasicAccountLeaf(t, ckp)
defer removeFile(t, lcreds)

hopts := at.srv.getOpts()

for _, test := range []struct {
name string
up string
ok bool
}{
{"bad token", "tokenx", false},
{"bad username and password", "leaf:pwdx", false},
{"token", "token", true},
{"username and password", "leaf:pwd", true},
} {
t.Run(test.name, func(t *testing.T) {
lconf := createConfFile(t, []byte(fmt.Sprintf(`
listen: "127.0.0.1:-1"
server_name: "LEAF"
leafnodes {
remotes [
{
url: "nats://%s@127.0.0.1:%d"
credentials: "%s"
}
]
}
`, test.up, hopts.LeafNode.Port, lcreds)))
leaf, _ := RunServerWithConfig(lconf)
defer leaf.Shutdown()

if !test.ok {
// Expect failure to connect. Wait a bit before checking.
time.Sleep(50 * time.Millisecond)
checkLeafNodeConnectedCount(t, leaf, 0)
return
}

checkLeafNodeConnected(t, leaf)

checkSubInterest(t, leaf, globalAccountName, "foo", time.Second)

ncl := natsConnect(t, leaf.ClientURL())
defer ncl.Close()

resp, err := ncl.Request("foo", []byte("hello"), time.Second)
require_NoError(t, err)
require_Equal(t, string(resp.Data), "here")
})
}
}

func TestAuthCalloutLeafNodeAndConfigMode(t *testing.T) {
conf := `
listen: "127.0.0.1:-1"
accounts {
AUTH { users [ {user: "auth", password: "pwd"} ] }
A {}
}
authorization {
timeout: 1s
auth_callout {
# Needs to be a public account nkey, will work for both server config and operator mode.
issuer: "ABJHLOVMPA4CI6R5KLNGOB4GSLNIY7IOUPAJC4YFNDLQVIOBYQGUWVLA"
account: AUTH
auth_users: [ auth ]
}
}
leafnodes {
listen: "127.0.0.1:-1"
}
`
handler := func(m *nats.Msg) {
user, si, _, opts, _ := decodeAuthRequest(t, m.Data)
if (opts.Username == "leaf" && opts.Password == "pwd") || (opts.Token == "token") {
ujwt := createAuthUser(t, user, _EMPTY_, "A", "", nil, 0, nil)
m.Respond(serviceResponse(t, user, si.ID, ujwt, "", 0))
} else {
m.Respond(nil)
}
}

at := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd"))
defer at.Cleanup()

// This should switch us to the A account.
nc := at.Connect(nats.Token("token"))
defer nc.Close()

natsSub(t, nc, "foo", func(m *nats.Msg) {
m.Respond([]byte("here"))
})
natsFlush(t, nc)

hopts := at.srv.getOpts()

for _, test := range []struct {
name string
up string
ok bool
}{
{"bad token", "tokenx", false},
{"bad username and password", "leaf:pwdx", false},
{"token", "token", true},
{"username and password", "leaf:pwd", true},
} {
t.Run(test.name, func(t *testing.T) {
lconf := createConfFile(t, []byte(fmt.Sprintf(`
listen: "127.0.0.1:-1"
server_name: "LEAF"
leafnodes {
remotes [{url: "nats://%s@127.0.0.1:%d"}]
}
`, test.up, hopts.LeafNode.Port)))
leaf, _ := RunServerWithConfig(lconf)
defer leaf.Shutdown()

if !test.ok {
// Expect failure to connect. Wait a bit before checking.
time.Sleep(50 * time.Millisecond)
checkLeafNodeConnectedCount(t, leaf, 0)
return
}

checkLeafNodeConnected(t, leaf)

checkSubInterest(t, leaf, globalAccountName, "foo", time.Second)

ncl := natsConnect(t, leaf.ClientURL())
defer ncl.Close()

resp, err := ncl.Request("foo", []byte("hello"), time.Second)
require_NoError(t, err)
require_Equal(t, string(resp.Data), "here")
})
}

}
4 changes: 4 additions & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3489,6 +3489,10 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su
// sending to is in a stalled state, go ahead and wait here
// with a limit.
if c.kind == CLIENT && client.out.stc != nil {
if srv.getOpts().NoFastProducerStall {
client.mu.Unlock()
return false
}
client.stalledWait(c)
}

Expand Down
18 changes: 7 additions & 11 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ type consumer struct {
ackMsgs *ipQueue[*jsAckMsg]

// for stream signaling when multiple filters are set.
sigSubs []*subscription
sigSubs []string
}

// A single subject filter.
Expand Down Expand Up @@ -5433,23 +5433,23 @@ func (o *consumer) account() *Account {

// Creates a sublist for consumer.
// All subjects share the same callback.
func (o *consumer) signalSubs() []*subscription {
func (o *consumer) signalSubs() []string {
o.mu.Lock()
defer o.mu.Unlock()

if o.sigSubs != nil {
return o.sigSubs
}

subs := []*subscription{}
if o.subjf == nil {
subs = append(subs, &subscription{subject: []byte(fwcs), icb: o.processStreamSignal})
if len(o.subjf) == 0 {
subs := []string{fwcs}
o.sigSubs = subs
return subs
}

subs := make([]string, 0, len(o.subjf))
for _, filter := range o.subjf {
subs = append(subs, &subscription{subject: []byte(filter.subject), icb: o.processStreamSignal})
subs = append(subs, filter.subject)
}
o.sigSubs = subs
return subs
Expand All @@ -5459,7 +5459,7 @@ func (o *consumer) signalSubs() []*subscription {
// We know that this subject matches us by how the parent handles registering us with the signaling sublist,
// but we must check if we are leader.
// We do need the sequence of the message however and we use the msg as the encoded seq.
func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, subject, _ string, seqb []byte) {
func (o *consumer) processStreamSignal(seq uint64) {
// We can get called here now when not leader, so bail fast
// and without acquiring any locks.
if !o.leader.Load() {
Expand All @@ -5470,10 +5470,6 @@ func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, s
if o.mset == nil {
return
}

var le = binary.LittleEndian
seq := le.Uint64(seqb)

if seq > o.npf {
o.npc++
}
Expand Down
Loading

0 comments on commit 7321c8a

Please sign in to comment.