Skip to content

Commit

Permalink
Add subject filtering when listing streams (#1062)
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio authored Aug 31, 2022
1 parent 5af7980 commit c3a557a
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 0 deletions.
13 changes: 13 additions & 0 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ type jsOpts struct {
purgeOpts *StreamPurgeRequest
// streamInfoOpts contains optional stream info options
streamInfoOpts *StreamInfoRequest
// streamListSubject is used for subject filtering when listing streams / stream names
streamListSubject string
// For direct get message requests
directGet bool
// For direct get next message
Expand Down Expand Up @@ -359,6 +361,17 @@ func DirectGetNext(subject string) JSOpt {
})
}

// StreamListFilter is an option that can be used to configure `StreamsInfo()` and `StreamNames()` requests.
// It allows filtering the retured streams by subject associated with each stream.
// Wildcards can be used. For example, `StreamListFilter(FOO.*.A) will return
// all streams which have at least one subject matching the provided pattern (e.g. FOO.TEST.A).
func StreamListFilter(subject string) JSOpt {
return jsOptFn(func(opts *jsOpts) error {
opts.streamListSubject = subject
return nil
})
}

func (js *js) apiSubj(subj string) string {
if js.opts.pre == _EMPTY_ {
return subj
Expand Down
2 changes: 2 additions & 0 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,7 @@ func (s *streamLister) Next() bool {

req, err := json.Marshal(streamNamesRequest{
apiPagedRequest: apiPagedRequest{Offset: s.offset},
Subject: s.js.opts.streamListSubject,
})
if err != nil {
s.err = err
Expand Down Expand Up @@ -1311,6 +1312,7 @@ func (l *streamNamesLister) Next() bool {

req, err := json.Marshal(streamNamesRequest{
apiPagedRequest: apiPagedRequest{Offset: l.offset},
Subject: l.js.opts.streamListSubject,
})
if err != nil {
l.err = err
Expand Down
69 changes: 69 additions & 0 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1692,6 +1692,75 @@ func TestStreamLister(t *testing.T) {
}
}

func TestStreamLister_FilterSubject(t *testing.T) {
streams := map[string][]string{
"s1": {"foo"},
"s2": {"bar"},
"s3": {"foo.*", "bar.*"},
"s4": {"foo-1.A"},
"s5": {"foo.A.bar.B"},
"s6": {"foo.C.bar.D.E"},
}
tests := []struct {
filter string
expected []string
}{
{
filter: "foo",
expected: []string{"s1"},
},
{
filter: "bar",
expected: []string{"s2"},
},
{
filter: "*",
expected: []string{"s1", "s2"},
},
{
filter: ">",
expected: []string{"s1", "s2", "s3", "s4", "s5", "s6"},
},
{
filter: "*.A",
expected: []string{"s3", "s4"},
},
}
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
for name, subjects := range streams {
if _, err := js.AddStream(&nats.StreamConfig{Name: name, Subjects: subjects}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}

for _, test := range tests {
t.Run(test.filter, func(t *testing.T) {
names := make([]string, 0)

// list stream names
for name := range js.StreamNames(nats.StreamListFilter(test.filter)) {
names = append(names, name)
}
if !reflect.DeepEqual(names, test.expected) {
t.Fatalf("Invalid result; want: %v; got: %v", test.expected, names)
}

// list streams
names = make([]string, 0)
for info := range js.StreamsInfo(nats.StreamListFilter(test.filter)) {
names = append(names, info.Config.Name)
}
if !reflect.DeepEqual(names, test.expected) {
t.Fatalf("Invalid result; want: %v; got: %v", test.expected, names)
}
})
}
}

func TestConsumersLister(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit c3a557a

Please sign in to comment.