@@ -932,7 +932,7 @@ which_module(5) -> ?MODULE.
932932 smallest_index :: undefined | ra :index (),
933933 messages_total :: non_neg_integer (),
934934 indexes = ? CHECK_MIN_INDEXES :: non_neg_integer (),
935- unused_1 = ? NIL }).
935+ bytes_in = 0 :: non_neg_integer () }).
936936-record (aux_gc , {last_raft_idx = 0 :: ra :index ()}).
937937-record (aux , {name :: atom (),
938938 capacity :: term (),
@@ -943,7 +943,9 @@ which_module(5) -> ?MODULE.
943943 gc = # aux_gc {} :: # aux_gc {},
944944 tick_pid :: undefined | pid (),
945945 cache = #{} :: map (),
946- last_checkpoint :: # checkpoint {}}).
946+ last_checkpoint :: # checkpoint {},
947+ bytes_in = 0 :: non_neg_integer (),
948+ bytes_out = 0 :: non_neg_integer ()}).
947949
948950init_aux (Name ) when is_atom (Name ) ->
949951 % % TODO: catch specific exception throw if table already exists
@@ -956,7 +958,7 @@ init_aux(Name) when is_atom(Name) ->
956958 last_checkpoint = # checkpoint {index = 0 ,
957959 timestamp = erlang :system_time (millisecond ),
958960 messages_total = 0 ,
959- unused_1 = ? NIL }}.
961+ bytes_in = 0 }}.
960962
961963handle_aux (RaftState , Tag , Cmd , # aux {name = Name ,
962964 capacity = Cap ,
@@ -973,13 +975,14 @@ handle_aux(RaftState, Tag, Cmd, AuxV2, RaAux)
973975 handle_aux (RaftState , Tag , Cmd , AuxV3 , RaAux );
974976handle_aux (leader , cast , eval ,
975977 #? AUX {last_decorators_state = LastDec ,
978+ bytes_in = BytesIn ,
976979 last_checkpoint = Check0 } = Aux0 ,
977980 RaAux ) ->
978981 #? STATE {cfg = # cfg {resource = QName }} = MacState =
979982 ra_aux :machine_state (RaAux ),
980983
981984 Ts = erlang :system_time (millisecond ),
982- {Check , Effects0 } = do_checkpoints (Ts , Check0 , RaAux , false ),
985+ {Check , Effects0 } = do_checkpoints (Ts , Check0 , RaAux , BytesIn , false ),
983986
984987 % % this is called after each batch of commands have been applied
985988 % % set timer for message expire
@@ -995,11 +998,16 @@ handle_aux(leader, cast, eval,
995998 last_decorators_state = NewLast }, RaAux , Effects }
996999 end ;
9971000handle_aux (_RaftState , cast , eval ,
998- #? AUX {last_checkpoint = Check0 } = Aux0 ,
1001+ #? AUX {last_checkpoint = Check0 ,
1002+ bytes_in = BytesIn } = Aux0 ,
9991003 RaAux ) ->
10001004 Ts = erlang :system_time (millisecond ),
1001- {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , false ),
1005+ {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , BytesIn , false ),
10021006 {no_reply , Aux0 #? AUX {last_checkpoint = Check }, RaAux , Effects };
1007+ handle_aux (_RaftState , cast , {bytes_in , {MetaSize , BodySize }},
1008+ #? AUX {bytes_in = Bytes } = Aux0 ,
1009+ RaAux ) ->
1010+ {no_reply , Aux0 #? AUX {bytes_in = Bytes + MetaSize + BodySize }, RaAux , []};
10031011handle_aux (_RaftState , cast , {# return {msg_ids = MsgIds ,
10041012 consumer_key = Key } = Ret , Corr , Pid },
10051013 Aux0 , RaAux0 ) ->
@@ -1129,12 +1137,13 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0,
11291137handle_aux (_ , _ , garbage_collection , Aux , RaAux ) ->
11301138 {no_reply , force_eval_gc (RaAux , Aux ), RaAux };
11311139handle_aux (_RaState , _ , force_checkpoint ,
1132- #? AUX {last_checkpoint = Check0 } = Aux , RaAux ) ->
1140+ #? AUX {last_checkpoint = Check0 ,
1141+ bytes_in = BytesIn } = Aux , RaAux ) ->
11331142 Ts = erlang :system_time (millisecond ),
11341143 #? STATE {cfg = # cfg {resource = QR }} = ra_aux :machine_state (RaAux ),
11351144 rabbit_log :debug (" ~ts : rabbit_fifo: forcing checkpoint at ~b " ,
11361145 [rabbit_misc :rs (QR ), ra_aux :last_applied (RaAux )]),
1137- {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , true ),
1146+ {Check , Effects } = do_checkpoints (Ts , Check0 , RaAux , BytesIn , true ),
11381147 {no_reply , Aux #? AUX {last_checkpoint = Check }, RaAux , Effects };
11391148handle_aux (RaState , _ , {dlx , _ } = Cmd , Aux0 , RaAux ) ->
11401149 #? STATE {dlx = DlxState ,
@@ -1578,7 +1587,9 @@ maybe_return_all(#{system_time := Ts} = Meta, ConsumerKey,
15781587apply_enqueue (#{index := RaftIdx ,
15791588 system_time := Ts } = Meta , From ,
15801589 Seq , RawMsg , Size , State0 ) ->
1581- case maybe_enqueue (RaftIdx , Ts , From , Seq , RawMsg , Size , [], State0 ) of
1590+ Effects0 = [{aux , {bytes_in , Size }}],
1591+ case maybe_enqueue (RaftIdx , Ts , From , Seq , RawMsg , Size ,
1592+ Effects0 , State0 ) of
15821593 {ok , State1 , Effects1 } ->
15831594 checkout (Meta , State0 , State1 , Effects1 );
15841595 {out_of_sequence , State , Effects } ->
@@ -2918,11 +2929,12 @@ priority_tag(Msg) ->
29182929 end .
29192930
29202931
2921- do_checkpoints (Ts ,
2922- # checkpoint {index = ChIdx ,
2923- timestamp = ChTime ,
2924- smallest_index = LastSmallest ,
2925- indexes = MinIndexes } = Check0 , RaAux , Force ) ->
2932+ do_checkpoints (Ts , # checkpoint {index = ChIdx ,
2933+ timestamp = ChTime ,
2934+ smallest_index = LastSmallest ,
2935+ bytes_in = LastBytesIn ,
2936+ indexes = MinIndexes } = Check0 ,
2937+ RaAux , BytesIn , Force ) ->
29262938 LastAppliedIdx = ra_aux :last_applied (RaAux ),
29272939 IndexesSince = LastAppliedIdx - ChIdx ,
29282940 #? STATE {} = MacState = ra_aux :machine_state (RaAux ),
@@ -2934,21 +2946,35 @@ do_checkpoints(Ts,
29342946 Smallest
29352947 end ,
29362948 MsgsTot = messages_total (MacState ),
2949+ % % more than 64MB (by default) of message data has been written to the log
2950+ % % best take a checkpoint
2951+
29372952 {CheckMinInterval , CheckMinIndexes , CheckMaxIndexes } =
29382953 persistent_term :get (quorum_queue_checkpoint_config ,
29392954 {? CHECK_MIN_INTERVAL_MS , ? CHECK_MIN_INDEXES ,
29402955 ? CHECK_MAX_INDEXES }),
2956+
2957+ % % scale the bytes limit as the backlog increases
2958+ MaxBytesFactor = max (1 , MsgsTot / CheckMaxIndexes ),
2959+ EnoughDataWritten = BytesIn - LastBytesIn > (? CHECK_MAX_BYTES * MaxBytesFactor ),
29412960 EnoughTimeHasPassed = TimeSince > CheckMinInterval ,
29422961
2943- % % enough time has passed and enough indexes have been committed
2944- case (IndexesSince > MinIndexes andalso
2945- EnoughTimeHasPassed ) orelse
2946- % % the queue is empty and some commands have been
2947- % % applied since the last checkpoint
2948- (MsgsTot == 0 andalso
2949- IndexesSince > CheckMinIndexes andalso
2950- EnoughTimeHasPassed ) orelse
2951- Force of
2962+ case (EnoughTimeHasPassed andalso
2963+ (
2964+ % % condition 1: enough indexes have been committed since the last
2965+ % % checkpoint
2966+ (IndexesSince > MinIndexes ) orelse
2967+ % % condition 2: the queue is empty and _some_ commands
2968+ % % have been applied since the last checkpoint
2969+ (MsgsTot == 0 andalso IndexesSince > 32 )
2970+ )
2971+ ) orelse
2972+ % % condition 3: enough message data has been written to warrant a new
2973+ % % checkpoint, this ignores the time windowing
2974+ EnoughDataWritten orelse
2975+ % % force was requested, e.g. after a purge
2976+ Force
2977+ of
29522978 true ->
29532979 % % take fewer checkpoints the more messages there are on queue
29542980 NextIndexes = min (max (MsgsTot , CheckMinIndexes ), CheckMaxIndexes ),
@@ -2957,6 +2983,7 @@ do_checkpoints(Ts,
29572983 timestamp = Ts ,
29582984 smallest_index = NewSmallest ,
29592985 messages_total = MsgsTot ,
2986+ bytes_in = BytesIn ,
29602987 indexes = NextIndexes },
29612988 [{checkpoint , LastAppliedIdx , MacState } |
29622989 release_cursor (LastSmallest , NewSmallest )]};
0 commit comments