Skip to content

Commit 3d0a13a

Browse files
authored
[IMPROVED] Return more specific cons info error on ordered consumer recreation (#1931)
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
1 parent 5e20e85 commit 3d0a13a

File tree

3 files changed

+22
-3
lines changed

3 files changed

+22
-3
lines changed

js.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2836,6 +2836,10 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
28362836
sub.mu.Lock()
28372837
// TODO(dlc) - Better way to mark especially if we attach.
28382838
if sub.jsi == nil || sub.jsi.consumer == _EMPTY_ {
2839+
if sub.jsi.ordered {
2840+
sub.mu.Unlock()
2841+
return nil, ErrConsumerInfoOnOrderedReset
2842+
}
28392843
sub.mu.Unlock()
28402844
return nil, ErrTypeSubscription
28412845
}

jserrors.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ var (
150150
// ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed
151151
ErrConsumerLeadershipChanged JetStreamError = &jsError{message: "Leadership Changed"}
152152

153+
// ErrConsumerInfoOnOrderedReset is returned when attempting to fetch consumer info for an ordered consumer that is currently being recreated.
154+
ErrConsumerInfoOnOrderedReset JetStreamError = &jsError{message: "cannot fetch consumer info; ordered consumer is being reset"}
155+
153156
// ErrNoHeartbeat is returned when no heartbeat is received from server when sending requests with pull consumer.
154157
ErrNoHeartbeat JetStreamError = &jsError{message: "no heartbeat received"}
155158

test/js_test.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10105,9 +10105,21 @@ func TestJetStreamOrderedConsumerRecreateAfterReconnect(t *testing.T) {
1010510105
if err != nil {
1010610106
t.Fatalf("Unexpected error: %v", err)
1010710107
}
10108-
consInfo, err = sub.ConsumerInfo()
10109-
if err != nil {
10110-
t.Fatalf("Unexpected error: %v", err)
10108+
var infoErr error
10109+
for range 5 {
10110+
consInfo, infoErr = sub.ConsumerInfo()
10111+
if infoErr != nil {
10112+
if errors.Is(infoErr, nats.ErrConsumerInfoOnOrderedReset) {
10113+
time.Sleep(100 * time.Millisecond)
10114+
continue
10115+
}
10116+
t.Fatalf("Unexpected error: %v", err)
10117+
}
10118+
infoErr = nil
10119+
break
10120+
}
10121+
if infoErr != nil {
10122+
t.Fatalf("Unexpected error: %v", infoErr)
1011110123
}
1011210124
if consInfo.Name == consName || len(consInfo.Name) != 8 {
1011310125
t.Fatalf("Unexpected consumer name: %q", consInfo.Name)

0 commit comments

Comments
 (0)