Skip to content

Commit

Permalink
[FIXED] KV discard policy in jetstream pkg (#1616)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio authored Apr 17, 2024
1 parent cdc50de commit fa996fc
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 2 deletions.
1 change: 1 addition & 0 deletions jetstream/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@ func (js *jetStream) prepareKeyValueConfig(ctx context.Context, cfg KeyValueConf
AllowDirect: true,
RePublish: cfg.RePublish,
Compression: compression,
Discard: DiscardNew,
}
if cfg.Mirror != nil {
// Copy in case we need to make changes so we do not change caller's version.
Expand Down
37 changes: 36 additions & 1 deletion jetstream/test/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1481,11 +1481,46 @@ func TestKeyValueCreate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST"})
kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "TEST",
Description: "Test KV",
MaxValueSize: 128,
History: 10,
TTL: 1 * time.Hour,
MaxBytes: 1024,
Storage: jetstream.FileStorage,
})
if err != nil {
t.Fatalf("Error creating kv: %v", err)
}

expectedStreamConfig := jetstream.StreamConfig{
Name: "KV_TEST",
Description: "Test KV",
Subjects: []string{"$KV.TEST.>"},
MaxMsgs: -1,
MaxBytes: 1024,
Discard: jetstream.DiscardNew,
MaxAge: 1 * time.Hour,
MaxMsgsPerSubject: 10,
MaxMsgSize: 128,
Storage: jetstream.FileStorage,
DenyDelete: true,
AllowRollup: true,
AllowDirect: true,
MaxConsumers: -1,
Replicas: 1,
Duplicates: 2 * time.Minute,
}

stream, err := js.Stream(ctx, "KV_TEST")
if err != nil {
t.Fatalf("Error getting stream: %v", err)
}
if !reflect.DeepEqual(stream.CachedInfo().Config, expectedStreamConfig) {
t.Fatalf("Expected stream config to be %+v, got %+v", expectedStreamConfig, stream.CachedInfo().Config)
}

_, err = kv.Create(ctx, "key", []byte("1"))
if err != nil {
t.Fatalf("Error creating key: %v", err)
Expand Down
37 changes: 36 additions & 1 deletion test/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1376,11 +1376,46 @@ func TestKeyValueCreate(t *testing.T) {
nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST"})
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "TEST",
Description: "Test KV",
MaxValueSize: 128,
History: 10,
TTL: 1 * time.Hour,
MaxBytes: 1024,
Storage: nats.FileStorage,
})
if err != nil {
t.Fatalf("Error creating kv: %v", err)
}

expectedStreamConfig := nats.StreamConfig{
Name: "KV_TEST",
Description: "Test KV",
Subjects: []string{"$KV.TEST.>"},
MaxMsgs: -1,
MaxBytes: 1024,
Discard: nats.DiscardNew,
MaxAge: 1 * time.Hour,
MaxMsgsPerSubject: 10,
MaxMsgSize: 128,
Storage: nats.FileStorage,
DenyDelete: true,
AllowRollup: true,
AllowDirect: true,
MaxConsumers: -1,
Replicas: 1,
Duplicates: 2 * time.Minute,
}

si, err := js.StreamInfo("KV_TEST")
if err != nil {
t.Fatalf("Error getting stream info: %v", err)
}
if !reflect.DeepEqual(si.Config, expectedStreamConfig) {
t.Fatalf("Expected stream config to be %+v, got %+v", expectedStreamConfig, si.Config)
}

_, err = kv.Create("key", []byte("1"))
if err != nil {
t.Fatalf("Error creating key: %v", err)
Expand Down

0 comments on commit fa996fc

Please sign in to comment.