Skip to content

Commit

Permalink
[FIXED] Stream ingest after JWT exports/imports update (#6498)
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison authored Feb 11, 2025
2 parents c5157ce + e966df8 commit ffa2227
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 2 deletions.
9 changes: 7 additions & 2 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,9 +907,14 @@ func (a *Account) Interest(subject string) int {
func (a *Account) addClient(c *client) int {
a.mu.Lock()
n := len(a.clients)
if a.clients != nil {
a.clients[c] = struct{}{}

// Could come here earlier than the account is registered with the server.
// Make sure we can still track clients.
if a.clients == nil {
a.clients = make(map[*client]struct{})
}
a.clients[c] = struct{}{}

// If we did not add it, we are done
if n == len(a.clients) {
a.mu.Unlock()
Expand Down
126 changes: 126 additions & 0 deletions server/jetstream_jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1644,3 +1644,129 @@ func TestJetStreamJWTClusterAccountNRG(t *testing.T) {
}
}
}

func TestJetStreamJWTUpdateWithPreExistingStream(t *testing.T) {
updateJwt := func(url string, creds string, pubKey string, jwt string) {
t.Helper()
c := natsConnect(t, url, nats.UserCredentials(creds))
defer c.Close()
if msg, err := c.Request(fmt.Sprintf(accUpdateEventSubjNew, pubKey), []byte(jwt), time.Second); err != nil {
t.Fatal("error not expected in this test", err)
} else {
content := make(map[string]any)
if err := json.Unmarshal(msg.Data, &content); err != nil {
t.Fatalf("%v", err)
} else if _, ok := content["data"]; !ok {
t.Fatalf("did not get an ok response got: %v", content)
}
}
}
createUserCreds := func(akp nkeys.KeyPair) string {
uKp1, _ := nkeys.CreateUser()
uSeed1, _ := uKp1.Seed()
uclaim := newJWTTestUserClaims()
uclaim.Subject, _ = uKp1.PublicKey()
userJwt1, err := uclaim.Encode(akp)
require_NoError(t, err)
return genCredsFile(t, userJwt1, uSeed1)
}
// Create system account.
sysKp, _ := nkeys.CreateAccount()
sysPub, _ := sysKp.PublicKey()
sysUKp, _ := nkeys.CreateUser()
sysUSeed, _ := sysUKp.Seed()
uclaim := newJWTTestUserClaims()
uclaim.Subject, _ = sysUKp.PublicKey()
sysUserJwt, err := uclaim.Encode(sysKp)
require_NoError(t, err)
sysKp.Seed()
sysCreds := genCredsFile(t, sysUserJwt, sysUSeed)
// Create exporting account.
akpE, _ := nkeys.CreateAccount()
aPubE, _ := akpE.PublicKey()
claimE := jwt.NewAccountClaims(aPubE)
aJwtE, err := claimE.Encode(oKp)
require_NoError(t, err)
// Create importing account.
akpI, _ := nkeys.CreateAccount()
aPubI, _ := akpI.PublicKey()
claimI := jwt.NewAccountClaims(aPubI)
claimI.Limits.JetStreamLimits = jwt.JetStreamLimits{MemoryStorage: 1024 * 1024, DiskStorage: 1024 * 1024}
claimI.Imports.Add(&jwt.Import{
Name: "import",
Subject: "foo",
Account: aPubE,
Type: jwt.Stream,
})
aJwtI, err := claimI.Encode(oKp)
require_NoError(t, err)
// Create users.
userCredsE := createUserCreds(akpE)
userCredsI := createUserCreds(akpI)
// Start server and update JWTs.
dir := t.TempDir()
conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
jetstream: {max_mem_store: 10Mb, max_file_store: 10Mb, store_dir: "%s"}
operator: %s
resolver: {
type: full
dir: '%s'
}
system_account: %s
`, dir, ojwt, dir, sysPub)))
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()
updateJwt(s.ClientURL(), sysCreds, aPubI, aJwtI)
updateJwt(s.ClientURL(), sysCreds, aPubE, aJwtE)

// Create stream on importing account before we restart.
nci, js := jsClientConnect(t, s, nats.UserCredentials(userCredsI))
defer nci.Close()
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
require_NoError(t, err)

// Restart server.
nci.Close()
s.Shutdown()
s.WaitForShutdown()
s, _ = RunServerWithConfig(conf)
defer s.Shutdown()

// Reconnect and confirm stream is empty.
nci, js = jsClientConnect(t, s, nats.UserCredentials(userCredsI))
defer nci.Close()
si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 0)

// If an import/export gets added when the stream already existed on startup.
// We should still be able to route those messages.
claimE.Exports.Add(&jwt.Export{
Name: "export",
Subject: "foo",
Type: jwt.Stream,
})
aJwtE, err = claimE.Encode(oKp)
require_NoError(t, err)
updateJwt(s.ClientURL(), sysCreds, aPubE, aJwtE)

// Connect to exporting account and publish a message that should be exported/imported.
nce := natsConnect(t, s.ClientURL(), nats.UserCredentials(userCredsE))
defer nce.Close()
err = nce.Publish("foo", nil)
require_NoError(t, err)

// Confirm the message was captured by the stream on the importing account.
checkFor(t, 2*time.Second, 500*time.Millisecond, func() error {
if si, err = js.StreamInfo("TEST"); err != nil {
return err
} else if si.State.Msgs != 1 {
return fmt.Errorf("expected 1 message in stream, got %d", si.State.Msgs)
}
return nil
})
}

0 comments on commit ffa2227

Please sign in to comment.