diff --git a/js.go b/js.go index 7785e4981..b703d3dac 100644 --- a/js.go +++ b/js.go @@ -230,6 +230,8 @@ type jsOpts struct { shouldTrace bool // purgeOpts contains optional stream purge options purgeOpts *StreamPurgeRequest + // streamInfoOpts contains optional stream info options + streamInfoOpts *StreamInfoRequest } const ( @@ -301,6 +303,11 @@ func (s *StreamPurgeRequest) configureJSContext(js *jsOpts) error { return nil } +func (s *StreamInfoRequest) configureJSContext(js *jsOpts) error { + js.streamInfoOpts = s + return nil +} + // APIPrefix changes the default prefix used for the JetStream API. func APIPrefix(pre string) JSOpt { return jsOptFn(func(js *jsOpts) error { diff --git a/jsm.go b/jsm.go index 9170f8304..d08d0e3b1 100644 --- a/jsm.go +++ b/jsm.go @@ -634,7 +634,13 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { return resp.StreamInfo, nil } -type streamInfoResponse = streamCreateResponse +type ( + // StreamInfoRequest contains additional option to return details about messages deleted from a stream + StreamInfoRequest struct { + DeletedDetails bool `json:"deleted_details,omitempty"` + } + streamInfoResponse = streamCreateResponse +) func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { if err := checkStreamName(stream); err != nil { @@ -647,9 +653,15 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { if cancel != nil { defer cancel() } + var req []byte + if o.streamInfoOpts != nil { + if req, err = json.Marshal(o.streamInfoOpts); err != nil { + return nil, err + } + } + siSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream)) - csSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream)) - r, err := js.apiRequestWithContext(o.ctx, csSubj, nil) + r, err := js.apiRequestWithContext(o.ctx, siSubj, req) if err != nil { return nil, err } @@ -686,13 +698,15 @@ type StreamSourceInfo struct { // StreamState is information about the given stream. type StreamState struct { - Msgs uint64 `json:"messages"` - Bytes uint64 `json:"bytes"` - FirstSeq uint64 `json:"first_seq"` - FirstTime time.Time `json:"first_ts"` - LastSeq uint64 `json:"last_seq"` - LastTime time.Time `json:"last_ts"` - Consumers int `json:"consumer_count"` + Msgs uint64 `json:"messages"` + Bytes uint64 `json:"bytes"` + FirstSeq uint64 `json:"first_seq"` + FirstTime time.Time `json:"first_ts"` + LastSeq uint64 `json:"last_seq"` + LastTime time.Time `json:"last_ts"` + Consumers int `json:"consumer_count"` + Deleted []uint64 `json:"deleted"` + NumDeleted int `json:"num_deleted"` } // ClusterInfo shows information about the underlying set of servers diff --git a/test/js_test.go b/test/js_test.go index fcccf5e6c..59730ac62 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1702,6 +1702,110 @@ func TestPurgeStream(t *testing.T) { } } +func TestStreamInfo(t *testing.T) { + testData := []string{"one", "two", "three", "four"} + + tests := []struct { + name string + stream string + req *nats.StreamInfoRequest + withError error + expectedDeletedDetails []uint64 + }{ + { + name: "empty request body", + stream: "foo", + }, + { + name: "with deleted details", + stream: "foo", + req: &nats.StreamInfoRequest{ + DeletedDetails: true, + }, + expectedDeletedDetails: []uint64{2, 4}, + }, + { + name: "with deleted details set to false", + stream: "foo", + req: &nats.StreamInfoRequest{ + DeletedDetails: false, + }, + }, + { + name: "empty stream name", + stream: "", + withError: nats.ErrStreamNameRequired, + }, + { + name: "invalid stream name", + stream: "bad.stream.name", + withError: nats.ErrInvalidStreamName, + }, + { + name: "stream not found", + stream: "bar", + withError: nats.ErrStreamNotFound, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "foo", + Subjects: []string{"foo.A"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for _, msg := range testData { + if _, err := js.Publish("foo.A", []byte(msg)); err != nil { + t.Fatalf("Unexpected error during publish: %v", err) + } + } + if err := js.DeleteMsg("foo", 2); err != nil { + t.Fatalf("Unexpected error while deleting message from stream: %v", err) + } + if err := js.DeleteMsg("foo", 4); err != nil { + t.Fatalf("Unexpected error while deleting message from stream: %v", err) + } + + var streamInfo *nats.StreamInfo + if test.req != nil { + streamInfo, err = js.StreamInfo(test.stream, test.req) + } else { + streamInfo, err = js.StreamInfo(test.stream) + } + if test.withError != nil { + if err == nil { + t.Fatal("Expected error, got nil") + } + if !errors.Is(err, test.withError) { + t.Fatalf("Expected error: '%s'; got '%s'", test.withError, err) + } + return + } + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if streamInfo.Config.Name != "foo" { + t.Fatalf("Invalid stream name in StreamInfo response: want: 'foo'; got: '%s'", streamInfo.Config.Name) + } + if streamInfo.State.NumDeleted != 2 { + t.Fatalf("Invalid value for num_deleted in state: want: 2; got: %d", streamInfo.State.NumDeleted) + } + if !reflect.DeepEqual(test.expectedDeletedDetails, streamInfo.State.Deleted) { + t.Fatalf("Invalid value for deleted msgs in state: want: %v; got: %v", test.expectedDeletedDetails, streamInfo.State.Deleted) + } + }) + } +} + func TestJetStreamManagement_GetMsg(t *testing.T) { t.Run("1-node", func(t *testing.T) { withJSServer(t, testJetStreamManagement_GetMsg) @@ -2361,7 +2465,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) { Name: sourceName, Storage: nats.FileStorage, Sources: []*nats.StreamSource{ - &nats.StreamSource{ + { Name: publishSubj, External: &nats.ExternalStream{ APIPrefix: "RI.JS.API",