3838-define (SOFT_LIMIT , 32 ).
3939-define (TIMER_TIME , 10000 ).
4040-define (COMMAND_TIMEOUT , 30000 ).
41+ -define (UNLIMITED_PREFETCH_COUNT , 2000 ). % % something large for ra
4142
4243-type seq () :: non_neg_integer ().
4344
@@ -118,6 +119,9 @@ enqueue(QName, Correlation, Msg,
118119 cfg = # cfg {servers = Servers ,
119120 timeout = Timeout }} = State0 ) ->
120121 % % the first publish, register and enqueuer for this process.
122+ % % TODO: we _only_ need to pre-register an enqueuer to discover if the
123+ % % queue overflow is `reject_publish` and the queue can accept new messages
124+ % % if the queue does not have `reject_publish` set we can skip this step
121125 Reg = rabbit_fifo :make_register_enqueuer (self ()),
122126 case ra :process_command (Servers , Reg , Timeout ) of
123127 {ok , reject_publish , Leader } ->
@@ -335,19 +339,32 @@ discard(ConsumerTag, [_|_] = MsgIds,
335339 state ()) ->
336340 {ok , ConsumerInfos :: map (), state ()} |
337341 {error | timeout , term ()}.
338- checkout (ConsumerTag , CreditMode , Meta ,
342+ checkout (ConsumerTag , CreditMode , #{} = Meta ,
339343 # state {consumers = CDels0 } = State0 )
340- when is_binary (ConsumerTag ) ->
344+ when is_binary (ConsumerTag ) andalso
345+ is_tuple (CreditMode ) ->
341346 Servers = sorted_servers (State0 ),
342347 ConsumerId = consumer_id (ConsumerTag ),
343- NumUnsettled = case CreditMode of
344- credited -> 0 ;
348+ Spec = case rabbit_fifo :is_v4 () of
349+ true ->
350+ case CreditMode of
351+ {simple_prefetch , 0 } ->
352+ {auto , {simple_prefetch ,
353+ ? UNLIMITED_PREFETCH_COUNT }};
354+ _ ->
355+ {auto , CreditMode }
356+ end ;
357+ false ->
358+ case CreditMode of
359+ {credited , _ } ->
360+ {auto , 0 , credited };
361+ {simple_prefetch , 0 } ->
362+ {auto , ? UNLIMITED_PREFETCH_COUNT , simple_prefetch };
345363 {simple_prefetch , Num } ->
346- Num
347- end ,
348- Cmd = rabbit_fifo :make_checkout (ConsumerId ,
349- {auto , NumUnsettled , CreditMode },
350- Meta ),
364+ {auto , Num , simple_prefetch }
365+ end
366+ end ,
367+ Cmd = rabbit_fifo :make_checkout (ConsumerId , Spec , Meta ),
351368 % % ???
352369 Ack = maps :get (ack , Meta , true ),
353370
@@ -369,7 +386,7 @@ checkout(ConsumerTag, CreditMode, Meta,
369386 NextMsgId - 1
370387 end
371388 end ,
372- DeliveryCount = case maps : is_key ( initial_delivery_count , Meta ) of
389+ DeliveryCount = case rabbit_fifo : is_v4 ( ) of
373390 true -> credit_api_v2 ;
374391 false -> {credit_api_v1 , 0 }
375392 end ,
0 commit comments