@@ -888,12 +888,14 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
888
888
// Check pending and warn if getting backed up.
889
889
limit := atomic .LoadInt64 (& js .queueLimit )
890
890
retry:
891
+ atomic .AddInt64 (& js .apiInflight , 1 )
891
892
pending , _ := s .jsAPIRoutedReqs .push (& jsAPIRoutedReq {jsub , sub , acc , subject , reply , copyBytes (rmsg ), c .pa })
892
893
if pending >= int (limit ) {
893
894
if _ , ok := s .jsAPIRoutedReqs .popOne (); ok {
894
895
// If we were able to take one of the oldest items off the queue, then
895
896
// retry the insert.
896
897
s .rateLimitFormatWarnf ("JetStream API queue limit reached, dropping oldest request" )
898
+ atomic .AddInt64 (& js .apiInflight , - 1 )
897
899
s .publishAdvisory (nil , JSAdvisoryAPILimitReached , JSAPILimitReachedAdvisory {
898
900
TypedEvent : TypedEvent {
899
901
Type : JSAPILimitReachedAdvisoryType ,
@@ -911,7 +913,7 @@ retry:
911
913
// then something is wrong for us to be both over the limit but unable to pull entries, so
912
914
// throw everything away and hope we recover from it.
913
915
s .rateLimitFormatWarnf ("JetStream API queue limit reached, dropping %d requests" , pending )
914
- s .jsAPIRoutedReqs .drain ()
916
+ atomic . AddInt64 ( & js . apiInflight , - int64 ( s .jsAPIRoutedReqs .drain ()) )
915
917
916
918
s .publishAdvisory (nil , JSAdvisoryAPILimitReached , JSAPILimitReachedAdvisory {
917
919
TypedEvent : TypedEvent {
@@ -923,8 +925,6 @@ retry:
923
925
Domain : js .config .Domain ,
924
926
Dropped : int64 (pending ),
925
927
})
926
- } else {
927
- atomic .StoreInt64 (& js .apiInflight , int64 (pending ))
928
928
}
929
929
}
930
930
0 commit comments