@@ -813,11 +813,9 @@ purge_node(Meta, Node, State, Effects) ->
813813 end , {State , Effects }, all_pids_for (Node , State )).
814814
815815% % any downs that re not noconnection
816- handle_down (Meta , Pid , #? STATE {consumers = Cons0 ,
817- enqueuers = Enqs0 } = State0 ) ->
818- % Remove any enqueuer for the down pid
819- State1 = State0 #? STATE {enqueuers = maps :remove (Pid , Enqs0 )},
820- {Effects1 , State2 } = handle_waiting_consumer_down (Pid , State1 ),
816+ handle_down (Meta , Pid , #? MODULE {consumers = Cons0 } = State0 ) ->
817+ {Effects0 , State1 } = handle_enqueuer_down (Meta , Pid , State0 ),
818+ {Effects1 , State2 } = handle_waiting_consumer_down (Pid , State1 , Effects0 ),
821819 % return checked out messages to main queue
822820 % Find the consumers for the down pid
823821 DownConsumers = maps :keys (
@@ -826,6 +824,23 @@ handle_down(Meta, Pid, #?STATE{consumers = Cons0,
826824 cancel_consumer (Meta , ConsumerId , S , E , down )
827825 end , {State2 , Effects1 }, DownConsumers ).
828826
827+ handle_enqueuer_down (#{index := Idx }, Pid ,
828+ #? MODULE {enqueuers = Enqs0 } = State0 ) ->
829+ case maps :take (Pid , Enqs0 ) of
830+ {_Enqueuer , Enqs } ->
831+ State = State0 #? MODULE {enqueuers = Enqs },
832+ % % When there are no more enqueuers connected, suggest a checkpoint
833+ % % so that recovery is fast.
834+ case Enqs =:= #{} of
835+ true ->
836+ {[{checkpoint , Idx , State }], State };
837+ false ->
838+ {[], State }
839+ end ;
840+ error ->
841+ {[], State0 }
842+ end .
843+
829844consumer_active_flag_update_function (
830845 #? STATE {cfg = # cfg {consumer_strategy = competing }}) ->
831846 fun (State , ConsumerId , Consumer , Active , ActivityStatus , Effects ) ->
@@ -839,22 +854,25 @@ consumer_active_flag_update_function(
839854 end .
840855
841856handle_waiting_consumer_down (_Pid ,
842- #? STATE {cfg = # cfg {consumer_strategy = competing }} = State ) ->
843- {[], State };
857+ #? MODULE {cfg = # cfg {consumer_strategy = competing }} = State ,
858+ Effects0 ) ->
859+ {Effects0 , State };
844860handle_waiting_consumer_down (_Pid ,
845- #? STATE {cfg = # cfg {consumer_strategy = single_active },
846- waiting_consumers = []} = State ) ->
847- {[], State };
861+ #? MODULE {cfg = # cfg {consumer_strategy = single_active },
862+ waiting_consumers = []} = State ,
863+ Effects0 ) ->
864+ {Effects0 , State };
848865handle_waiting_consumer_down (Pid ,
849- #? STATE {cfg = # cfg {consumer_strategy = single_active },
850- waiting_consumers = WaitingConsumers0 } = State0 ) ->
866+ #? MODULE {cfg = # cfg {consumer_strategy = single_active },
867+ waiting_consumers = WaitingConsumers0 } = State0 ,
868+ Effects0 ) ->
851869 % get cancel effects for down waiting consumers
852870 Down = lists :filter (fun ({{_ , P }, _ }) -> P =:= Pid end ,
853871 WaitingConsumers0 ),
854872 Effects = lists :foldl (fun ({ConsumerId , _ }, Effects ) ->
855873 cancel_consumer_effects (ConsumerId , State0 ,
856874 Effects )
857- end , [] , Down ),
875+ end , Effects0 , Down ),
858876 % update state to have only up waiting consumers
859877 StillUp = lists :filter (fun ({{_ , P }, _ }) -> P =/= Pid end ,
860878 WaitingConsumers0 ),
0 commit comments