-
-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
Copy pathjetstream_api.go
4509 lines (3927 loc) · 136 KB
/
jetstream_api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2020-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bytes"
"cmp"
"encoding/json"
"errors"
"fmt"
"math/rand"
"os"
"path/filepath"
"runtime"
"slices"
"strconv"
"strings"
"sync/atomic"
"time"
"unicode"
"github.com/nats-io/nuid"
)
// Request API subjects for JetStream.
const (
// All API endpoints.
jsAllAPI = "$JS.API.>"
// For constructing JetStream domain prefixes.
jsDomainAPI = "$JS.%s.API.>"
JSApiPrefix = "$JS.API"
// JSApiAccountInfo is for obtaining general information about JetStream for this account.
// Will return JSON response.
JSApiAccountInfo = "$JS.API.INFO"
// JSApiTemplateCreate is the endpoint to create new stream templates.
// Will return JSON response.
JSApiTemplateCreate = "$JS.API.STREAM.TEMPLATE.CREATE.*"
JSApiTemplateCreateT = "$JS.API.STREAM.TEMPLATE.CREATE.%s"
// JSApiTemplates is the endpoint to list all stream template names for this account.
// Will return JSON response.
JSApiTemplates = "$JS.API.STREAM.TEMPLATE.NAMES"
// JSApiTemplateInfo is for obtaining general information about a named stream template.
// Will return JSON response.
JSApiTemplateInfo = "$JS.API.STREAM.TEMPLATE.INFO.*"
JSApiTemplateInfoT = "$JS.API.STREAM.TEMPLATE.INFO.%s"
// JSApiTemplateDelete is the endpoint to delete stream templates.
// Will return JSON response.
JSApiTemplateDelete = "$JS.API.STREAM.TEMPLATE.DELETE.*"
JSApiTemplateDeleteT = "$JS.API.STREAM.TEMPLATE.DELETE.%s"
// JSApiStreamCreate is the endpoint to create new streams.
// Will return JSON response.
JSApiStreamCreate = "$JS.API.STREAM.CREATE.*"
JSApiStreamCreateT = "$JS.API.STREAM.CREATE.%s"
// JSApiStreamUpdate is the endpoint to update existing streams.
// Will return JSON response.
JSApiStreamUpdate = "$JS.API.STREAM.UPDATE.*"
JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s"
// JSApiStreams is the endpoint to list all stream names for this account.
// Will return JSON response.
JSApiStreams = "$JS.API.STREAM.NAMES"
// JSApiStreamList is the endpoint that will return all detailed stream information
JSApiStreamList = "$JS.API.STREAM.LIST"
// JSApiStreamInfo is for obtaining general information about a named stream.
// Will return JSON response.
JSApiStreamInfo = "$JS.API.STREAM.INFO.*"
JSApiStreamInfoT = "$JS.API.STREAM.INFO.%s"
// JSApiStreamDelete is the endpoint to delete streams.
// Will return JSON response.
JSApiStreamDelete = "$JS.API.STREAM.DELETE.*"
JSApiStreamDeleteT = "$JS.API.STREAM.DELETE.%s"
// JSApiStreamPurge is the endpoint to purge streams.
// Will return JSON response.
JSApiStreamPurge = "$JS.API.STREAM.PURGE.*"
JSApiStreamPurgeT = "$JS.API.STREAM.PURGE.%s"
// JSApiStreamSnapshot is the endpoint to snapshot streams.
// Will return a stream of chunks with a nil chunk as EOF to
// the deliver subject. Caller should respond to each chunk
// with a nil body response for ack flow.
JSApiStreamSnapshot = "$JS.API.STREAM.SNAPSHOT.*"
JSApiStreamSnapshotT = "$JS.API.STREAM.SNAPSHOT.%s"
// JSApiStreamRestore is the endpoint to restore a stream from a snapshot.
// Caller should respond to each chunk with a nil body response.
JSApiStreamRestore = "$JS.API.STREAM.RESTORE.*"
JSApiStreamRestoreT = "$JS.API.STREAM.RESTORE.%s"
// JSApiMsgDelete is the endpoint to delete messages from a stream.
// Will return JSON response.
JSApiMsgDelete = "$JS.API.STREAM.MSG.DELETE.*"
JSApiMsgDeleteT = "$JS.API.STREAM.MSG.DELETE.%s"
// JSApiMsgGet is the template for direct requests for a message by its stream sequence number.
// Will return JSON response.
JSApiMsgGet = "$JS.API.STREAM.MSG.GET.*"
JSApiMsgGetT = "$JS.API.STREAM.MSG.GET.%s"
// JSDirectMsgGet is the template for non-api layer direct requests for a message by its stream sequence number or last by subject.
// Will return the message similar to how a consumer receives the message, no JSON processing.
// If the message can not be found we will use a status header of 404. If the stream does not exist the client will get a no-responders or timeout.
JSDirectMsgGet = "$JS.API.DIRECT.GET.*"
JSDirectMsgGetT = "$JS.API.DIRECT.GET.%s"
// This is a direct version of get last by subject, which will be the dominant pattern for KV access once 2.9 is released.
// The stream and the key will be part of the subject to allow for no-marshal payloads and subject based security permissions.
JSDirectGetLastBySubject = "$JS.API.DIRECT.GET.*.>"
JSDirectGetLastBySubjectT = "$JS.API.DIRECT.GET.%s.%s"
// jsDirectGetPre
jsDirectGetPre = "$JS.API.DIRECT.GET"
// JSApiConsumerCreate is the endpoint to create consumers for streams.
// This was also the legacy endpoint for ephemeral consumers.
// It now can take consumer name and optional filter subject, which when part of the subject controls access.
// Will return JSON response.
JSApiConsumerCreate = "$JS.API.CONSUMER.CREATE.*"
JSApiConsumerCreateT = "$JS.API.CONSUMER.CREATE.%s"
JSApiConsumerCreateEx = "$JS.API.CONSUMER.CREATE.*.>"
JSApiConsumerCreateExT = "$JS.API.CONSUMER.CREATE.%s.%s.%s"
// JSApiDurableCreate is the endpoint to create durable consumers for streams.
// You need to include the stream and consumer name in the subject.
JSApiDurableCreate = "$JS.API.CONSUMER.DURABLE.CREATE.*.*"
JSApiDurableCreateT = "$JS.API.CONSUMER.DURABLE.CREATE.%s.%s"
// JSApiConsumers is the endpoint to list all consumer names for the stream.
// Will return JSON response.
JSApiConsumers = "$JS.API.CONSUMER.NAMES.*"
JSApiConsumersT = "$JS.API.CONSUMER.NAMES.%s"
// JSApiConsumerList is the endpoint that will return all detailed consumer information
JSApiConsumerList = "$JS.API.CONSUMER.LIST.*"
JSApiConsumerListT = "$JS.API.CONSUMER.LIST.%s"
// JSApiConsumerInfo is for obtaining general information about a consumer.
// Will return JSON response.
JSApiConsumerInfo = "$JS.API.CONSUMER.INFO.*.*"
JSApiConsumerInfoT = "$JS.API.CONSUMER.INFO.%s.%s"
// JSApiConsumerDelete is the endpoint to delete consumers.
// Will return JSON response.
JSApiConsumerDelete = "$JS.API.CONSUMER.DELETE.*.*"
JSApiConsumerDeleteT = "$JS.API.CONSUMER.DELETE.%s.%s"
// JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"
// jsRequestNextPre
jsRequestNextPre = "$JS.API.CONSUMER.MSG.NEXT."
// For snapshots and restores. The ack will have additional tokens.
jsSnapshotAckT = "$JS.SNAPSHOT.ACK.%s.%s"
jsRestoreDeliverT = "$JS.SNAPSHOT.RESTORE.%s.%s"
// JSApiStreamRemovePeer is the endpoint to remove a peer from a clustered stream and its consumers.
// Will return JSON response.
JSApiStreamRemovePeer = "$JS.API.STREAM.PEER.REMOVE.*"
JSApiStreamRemovePeerT = "$JS.API.STREAM.PEER.REMOVE.%s"
// JSApiStreamLeaderStepDown is the endpoint to have stream leader stepdown.
// Will return JSON response.
JSApiStreamLeaderStepDown = "$JS.API.STREAM.LEADER.STEPDOWN.*"
JSApiStreamLeaderStepDownT = "$JS.API.STREAM.LEADER.STEPDOWN.%s"
// JSApiConsumerLeaderStepDown is the endpoint to have consumer leader stepdown.
// Will return JSON response.
JSApiConsumerLeaderStepDown = "$JS.API.CONSUMER.LEADER.STEPDOWN.*.*"
JSApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.%s.%s"
// JSApiLeaderStepDown is the endpoint to have our metaleader stepdown.
// Only works from system account.
// Will return JSON response.
JSApiLeaderStepDown = "$JS.API.META.LEADER.STEPDOWN"
// JSApiRemoveServer is the endpoint to remove a peer server from the cluster.
// Only works from system account.
// Will return JSON response.
JSApiRemoveServer = "$JS.API.SERVER.REMOVE"
// JSApiAccountPurge is the endpoint to purge the js content of an account
// Only works from system account.
// Will return JSON response.
JSApiAccountPurge = "$JS.API.ACCOUNT.PURGE.*"
JSApiAccountPurgeT = "$JS.API.ACCOUNT.PURGE.%s"
// JSApiServerStreamMove is the endpoint to move streams off a server
// Only works from system account.
// Will return JSON response.
JSApiServerStreamMove = "$JS.API.ACCOUNT.STREAM.MOVE.*.*"
JSApiServerStreamMoveT = "$JS.API.ACCOUNT.STREAM.MOVE.%s.%s"
// JSApiServerStreamCancelMove is the endpoint to cancel a stream move
// Only works from system account.
// Will return JSON response.
JSApiServerStreamCancelMove = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.*.*"
JSApiServerStreamCancelMoveT = "$JS.API.ACCOUNT.STREAM.CANCEL_MOVE.%s.%s"
// The prefix for system level account API.
jsAPIAccountPre = "$JS.API.ACCOUNT."
// jsAckT is the template for the ack message stream coming back from a consumer
// when they ACK/NAK, etc a message.
jsAckT = "$JS.ACK.%s.%s"
jsAckPre = "$JS.ACK."
jsAckPreLen = len(jsAckPre)
// jsFlowControl is for flow control subjects.
jsFlowControlPre = "$JS.FC."
// jsFlowControl is for FC responses.
jsFlowControl = "$JS.FC.%s.%s.*"
// JSAdvisoryPrefix is a prefix for all JetStream advisories.
JSAdvisoryPrefix = "$JS.EVENT.ADVISORY"
// JSMetricPrefix is a prefix for all JetStream metrics.
JSMetricPrefix = "$JS.EVENT.METRIC"
// JSMetricConsumerAckPre is a metric containing ack latency.
JSMetricConsumerAckPre = "$JS.EVENT.METRIC.CONSUMER.ACK"
// JSAdvisoryConsumerMaxDeliveryExceedPre is a notification published when a message exceeds its delivery threshold.
JSAdvisoryConsumerMaxDeliveryExceedPre = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES"
// JSAdvisoryConsumerMsgNakPre is a notification published when a message has been naked
JSAdvisoryConsumerMsgNakPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_NAKED"
// JSAdvisoryConsumerMsgTerminatedPre is a notification published when a message has been terminated.
JSAdvisoryConsumerMsgTerminatedPre = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED"
// JSAdvisoryStreamCreatedPre notification that a stream was created.
JSAdvisoryStreamCreatedPre = "$JS.EVENT.ADVISORY.STREAM.CREATED"
// JSAdvisoryStreamDeletedPre notification that a stream was deleted.
JSAdvisoryStreamDeletedPre = "$JS.EVENT.ADVISORY.STREAM.DELETED"
// JSAdvisoryStreamUpdatedPre notification that a stream was updated.
JSAdvisoryStreamUpdatedPre = "$JS.EVENT.ADVISORY.STREAM.UPDATED"
// JSAdvisoryConsumerCreatedPre notification that a template created.
JSAdvisoryConsumerCreatedPre = "$JS.EVENT.ADVISORY.CONSUMER.CREATED"
// JSAdvisoryConsumerDeletedPre notification that a template deleted.
JSAdvisoryConsumerDeletedPre = "$JS.EVENT.ADVISORY.CONSUMER.DELETED"
// JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created.
JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"
// JSAdvisoryStreamSnapshotCompletePre notification that a snapshot was completed.
JSAdvisoryStreamSnapshotCompletePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE"
// JSAdvisoryStreamRestoreCreatePre notification that a restore was start.
JSAdvisoryStreamRestoreCreatePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE"
// JSAdvisoryStreamRestoreCompletePre notification that a restore was completed.
JSAdvisoryStreamRestoreCompletePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE"
// JSAdvisoryDomainLeaderElectedPre notification that a jetstream domain has elected a leader.
JSAdvisoryDomainLeaderElected = "$JS.EVENT.ADVISORY.DOMAIN.LEADER_ELECTED"
// JSAdvisoryStreamLeaderElectedPre notification that a replicated stream has elected a leader.
JSAdvisoryStreamLeaderElectedPre = "$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED"
// JSAdvisoryStreamQuorumLostPre notification that a stream and its consumers are stalled.
JSAdvisoryStreamQuorumLostPre = "$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST"
// JSAdvisoryConsumerLeaderElectedPre notification that a replicated consumer has elected a leader.
JSAdvisoryConsumerLeaderElectedPre = "$JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED"
// JSAdvisoryConsumerQuorumLostPre notification that a consumer is stalled.
JSAdvisoryConsumerQuorumLostPre = "$JS.EVENT.ADVISORY.CONSUMER.QUORUM_LOST"
// JSAdvisoryServerOutOfStorage notification that a server has no more storage.
JSAdvisoryServerOutOfStorage = "$JS.EVENT.ADVISORY.SERVER.OUT_OF_STORAGE"
// JSAdvisoryServerRemoved notification that a server has been removed from the system.
JSAdvisoryServerRemoved = "$JS.EVENT.ADVISORY.SERVER.REMOVED"
// JSAdvisoryAPILimitReached notification that a server has reached the JS API hard limit.
JSAdvisoryAPILimitReached = "$JS.EVENT.ADVISORY.API.LIMIT_REACHED"
// JSAuditAdvisory is a notification about JetStream API access.
// FIXME - Add in details about who..
JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
)
var denyAllClientJs = []string{jsAllAPI, "$KV.>", "$OBJ.>"}
var denyAllJs = []string{jscAllSubj, raftAllSubj, jsAllAPI, "$KV.>", "$OBJ.>"}
func generateJSMappingTable(domain string) map[string]string {
mappings := map[string]string{}
// This set of mappings is very very very ugly.
// It is a consequence of what we defined the domain prefix to be "$JS.domain.API" and it's mapping to "$JS.API"
// For optics $KV and $OBJ where made to be independent subject spaces.
// As materialized views of JS, they did not simply extend that subject space to say "$JS.API.KV" "$JS.API.OBJ"
// This is very unfortunate!!!
// Furthermore, it seemed bad to require different domain prefixes for JS/KV/OBJ.
// Especially since the actual API for say KV, does use stream create from JS.
// To avoid overlaps KV and OBJ views append the prefix to their API.
// (Replacing $KV with the prefix allows users to create collisions with say the bucket name)
// This mapping therefore needs to have extra token so that the mapping can properly discern between $JS, $KV, $OBJ
for srcMappingSuffix, to := range map[string]string{
"INFO": JSApiAccountInfo,
"STREAM.>": "$JS.API.STREAM.>",
"CONSUMER.>": "$JS.API.CONSUMER.>",
"DIRECT.>": "$JS.API.DIRECT.>",
"META.>": "$JS.API.META.>",
"SERVER.>": "$JS.API.SERVER.>",
"ACCOUNT.>": "$JS.API.ACCOUNT.>",
"$KV.>": "$KV.>",
"$OBJ.>": "$OBJ.>",
} {
mappings[fmt.Sprintf("$JS.%s.API.%s", domain, srcMappingSuffix)] = to
}
return mappings
}
// JSMaxDescription is the maximum description length for streams and consumers.
const JSMaxDescriptionLen = 4 * 1024
// JSMaxMetadataLen is the maximum length for streams and consumers metadata map.
// It's calculated by summing length of all keys and values.
const JSMaxMetadataLen = 128 * 1024
// JSMaxNameLen is the maximum name lengths for streams, consumers and templates.
// Picked 255 as it seems to be a widely used file name limit
const JSMaxNameLen = 255
// JSDefaultRequestQueueLimit is the default number of entries that we will
// put on the global request queue before we react.
const JSDefaultRequestQueueLimit = 10_000
// Responses for API calls.
// ApiResponse is a standard response from the JetStream JSON API
type ApiResponse struct {
Type string `json:"type"`
Error *ApiError `json:"error,omitempty"`
}
const JSApiSystemResponseType = "io.nats.jetstream.api.v1.system_response"
// When passing back to the clients generalize store failures.
var (
errStreamStoreFailed = errors.New("error creating store for stream")
errConsumerStoreFailed = errors.New("error creating store for consumer")
)
// ToError checks if the response has a error and if it does converts it to an error avoiding
// the pitfalls described by https://yourbasic.org/golang/gotcha-why-nil-error-not-equal-nil/
func (r *ApiResponse) ToError() error {
if r.Error == nil {
return nil
}
return r.Error
}
const JSApiOverloadedType = "io.nats.jetstream.api.v1.system_overloaded"
// ApiPaged includes variables used to create paged responses from the JSON API
type ApiPaged struct {
Total int `json:"total"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}
// ApiPagedRequest includes parameters allowing specific pages to be requests from APIs responding with ApiPaged
type ApiPagedRequest struct {
Offset int `json:"offset"`
}
// JSApiAccountInfoResponse reports back information on jetstream for this account.
type JSApiAccountInfoResponse struct {
ApiResponse
*JetStreamAccountStats
}
const JSApiAccountInfoResponseType = "io.nats.jetstream.api.v1.account_info_response"
// JSApiStreamCreateResponse stream creation.
type JSApiStreamCreateResponse struct {
ApiResponse
*StreamInfo
DidCreate bool `json:"did_create,omitempty"`
}
const JSApiStreamCreateResponseType = "io.nats.jetstream.api.v1.stream_create_response"
// JSApiStreamDeleteResponse stream removal.
type JSApiStreamDeleteResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
}
const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"
// JSMaxSubjectDetails The limit of the number of subject details we will send in a stream info response.
const JSMaxSubjectDetails = 100_000
type JSApiStreamInfoRequest struct {
ApiPagedRequest
DeletedDetails bool `json:"deleted_details,omitempty"`
SubjectsFilter string `json:"subjects_filter,omitempty"`
}
type JSApiStreamInfoResponse struct {
ApiResponse
ApiPaged
*StreamInfo
}
const JSApiStreamInfoResponseType = "io.nats.jetstream.api.v1.stream_info_response"
// JSApiNamesLimit is the maximum entries we will return for streams or consumers lists.
// TODO(dlc) - with header or request support could request chunked response.
const JSApiNamesLimit = 1024
const JSApiListLimit = 256
type JSApiStreamNamesRequest struct {
ApiPagedRequest
// These are filters that can be applied to the list.
Subject string `json:"subject,omitempty"`
}
// JSApiStreamNamesResponse list of streams.
// A nil request is valid and means all streams.
type JSApiStreamNamesResponse struct {
ApiResponse
ApiPaged
Streams []string `json:"streams"`
}
const JSApiStreamNamesResponseType = "io.nats.jetstream.api.v1.stream_names_response"
type JSApiStreamListRequest struct {
ApiPagedRequest
// These are filters that can be applied to the list.
Subject string `json:"subject,omitempty"`
}
// JSApiStreamListResponse list of detailed stream information.
// A nil request is valid and means all streams.
type JSApiStreamListResponse struct {
ApiResponse
ApiPaged
Streams []*StreamInfo `json:"streams"`
Missing []string `json:"missing,omitempty"`
}
const JSApiStreamListResponseType = "io.nats.jetstream.api.v1.stream_list_response"
// JSApiStreamPurgeRequest is optional request information to the purge API.
// Subject will filter the purge request to only messages that match the subject, which can have wildcards.
// Sequence will purge up to but not including this sequence and can be combined with subject filtering.
// Keep will specify how many messages to keep. This can also be combined with subject filtering.
// Note that Sequence and Keep are mutually exclusive, so both can not be set at the same time.
type JSApiStreamPurgeRequest struct {
// Purge up to but not including sequence.
Sequence uint64 `json:"seq,omitempty"`
// Subject to match against messages for the purge command.
Subject string `json:"filter,omitempty"`
// Number of messages to keep.
Keep uint64 `json:"keep,omitempty"`
}
type JSApiStreamPurgeResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
Purged uint64 `json:"purged"`
}
const JSApiStreamPurgeResponseType = "io.nats.jetstream.api.v1.stream_purge_response"
// JSApiStreamUpdateResponse for updating a stream.
type JSApiStreamUpdateResponse struct {
ApiResponse
*StreamInfo
}
const JSApiStreamUpdateResponseType = "io.nats.jetstream.api.v1.stream_update_response"
// JSApiMsgDeleteRequest delete message request.
type JSApiMsgDeleteRequest struct {
Seq uint64 `json:"seq"`
NoErase bool `json:"no_erase,omitempty"`
}
type JSApiMsgDeleteResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
}
const JSApiMsgDeleteResponseType = "io.nats.jetstream.api.v1.stream_msg_delete_response"
type JSApiStreamSnapshotRequest struct {
// Subject to deliver the chunks to for the snapshot.
DeliverSubject string `json:"deliver_subject"`
// Do not include consumers in the snapshot.
NoConsumers bool `json:"no_consumers,omitempty"`
// Optional chunk size preference.
// Best to just let server select.
ChunkSize int `json:"chunk_size,omitempty"`
// Check all message's checksums prior to snapshot.
CheckMsgs bool `json:"jsck,omitempty"`
}
// JSApiStreamSnapshotResponse is the direct response to the snapshot request.
type JSApiStreamSnapshotResponse struct {
ApiResponse
// Configuration of the given stream.
Config *StreamConfig `json:"config,omitempty"`
// Current State for the given stream.
State *StreamState `json:"state,omitempty"`
}
const JSApiStreamSnapshotResponseType = "io.nats.jetstream.api.v1.stream_snapshot_response"
// JSApiStreamRestoreRequest is the required restore request.
type JSApiStreamRestoreRequest struct {
// Configuration of the given stream.
Config StreamConfig `json:"config"`
// Current State for the given stream.
State StreamState `json:"state"`
}
// JSApiStreamRestoreResponse is the direct response to the restore request.
type JSApiStreamRestoreResponse struct {
ApiResponse
// Subject to deliver the chunks to for the snapshot restore.
DeliverSubject string `json:"deliver_subject"`
}
const JSApiStreamRestoreResponseType = "io.nats.jetstream.api.v1.stream_restore_response"
// JSApiStreamRemovePeerRequest is the required remove peer request.
type JSApiStreamRemovePeerRequest struct {
// Server name of the peer to be removed.
Peer string `json:"peer"`
}
// JSApiStreamRemovePeerResponse is the response to a remove peer request.
type JSApiStreamRemovePeerResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
}
const JSApiStreamRemovePeerResponseType = "io.nats.jetstream.api.v1.stream_remove_peer_response"
// JSApiStreamLeaderStepDownResponse is the response to a leader stepdown request.
type JSApiStreamLeaderStepDownResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
}
const JSApiStreamLeaderStepDownResponseType = "io.nats.jetstream.api.v1.stream_leader_stepdown_response"
// JSApiConsumerLeaderStepDownResponse is the response to a consumer leader stepdown request.
type JSApiConsumerLeaderStepDownResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
}
const JSApiConsumerLeaderStepDownResponseType = "io.nats.jetstream.api.v1.consumer_leader_stepdown_response"
// JSApiLeaderStepdownRequest allows placement control over the meta leader placement.
type JSApiLeaderStepdownRequest struct {
Placement *Placement `json:"placement,omitempty"`
}
// JSApiLeaderStepDownResponse is the response to a meta leader stepdown request.
type JSApiLeaderStepDownResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
}
const JSApiLeaderStepDownResponseType = "io.nats.jetstream.api.v1.meta_leader_stepdown_response"
// JSApiMetaServerRemoveRequest will remove a peer from the meta group.
type JSApiMetaServerRemoveRequest struct {
// Server name of the peer to be removed.
Server string `json:"peer"`
// Peer ID of the peer to be removed. If specified this is used
// instead of the server name.
Peer string `json:"peer_id,omitempty"`
}
// JSApiMetaServerRemoveResponse is the response to a peer removal request in the meta group.
type JSApiMetaServerRemoveResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
}
const JSApiMetaServerRemoveResponseType = "io.nats.jetstream.api.v1.meta_server_remove_response"
// JSApiMetaServerStreamMoveRequest will move a stream on a server to another
// response to this will come as JSApiStreamUpdateResponse/JSApiStreamUpdateResponseType
type JSApiMetaServerStreamMoveRequest struct {
// Server name of the peer to be evacuated.
Server string `json:"server,omitempty"`
// Cluster the server is in
Cluster string `json:"cluster,omitempty"`
// Domain the sever is in
Domain string `json:"domain,omitempty"`
// Ephemeral placement tags for the move
Tags []string `json:"tags,omitempty"`
}
const JSApiAccountPurgeResponseType = "io.nats.jetstream.api.v1.account_purge_response"
// JSApiAccountPurgeResponse is the response to a purge request in the meta group.
type JSApiAccountPurgeResponse struct {
ApiResponse
Initiated bool `json:"initiated,omitempty"`
}
// JSApiMsgGetRequest get a message request.
type JSApiMsgGetRequest struct {
Seq uint64 `json:"seq,omitempty"`
LastFor string `json:"last_by_subj,omitempty"`
NextFor string `json:"next_by_subj,omitempty"`
}
type JSApiMsgGetResponse struct {
ApiResponse
Message *StoredMsg `json:"message,omitempty"`
}
const JSApiMsgGetResponseType = "io.nats.jetstream.api.v1.stream_msg_get_response"
// JSWaitQueueDefaultMax is the default max number of outstanding requests for pull consumers.
const JSWaitQueueDefaultMax = 512
type JSApiConsumerCreateResponse struct {
ApiResponse
*ConsumerInfo
}
const JSApiConsumerCreateResponseType = "io.nats.jetstream.api.v1.consumer_create_response"
type JSApiConsumerDeleteResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
}
const JSApiConsumerDeleteResponseType = "io.nats.jetstream.api.v1.consumer_delete_response"
type JSApiConsumerInfoResponse struct {
ApiResponse
*ConsumerInfo
}
const JSApiConsumerInfoResponseType = "io.nats.jetstream.api.v1.consumer_info_response"
type JSApiConsumersRequest struct {
ApiPagedRequest
}
type JSApiConsumerNamesResponse struct {
ApiResponse
ApiPaged
Consumers []string `json:"consumers"`
}
const JSApiConsumerNamesResponseType = "io.nats.jetstream.api.v1.consumer_names_response"
type JSApiConsumerListResponse struct {
ApiResponse
ApiPaged
Consumers []*ConsumerInfo `json:"consumers"`
Missing []string `json:"missing,omitempty"`
}
const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_response"
// JSApiConsumerGetNextRequest is for getting next messages for pull based consumers.
type JSApiConsumerGetNextRequest struct {
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
MaxBytes int `json:"max_bytes,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
}
// JSApiStreamTemplateCreateResponse for creating templates.
type JSApiStreamTemplateCreateResponse struct {
ApiResponse
*StreamTemplateInfo
}
const JSApiStreamTemplateCreateResponseType = "io.nats.jetstream.api.v1.stream_template_create_response"
type JSApiStreamTemplateDeleteResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
}
const JSApiStreamTemplateDeleteResponseType = "io.nats.jetstream.api.v1.stream_template_delete_response"
// JSApiStreamTemplateInfoResponse for information about stream templates.
type JSApiStreamTemplateInfoResponse struct {
ApiResponse
*StreamTemplateInfo
}
const JSApiStreamTemplateInfoResponseType = "io.nats.jetstream.api.v1.stream_template_info_response"
type JSApiStreamTemplatesRequest struct {
ApiPagedRequest
}
// JSApiStreamTemplateNamesResponse list of templates
type JSApiStreamTemplateNamesResponse struct {
ApiResponse
ApiPaged
Templates []string `json:"streams"`
}
const JSApiStreamTemplateNamesResponseType = "io.nats.jetstream.api.v1.stream_template_names_response"
// Structure that holds state for a JetStream API request that is processed
// in a separate long-lived go routine. This is to avoid possibly blocking
// ROUTE and GATEWAY connections.
type jsAPIRoutedReq struct {
jsub *subscription
sub *subscription
acc *Account
subject string
reply string
msg []byte
pa pubArg
}
func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, subject, reply string, rmsg []byte) {
// Ignore system level directives meta stepdown and peer remove requests here.
if subject == JSApiLeaderStepDown ||
subject == JSApiRemoveServer ||
strings.HasPrefix(subject, jsAPIAccountPre) {
return
}
// No lock needed, those are immutable.
s, rr := js.srv, js.apiSubs.Match(subject)
hdr, msg := c.msgParts(rmsg)
if len(sliceHeader(ClientInfoHdr, hdr)) == 0 {
// Check if this is the system account. We will let these through for the account info only.
sacc := s.SystemAccount()
if sacc != acc {
return
}
if subject != JSApiAccountInfo {
// Only respond from the initial server entry to the NATS system.
if c.kind == CLIENT || c.kind == LEAF {
var resp = ApiResponse{
Type: JSApiSystemResponseType,
Error: NewJSNotEnabledForAccountError(),
}
s.sendAPIErrResponse(nil, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
return
}
}
// Short circuit for no interest.
if len(rr.psubs)+len(rr.qsubs) == 0 {
if (c.kind == CLIENT || c.kind == LEAF) && acc != s.SystemAccount() {
ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
var resp = ApiResponse{
Type: JSApiSystemResponseType,
Error: NewJSBadRequestError(),
}
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
return
}
// We should only have psubs and only 1 per result.
if len(rr.psubs) != 1 {
s.Warnf("Malformed JetStream API Request: [%s] %q", subject, rmsg)
if c.kind == CLIENT || c.kind == LEAF {
ci, acc, _, _, _ := s.getRequestInfo(c, rmsg)
var resp = ApiResponse{
Type: JSApiSystemResponseType,
Error: NewJSBadRequestError(),
}
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
return
}
jsub := rr.psubs[0]
// If this is directly from a client connection ok to do in place.
if c.kind != ROUTER && c.kind != GATEWAY && c.kind != LEAF {
start := time.Now()
jsub.icb(sub, c, acc, subject, reply, rmsg)
if dur := time.Since(start); dur >= readLoopReportThreshold {
s.Warnf("Internal subscription on %q took too long: %v", subject, dur)
}
return
}
// If we are here we have received this request over a non-client connection.
// We need to make sure not to block. We will send the request to a long-lived
// pool of go routines.
// Increment inflight. Do this before queueing.
atomic.AddInt64(&js.apiInflight, 1)
// Copy the state. Note the JSAPI only uses the hdr index to piece apart the
// header from the msg body. No other references are needed.
// Check pending and warn if getting backed up.
pending := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
limit := atomic.LoadInt64(&js.queueLimit)
if pending >= int(limit) {
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
drained := int64(s.jsAPIRoutedReqs.drain())
atomic.AddInt64(&js.apiInflight, -drained)
s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
TypedEvent: TypedEvent{
Type: JSAPILimitReachedAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Server: s.Name(),
Domain: js.config.Domain,
Dropped: drained,
})
}
}
func (s *Server) processJSAPIRoutedRequests() {
defer s.grWG.Done()
s.mu.RLock()
queue := s.jsAPIRoutedReqs
client := &client{srv: s, kind: JETSTREAM}
s.mu.RUnlock()
js := s.getJetStream()
for {
select {
case <-queue.ch:
// Only pop one item at a time here, otherwise if the system is recovering
// from queue buildup, then one worker will pull off all the tasks and the
// others will be starved of work.
for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() {
client.pa = r.pa
start := time.Now()
r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
if dur := time.Since(start); dur >= readLoopReportThreshold {
s.Warnf("Internal subscription on %q took too long: %v", r.subject, dur)
}
atomic.AddInt64(&js.apiInflight, -1)
}
case <-s.quitCh:
return
}
}
}
func (s *Server) setJetStreamExportSubs() error {
js := s.getJetStream()
if js == nil {
return NewJSNotEnabledError()
}
// Start the go routine that will process API requests received by the
// subscription below when they are coming from routes, etc..
const maxProcs = 16
mp := runtime.GOMAXPROCS(0)
// Cap at 16 max for now on larger core setups.
if mp > maxProcs {
mp = maxProcs
}
s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "Routed JS API Requests")
for i := 0; i < mp; i++ {
s.startGoRoutine(s.processJSAPIRoutedRequests)
}
// This is the catch all now for all JetStream API calls.
if _, err := s.sysSubscribe(jsAllAPI, js.apiDispatch); err != nil {
return err
}
if err := s.SystemAccount().AddServiceExport(jsAllAPI, nil); err != nil {
s.Warnf("Error setting up jetstream service exports: %v", err)
return err
}
// API handles themselves.
pairs := []struct {
subject string
handler msgHandler
}{
{JSApiAccountInfo, s.jsAccountInfoRequest},
{JSApiTemplateCreate, s.jsTemplateCreateRequest},
{JSApiTemplates, s.jsTemplateNamesRequest},
{JSApiTemplateInfo, s.jsTemplateInfoRequest},
{JSApiTemplateDelete, s.jsTemplateDeleteRequest},
{JSApiStreamCreate, s.jsStreamCreateRequest},
{JSApiStreamUpdate, s.jsStreamUpdateRequest},
{JSApiStreams, s.jsStreamNamesRequest},
{JSApiStreamList, s.jsStreamListRequest},
{JSApiStreamInfo, s.jsStreamInfoRequest},
{JSApiStreamDelete, s.jsStreamDeleteRequest},
{JSApiStreamPurge, s.jsStreamPurgeRequest},
{JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
{JSApiStreamRestore, s.jsStreamRestoreRequest},
{JSApiStreamRemovePeer, s.jsStreamRemovePeerRequest},
{JSApiStreamLeaderStepDown, s.jsStreamLeaderStepDownRequest},
{JSApiConsumerLeaderStepDown, s.jsConsumerLeaderStepDownRequest},
{JSApiMsgDelete, s.jsMsgDeleteRequest},
{JSApiMsgGet, s.jsMsgGetRequest},
{JSApiConsumerCreateEx, s.jsConsumerCreateRequest},
{JSApiConsumerCreate, s.jsConsumerCreateRequest},
{JSApiDurableCreate, s.jsConsumerCreateRequest},
{JSApiConsumers, s.jsConsumerNamesRequest},
{JSApiConsumerList, s.jsConsumerListRequest},
{JSApiConsumerInfo, s.jsConsumerInfoRequest},
{JSApiConsumerDelete, s.jsConsumerDeleteRequest},
}
js.mu.Lock()
defer js.mu.Unlock()
for _, p := range pairs {
sub := &subscription{subject: []byte(p.subject), icb: p.handler}
if err := js.apiSubs.Insert(sub); err != nil {
return err
}
}
return nil
}
func (s *Server) sendAPIResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
acc.trackAPI()
if reply != _EMPTY_ {
s.sendInternalAccountMsg(nil, reply, response)
}
s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
}
func (s *Server) sendAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string) {
acc.trackAPIErr()
if reply != _EMPTY_ {
s.sendInternalAccountMsg(nil, reply, response)
}
s.sendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response)
}
const errRespDelay = 500 * time.Millisecond
func (s *Server) sendDelayedAPIErrResponse(ci *ClientInfo, acc *Account, subject, reply, request, response string, rg *raftGroup) {
js := s.getJetStream()
if js == nil {
return
}
var quitCh <-chan struct{}
js.mu.RLock()
if rg != nil && rg.node != nil {
quitCh = rg.node.QuitC()
}
js.mu.RUnlock()
s.startGoRoutine(func() {
defer s.grWG.Done()
select {
case <-quitCh:
case <-s.quitCh:
case <-time.After(errRespDelay):
acc.trackAPIErr()
if reply != _EMPTY_ {
s.sendInternalAccountMsg(nil, reply, response)
}