Skip to content

Commit e86b4b1

Browse files
(2.12) [IMPROVED] Error on missing Nats-Expected-Last-Subject-Sequence (#7196)
The `Nats-Expected-Last-Subject-Sequence-Subject` header was added in #5281, but we would not return an error if it was specified without the `Nats-Expected-Last-Subject-Sequence` header. Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
2 parents 3b44c88 + 4e6bf1e commit e86b4b1

File tree

5 files changed

+54
-4
lines changed

5 files changed

+54
-4
lines changed

server/errors.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1908,5 +1908,15 @@
19081908
"help": "",
19091909
"url": "",
19101910
"deprecates": ""
1911+
},
1912+
{
1913+
"constant": "JSStreamExpectedLastSeqPerSubjectInvalid",
1914+
"code": 400,
1915+
"error_code": 10193,
1916+
"description": "missing sequence for expected last sequence per subject",
1917+
"comment": "",
1918+
"help": "",
1919+
"url": "",
1920+
"deprecates": ""
19111921
}
19121922
]

server/jetstream_batching.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,9 @@ func checkMsgHeadersPreClusteredProposal(
465465
diff.expectedPerSubject[seqSubj] = e
466466
}
467467
}
468+
} else if getExpectedLastSeqPerSubjectForSubject(hdr) != _EMPTY_ {
469+
apiErr := NewJSStreamExpectedLastSeqPerSubjectInvalidError()
470+
return hdr, msg, 0, apiErr, apiErr
468471
}
469472

470473
// Message scheduling.

server/jetstream_errors_generated.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,9 @@ const (
434434
// JSStreamDuplicateMessageConflict duplicate message id is in process
435435
JSStreamDuplicateMessageConflict ErrorIdentifier = 10158
436436

437+
// JSStreamExpectedLastSeqPerSubjectInvalid missing sequence for expected last sequence per subject
438+
JSStreamExpectedLastSeqPerSubjectInvalid ErrorIdentifier = 10193
439+
437440
// JSStreamExpectedLastSeqPerSubjectNotReady expected last sequence per subject temporarily unavailable
438441
JSStreamExpectedLastSeqPerSubjectNotReady ErrorIdentifier = 10163
439442

@@ -724,6 +727,7 @@ var (
724727
JSStreamCreateErrF: {Code: 500, ErrCode: 10049, Description: "{err}"},
725728
JSStreamDeleteErrF: {Code: 500, ErrCode: 10050, Description: "{err}"},
726729
JSStreamDuplicateMessageConflict: {Code: 409, ErrCode: 10158, Description: "duplicate message id is in process"},
730+
JSStreamExpectedLastSeqPerSubjectInvalid: {Code: 400, ErrCode: 10193, Description: "missing sequence for expected last sequence per subject"},
727731
JSStreamExpectedLastSeqPerSubjectNotReady: {Code: 503, ErrCode: 10163, Description: "expected last sequence per subject temporarily unavailable"},
728732
JSStreamExternalApiOverlapErrF: {Code: 400, ErrCode: 10021, Description: "stream external api prefix {prefix} must not overlap with {subject}"},
729733
JSStreamExternalDelPrefixOverlapsErrF: {Code: 400, ErrCode: 10022, Description: "stream external delivery prefix {prefix} overlaps with stream subject {subject}"},
@@ -2383,6 +2387,16 @@ func NewJSStreamDuplicateMessageConflictError(opts ...ErrorOption) *ApiError {
23832387
return ApiErrors[JSStreamDuplicateMessageConflict]
23842388
}
23852389

2390+
// NewJSStreamExpectedLastSeqPerSubjectInvalidError creates a new JSStreamExpectedLastSeqPerSubjectInvalid error: "missing sequence for expected last sequence per subject"
2391+
func NewJSStreamExpectedLastSeqPerSubjectInvalidError(opts ...ErrorOption) *ApiError {
2392+
eopts := parseOpts(opts)
2393+
if ae, ok := eopts.err.(*ApiError); ok {
2394+
return ae
2395+
}
2396+
2397+
return ApiErrors[JSStreamExpectedLastSeqPerSubjectInvalid]
2398+
}
2399+
23862400
// NewJSStreamExpectedLastSeqPerSubjectNotReadyError creates a new JSStreamExpectedLastSeqPerSubjectNotReady error: "expected last sequence per subject temporarily unavailable"
23872401
func NewJSStreamExpectedLastSeqPerSubjectNotReadyError(opts ...ErrorOption) *ApiError {
23882402
eopts := parseOpts(opts)

server/jetstream_test.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9173,8 +9173,8 @@ func TestJetStreamLastSequenceBySubject(t *testing.T) {
91739173
}
91749174

91759175
func TestJetStreamLastSequenceBySubjectWithSubject(t *testing.T) {
9176-
for _, st := range []StorageType{FileStorage, MemoryStorage} {
9177-
t.Run(st.String(), func(t *testing.T) {
9176+
test := func(replicas int, st StorageType) {
9177+
t.Run(fmt.Sprintf("R%d/%s", replicas, st), func(t *testing.T) {
91789178
c := createJetStreamClusterExplicit(t, "JSC", 3)
91799179
defer c.shutdown()
91809180

@@ -9185,7 +9185,7 @@ func TestJetStreamLastSequenceBySubjectWithSubject(t *testing.T) {
91859185
Name: "KV",
91869186
Subjects: []string{"kv.>"},
91879187
Storage: st,
9188-
Replicas: 3,
9188+
Replicas: replicas,
91899189
MaxMsgsPer: 1,
91909190
}
91919191

@@ -9262,8 +9262,21 @@ func TestJetStreamLastSequenceBySubjectWithSubject(t *testing.T) {
92629262
pubAndCheck("kv.xxx", "kv.*.*", "0", false) // Last is 11 for kv.xxx; 11 for kv.*.*;
92639263
pubAndCheck("kv.3.xxx", "kv.3.*", "4", true) // Last is 12 for kv.3.xxx; 12 for kv.3.*;
92649264
pubAndCheck("kv.3.xyz", "kv.3.*", "12", true) // Last is 13 for kv.3.xyz; 13 for kv.3.*;
9265+
9266+
// When using the last-subj-seq-subj header, but the sequence header is missing.
9267+
m = nats.NewMsg("kv.invalid")
9268+
m.Data = []byte("HELLO")
9269+
m.Header.Set(JSExpectedLastSubjSeqSubj, "kv.invalid")
9270+
_, err = js.PublishMsg(m)
9271+
require_Error(t, err, NewJSStreamExpectedLastSeqPerSubjectInvalidError())
92659272
})
92669273
}
9274+
9275+
for _, replicas := range []int{1, 3} {
9276+
for _, st := range []StorageType{FileStorage, MemoryStorage} {
9277+
test(replicas, st)
9278+
}
9279+
}
92679280
}
92689281

92699282
func TestJetStreamFilteredConsumersWithWiderFilter(t *testing.T) {

server/stream.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4587,7 +4587,7 @@ func getExpectedLastSeqPerSubject(hdr []byte) (uint64, bool) {
45874587

45884588
// Fast lookup of expected subject for the expected stream sequence per subject.
45894589
func getExpectedLastSeqPerSubjectForSubject(hdr []byte) string {
4590-
return string(getHeader(JSExpectedLastSubjSeqSubj, hdr))
4590+
return bytesToString(sliceHeader(JSExpectedLastSubjSeqSubj, hdr))
45914591
}
45924592

45934593
// Fast lookup of the message TTL from headers:
@@ -5446,6 +5446,16 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
54465446
}
54475447
return fmt.Errorf("last sequence by subject mismatch: %d vs %d", seq, fseq)
54485448
}
5449+
} else if getExpectedLastSeqPerSubjectForSubject(hdr) != _EMPTY_ {
5450+
mset.mu.Unlock()
5451+
apiErr := NewJSStreamExpectedLastSeqPerSubjectInvalidError()
5452+
if canRespond {
5453+
resp.PubAck = &PubAck{Stream: name}
5454+
resp.Error = apiErr
5455+
b, _ := json.Marshal(resp)
5456+
outq.sendMsg(reply, b)
5457+
}
5458+
return apiErr
54495459
}
54505460

54515461
// Expected last sequence.

0 commit comments

Comments
 (0)