@@ -160,7 +160,7 @@ enqueue(_QName, _Correlation, _Msg,
160160 # state {queue_status = reject_publish ,
161161 cfg = # cfg {}} = State ) ->
162162 {reject_publish , State };
163- enqueue (QName , Correlation , Msg ,
163+ enqueue (_QName , Correlation , Msg ,
164164 # state {slow = WasSlow ,
165165 pending = Pending ,
166166 queue_status = go ,
@@ -176,8 +176,9 @@ enqueue(QName, Correlation, Msg,
176176 next_seq = Seq + 1 ,
177177 next_enqueue_seq = EnqueueSeq + 1 ,
178178 slow = IsSlow },
179+
179180 if IsSlow andalso not WasSlow ->
180- {ok , set_timer ( QName , State ) , [{block , cluster_name (State )}]};
181+ {ok , State , [{block , cluster_name (State )}]};
181182 true ->
182183 {ok , State , []}
183184 end .
@@ -632,10 +633,10 @@ handle_ra_event(QName, Leader, {applied, Seqs},
632633 when ActualLeader =/= OldLeader ->
633634 % % there is a new leader
634635 ? LOG_DEBUG (" ~ts : Detected QQ leader change (applied) "
635- " from ~w to ~w , "
636- " resending ~b pending commands" ,
637- [? MODULE , OldLeader , ActualLeader ,
638- maps :size (State1 # state .pending )]),
636+ " from ~w to ~w , "
637+ " resending ~b pending commands" ,
638+ [? MODULE , OldLeader , ActualLeader ,
639+ maps :size (State1 # state .pending )]),
639640 resend_all_pending (State1 # state {leader = ActualLeader });
640641 _ ->
641642 State1
@@ -702,9 +703,9 @@ handle_ra_event(QName, Leader, {machine, leader_change},
702703 % % we need to update leader
703704 % % and resend any pending commands
704705 ? LOG_DEBUG (" ~ts : ~s Detected QQ leader change from ~w to ~w , "
705- " resending ~b pending commands" ,
706- [rabbit_misc :rs (QName ), ? MODULE , OldLeader ,
707- Leader , maps :size (Pending )]),
706+ " resending ~b pending commands" ,
707+ [rabbit_misc :rs (QName ), ? MODULE , OldLeader ,
708+ Leader , maps :size (Pending )]),
708709 State = resend_all_pending (State0 # state {leader = Leader }),
709710 {ok , State , []};
710711handle_ra_event (_QName , _From , {rejected , {not_leader , Leader , _Seq }},
@@ -714,21 +715,27 @@ handle_ra_event(QName, _From, {rejected, {not_leader, Leader, _Seq}},
714715 # state {leader = OldLeader ,
715716 pending = Pending } = State0 ) ->
716717 ? LOG_DEBUG (" ~ts : ~s Detected QQ leader change (rejection) from ~w to ~w , "
717- " resending ~b pending commands" ,
718- [rabbit_misc :rs (QName ), ? MODULE , OldLeader ,
719- Leader , maps :size (Pending )]),
718+ " resending ~b pending commands" ,
719+ [rabbit_misc :rs (QName ), ? MODULE , OldLeader ,
720+ Leader , maps :size (Pending )]),
720721 State = resend_all_pending (State0 # state {leader = Leader }),
721722 {ok , cancel_timer (State ), []};
722723handle_ra_event (_QName , _From ,
723724 {rejected , {not_leader , _UndefinedMaybe , _Seq }}, State0 ) ->
724725 % TODO: how should these be handled? re-sent on timer or try random
725726 {ok , State0 , []};
726- handle_ra_event (QName , _ , timeout , # state {cfg = # cfg {servers = Servers }} = State0 ) ->
727+ handle_ra_event (QName , _ , timeout , # state {cfg = # cfg {servers = Servers },
728+ leader = OldLeader ,
729+ pending = Pending } = State0 ) ->
727730 case find_leader (Servers ) of
728731 undefined ->
729732 % % still no leader, set the timer again
730733 {ok , set_timer (QName , State0 ), []};
731734 Leader ->
735+ ? LOG_DEBUG (" ~ts : ~s Pending applied Timeout ~w to ~w , "
736+ " resending ~b pending commands" ,
737+ [rabbit_misc :rs (QName ), ? MODULE , OldLeader ,
738+ Leader , maps :size (Pending )]),
732739 State = resend_all_pending (State0 # state {leader = Leader }),
733740 {ok , State , []}
734741 end ;
@@ -743,7 +750,7 @@ handle_ra_event(QName, Leader, close_cached_segments,
743750 case now_ms () > Last + ? CACHE_SEG_TIMEOUT of
744751 true ->
745752 ? LOG_DEBUG (" ~ts : closing_cached_segments" ,
746- [rabbit_misc :rs (QName )]),
753+ [rabbit_misc :rs (QName )]),
747754 % % its been long enough, evict all
748755 _ = ra_flru :evict_all (Cache ),
749756 State # state {cached_segments = undefined };
@@ -804,12 +811,16 @@ seq_applied({Seq, Response},
804811 {Corrs , Actions0 , # state {} = State0 }) ->
805812 % % sequences aren't guaranteed to be applied in order as enqueues are
806813 % % low priority commands and may be overtaken by others with a normal priority.
814+ % %
815+ % % if the response is 'not_enqueued' we need to still keep the pending
816+ % % command for a later resend
807817 {Actions , State } = maybe_add_action (Response , Actions0 , State0 ),
808818 case maps :take (Seq , State # state .pending ) of
809- {{undefined , _ }, Pending } ->
819+ {{undefined , _ }, Pending }
820+ when Response =/= not_enqueued ->
810821 {Corrs , Actions , State # state {pending = Pending }};
811822 {{Corr , _ }, Pending }
812- when Response /= not_enqueued ->
823+ when Response = /= not_enqueued ->
813824 {[Corr | Corrs ], Actions , State # state {pending = Pending }};
814825 _ ->
815826 {Corrs , Actions , State }
0 commit comments