5858 is_v4 /0 ,
5959
6060 % % misc
61- dehydrate_state /1 ,
6261 normalize /1 ,
6362 get_msg_header /1 ,
6463 get_header /2 ,
@@ -297,10 +296,9 @@ apply(#{index := Idx} = Meta,
297296 messages = lqueue :in (? MSG (Idx , Header ), Messages ),
298297 enqueue_count = EnqCount + 1 },
299298 State2 = update_or_remove_con (Meta , ConsumerKey , Con , State1 ),
300- {State , Ret , Effs } = checkout (Meta , State0 , State2 , []),
301- update_smallest_raft_index (Idx , Ret ,
302- maybe_store_release_cursor (Idx , State ),
303- Effs );
299+ {State3 , Ret , Effs0 } = checkout (Meta , State0 , State2 , []),
300+ {State , Effs } = maybe_checkpoint (Idx , State3 , Effs0 ),
301+ update_smallest_raft_index (Idx , Ret , State , Effs );
304302 _ ->
305303 {State00 , ok , []}
306304 end ;
@@ -1556,8 +1554,9 @@ apply_enqueue(#{index := RaftIdx,
15561554 system_time := Ts } = Meta , From , Seq , RawMsg , State0 ) ->
15571555 case maybe_enqueue (RaftIdx , Ts , From , Seq , RawMsg , [], State0 ) of
15581556 {ok , State1 , Effects1 } ->
1559- {State , ok , Effects } = checkout (Meta , State0 , State1 , Effects1 ),
1560- {maybe_store_release_cursor (RaftIdx , State ), ok , Effects };
1557+ {State2 , ok , Effects2 } = checkout (Meta , State0 , State1 , Effects1 ),
1558+ {State , Effects } = maybe_checkpoint (RaftIdx , State2 , Effects2 ),
1559+ {State , ok , Effects };
15611560 {out_of_sequence , State , Effects } ->
15621561 {State , not_enqueued , Effects };
15631562 {duplicate , State , Effects } ->
@@ -1619,16 +1618,15 @@ update_expiry_header(RaCmdTs, TTL, Header) ->
16191618update_expiry_header (ExpiryTs , Header ) ->
16201619 update_header (expiry , fun (Ts ) -> Ts end , ExpiryTs , Header ).
16211620
1622- maybe_store_release_cursor (RaftIdx ,
1623- #? STATE {cfg = # cfg {release_cursor_interval =
1624- {Base , C }} = Cfg ,
1625- enqueue_count = EC ,
1626- release_cursors = Cursors0 } = State0 )
1621+ maybe_checkpoint (RaftIdx ,
1622+ #? STATE {cfg = # cfg {release_cursor_interval = {Base , C }} = Cfg ,
1623+ enqueue_count = EC } = State0 ,
1624+ Effects0 )
16271625 when EC >= C ->
16281626 case messages_total (State0 ) of
16291627 0 ->
16301628 % % message must have been immediately dropped
1631- State0 #? STATE {enqueue_count = 0 };
1629+ { State0 #? STATE {enqueue_count = 0 }, Effects0 };
16321630 Total ->
16331631 Interval = case Base of
16341632 0 -> 0 ;
@@ -1637,14 +1635,11 @@ maybe_store_release_cursor(RaftIdx,
16371635 end ,
16381636 State = State0 #? STATE {cfg = Cfg # cfg {release_cursor_interval =
16391637 {Base , Interval }}},
1640- Dehydrated = dehydrate_state (State ),
1641- Cursor = {release_cursor , RaftIdx , Dehydrated },
1642- Cursors = lqueue :in (Cursor , Cursors0 ),
1643- State #? STATE {enqueue_count = 0 ,
1644- release_cursors = Cursors }
1638+ Effects = Effects0 ++ [{checkpoint , RaftIdx , State }],
1639+ {State #? STATE {enqueue_count = 0 }, Effects }
16451640 end ;
1646- maybe_store_release_cursor (_RaftIdx , State ) ->
1647- State .
1641+ maybe_checkpoint (_RaftIdx , State , Effects ) ->
1642+ { State , Effects } .
16481643
16491644maybe_enqueue (RaftIdx , Ts , undefined , undefined , RawMsg , Effects ,
16501645 #? STATE {msg_bytes_enqueue = Enqueue ,
@@ -1803,9 +1798,8 @@ update_smallest_raft_index(Idx, State, Effects) ->
18031798 update_smallest_raft_index (Idx , ok , State , Effects ).
18041799
18051800update_smallest_raft_index (IncomingRaftIdx , Reply ,
1806- #? STATE {cfg = Cfg ,
1807- release_cursors = Cursors0 } = State0 ,
1808- Effects ) ->
1801+ #? STATE {cfg = Cfg } = State0 ,
1802+ Effects0 ) ->
18091803 Total = messages_total (State0 ),
18101804 % % TODO: optimise
18111805 case smallest_raft_index (State0 ) of
@@ -1817,35 +1811,17 @@ update_smallest_raft_index(IncomingRaftIdx, Reply,
18171811 # cfg {release_cursor_interval = {Base , _ }} = Cfg ,
18181812 RCI = {Base , Base },
18191813 State = State0 #? STATE {cfg = Cfg # cfg {release_cursor_interval = RCI },
1820- release_cursors = lqueue :new (),
18211814 enqueue_count = 0 },
1822- {State , Reply , Effects ++ [{release_cursor , IncomingRaftIdx , State }]};
1815+ Effects = Effects0 ++ [{release_cursor , IncomingRaftIdx , State }],
1816+ {State , Reply , Effects };
18231817 undefined ->
1824- {State0 , Reply , Effects };
1818+ {State0 , Reply , Effects0 };
18251819 Smallest when is_integer (Smallest ) ->
1826- case find_next_cursor (Smallest , Cursors0 ) of
1827- empty ->
1828- {State0 , Reply , Effects };
1829- {Cursor , Cursors } ->
1830- % % we can emit a release cursor when we've passed the smallest
1831- % % release cursor available.
1832- {State0 #? STATE {release_cursors = Cursors }, Reply ,
1833- Effects ++ [Cursor ]}
1834- end
1835- end .
1836-
1837- find_next_cursor (Idx , Cursors ) ->
1838- find_next_cursor (Idx , Cursors , empty ).
1839-
1840- find_next_cursor (Smallest , Cursors0 , Potential ) ->
1841- case lqueue :out (Cursors0 ) of
1842- {{value , {_ , Idx , _ } = Cursor }, Cursors } when Idx < Smallest ->
1843- % % we found one but it may not be the largest one
1844- find_next_cursor (Smallest , Cursors , Cursor );
1845- _ when Potential == empty ->
1846- empty ;
1847- _ ->
1848- {Potential , Cursors0 }
1820+ % % Promote a checkpoint smaller than `Smallest'. `Smallest' is the
1821+ % % oldest message that must remain in the log, so we can only
1822+ % % promote an existing checkpoint smaller than `Smallest'.
1823+ Effects = Effects0 ++ [{release_cursor , Smallest - 1 }],
1824+ {State0 , Reply , Effects }
18491825 end .
18501826
18511827update_msg_header (Key , Fun , Def , ? MSG (Idx , Header )) ->
@@ -2373,17 +2349,6 @@ included_credit({credited, _}) ->
23732349 0 ;
23742350included_credit (credited ) ->
23752351 0 .
2376- % % creates a dehydrated version of the current state to be cached and
2377- % % potentially used to for a snaphot at a later point
2378- dehydrate_state (#? STATE {cfg = # cfg {},
2379- dlx = DlxState } = State ) ->
2380- % no messages are kept in memory, no need to
2381- % overly mutate the current state apart from removing indexes and cursors
2382- State #? STATE {ra_indexes = rabbit_fifo_index :empty (),
2383- release_cursors = lqueue :new (),
2384- enqueue_count = 0 ,
2385- msg_cache = undefined ,
2386- dlx = rabbit_fifo_dlx :dehydrate (DlxState )}.
23872352
23882353% % make the state suitable for equality comparison
23892354normalize (#? STATE {ra_indexes = _Indexes ,
0 commit comments