Skip to content

Commit 557b02d

Browse files
committed
QQ: add new consumer cancel option: 'remove'
This option immediately removes and returns all messages for a consumer instead of the softer 'cancel' option which keeps the consumer around until all pending messages have been either settled or returned. This involves a change to the rabbit_queue_type:cancel/5 API to rabbit_queue_type:cancel/3.
1 parent 203b671 commit 557b02d

14 files changed

+167
-52
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,8 +1136,9 @@ handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
11361136
QName = rabbit_misc:r(Vhost, queue, QNameBin),
11371137
case rabbit_amqqueue:lookup(QName) of
11381138
{ok, Q} ->
1139-
%%TODO Consider adding a new rabbit_queue_type:remove_consumer API that - from the point of view of
1140-
%% the queue process - behaves as if our session process terminated: All messages checked out
1139+
%% TODO: Consider adding a new rabbit_queue_type:remove_consumer
1140+
%% API that - from the point of view of the queue process -
1141+
%% behaves as if our session process terminated: All messages checked out
11411142
%% to this consumer should be re-queued automatically instead of us requeueing them here after cancelling
11421143
%% consumption.
11431144
%% For AMQP legacy (and STOMP / MQTT) consumer cancellation not requeueing messages is a good approach as
@@ -1149,7 +1150,9 @@ handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
11491150
%% first detaching and then re-attaching to the same session with the same link handle (the handle
11501151
%% becomes available for re-use once a link is closed): This will result in the same consumer tag,
11511152
%% and we ideally disallow "updating" an AMQP consumer.
1152-
case rabbit_queue_type:cancel(Q, Ctag, undefined, Username, QStates0) of
1153+
Spec = #{consumer_tag => Ctag,
1154+
user => Username},
1155+
case rabbit_queue_type:cancel(Q, Spec, QStates0) of
11531156
{ok, QStates1} ->
11541157
{Unsettled1, MsgIds} = remove_link_from_outgoing_unsettled_map(Ctag, Unsettled0),
11551158
case MsgIds of

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1772,8 +1772,10 @@ basic_consume(Q, NoAck, ChPid, LimiterPid,
17721772
rabbit_queue_type:state()) ->
17731773
{ok, rabbit_queue_type:state()} | {error, term()}.
17741774
basic_cancel(Q, ConsumerTag, OkMsg, ActingUser, QStates) ->
1775-
rabbit_queue_type:cancel(Q, ConsumerTag,
1776-
OkMsg, ActingUser, QStates).
1775+
%% TODO: is this function used anywhere?
1776+
rabbit_queue_type:cancel(Q, #{consumer_tag => ConsumerTag,
1777+
ok_msg => OkMsg,
1778+
user => ActingUser}, QStates).
17771779

17781780
-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'.
17791781

deps/rabbit/src/rabbit_channel.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1454,8 +1454,9 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
14541454
fun () -> {error, not_found} end,
14551455
fun () ->
14561456
rabbit_queue_type:cancel(
1457-
Q, ConsumerTag, ok_msg(NoWait, OkMsg),
1458-
Username, QueueStates0)
1457+
Q, #{consumer_tag => ConsumerTag,
1458+
ok_msg => ok_msg(NoWait, OkMsg),
1459+
user => Username}, QueueStates0)
14591460
end) of
14601461
{ok, QueueStates} ->
14611462
rabbit_global_counters:consumer_deleted(amqp091),

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
close/1,
3838
update/2,
3939
consume/3,
40-
cancel/5,
40+
cancel/3,
4141
handle_event/3,
4242
deliver/3,
4343
settle/5,
@@ -276,7 +276,9 @@ consume_backwards_compat({credited, credit_api_v1}, Args) ->
276276
[{<<"x-credit">>, table, [{<<"credit">>, long, 0},
277277
{<<"drain">>, bool, false}]} | Args]}.
278278

279-
cancel(Q, ConsumerTag, OkMsg, ActingUser, State) ->
279+
cancel(Q, #{consumer_tag := ConsumerTag,
280+
user := ActingUser} = Spec, State) ->
281+
OkMsg = maps:get(ok_msg, Spec, undefined),
280282
QPid = amqqueue:get_pid(Q),
281283
case delegate:invoke(QPid, {gen_server2, call,
282284
[{basic_cancel, self(), ConsumerTag,

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -477,13 +477,15 @@ apply(#{index := Index,
477477
end
478478
end;
479479
apply(#{index := Idx} = Meta,
480-
#checkout{spec = cancel,
481-
consumer_id = ConsumerId}, State0) ->
480+
#checkout{spec = Spec,
481+
consumer_id = ConsumerId}, State0)
482+
when Spec == cancel orelse
483+
Spec == remove ->
482484
case consumer_key_from_id(ConsumerId, State0) of
483485
{ok, ConsumerKey} ->
484486
{State1, Effects1} = activate_next_consumer(
485487
cancel_consumer(Meta, ConsumerKey, State0, [],
486-
consumer_cancel)),
488+
Spec)),
487489
Reply = {ok, consumer_cancel_info(ConsumerKey, State1)},
488490
{State, _, Effects} = checkout(Meta, State0, State1, Effects1),
489491
update_smallest_raft_index(Idx, Reply, State, Effects);
@@ -1536,14 +1538,14 @@ maybe_return_all(#{system_time := Ts} = Meta, ConsumerKey,
15361538
#consumer{cfg = CCfg} = Consumer, S0,
15371539
Effects0, Reason) ->
15381540
case Reason of
1539-
consumer_cancel ->
1541+
cancel ->
15401542
{update_or_remove_con(
15411543
Meta, ConsumerKey,
15421544
Consumer#consumer{cfg = CCfg#consumer_cfg{lifetime = once},
15431545
credit = 0,
15441546
status = cancelled},
15451547
S0), Effects0};
1546-
down ->
1548+
_ ->
15471549
{S1, Effects1} = return_all(Meta, S0, Effects0, ConsumerKey, Consumer),
15481550
{S1#?STATE{consumers = maps:remove(ConsumerKey, S1#?STATE.consumers),
15491551
last_active = Ts},
@@ -2436,9 +2438,16 @@ make_enqueue(Pid, Seq, Msg) ->
24362438
make_register_enqueuer(Pid) ->
24372439
#register_enqueuer{pid = Pid}.
24382440

2439-
-spec make_checkout(consumer_id(),
2440-
checkout_spec(), consumer_meta()) -> protocol().
2441-
make_checkout({_, _} = ConsumerId, Spec, Meta) ->
2441+
-spec make_checkout(consumer_id(), checkout_spec(), consumer_meta()) ->
2442+
protocol().
2443+
make_checkout({_, _} = ConsumerId, Spec0, Meta) ->
2444+
Spec = case is_v4() of
2445+
false when Spec0 == remove ->
2446+
%% if v4 is not active, fall back to cancel spec
2447+
make_checkout(ConsumerId, cancel, Meta);
2448+
_ ->
2449+
Spec0
2450+
end,
24422451
#checkout{consumer_id = ConsumerId,
24432452
spec = Spec, meta = Meta}.
24442453

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080
credited | simple_prefetch} |
8181

8282
{dequeue, settled | unsettled} |
83-
cancel |
83+
cancel | remove |
8484
%% new v4 format
8585
{once | auto, credit_mode()}.
8686

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
init/2,
1717
checkout/4,
1818
cancel_checkout/2,
19+
cancel_checkout/3,
1920
enqueue/3,
2021
enqueue/4,
2122
dequeue/4,
@@ -459,6 +460,9 @@ credit(ConsumerTag, DeliveryCount, Credit, Drain, Echo,
459460
State = State0#state{consumers = CDels},
460461
{send_command(ServerId, undefined, Cmd, normal, State), []}.
461462

463+
cancel_checkout(ConsumerTag, State) ->
464+
cancel_checkout(ConsumerTag, cancel, State).
465+
462466
%% @doc Cancels a checkout with the rabbit_fifo queue for the consumer tag
463467
%%
464468
%% This is a synchronous call. I.e. the call will block until the command
@@ -468,18 +472,35 @@ credit(ConsumerTag, DeliveryCount, Credit, Drain, Echo,
468472
%% @param State The {@module} state.
469473
%%
470474
%% @returns `{ok, State}' or `{error | timeout, term()}'
471-
-spec cancel_checkout(rabbit_types:ctag(), state()) ->
475+
-spec cancel_checkout(rabbit_types:ctag(), Reason :: cancel | remove, state()) ->
472476
{ok, state()} | {error | timeout, term()}.
473-
cancel_checkout(ConsumerTag, #state{consumers = Consumers} = State0) ->
477+
cancel_checkout(ConsumerTag, Reason,
478+
#state{consumers = Consumers,
479+
unsent_commands = Unsent} = State0)
480+
when is_atom(Reason) ->
474481
case Consumers of
475-
#{ConsumerTag := #consumer{}} ->
482+
#{ConsumerTag := #consumer{key = Cid}} ->
476483
Servers = sorted_servers(State0),
477484
ConsumerId = {ConsumerTag, self()},
478-
%% TODO: send any pending commands for consumer
479-
%% checkout always uses the ConsumerId, rather than the key
480-
Cmd = rabbit_fifo:make_checkout(ConsumerId, cancel, #{}),
481-
State = State0#state{consumers = maps:remove(ConsumerTag, Consumers)},
482-
case try_process_command(Servers, Cmd, State0) of
485+
%% send any pending commands for consumer before cancelling
486+
Commands = case Unsent of
487+
#{Cid := {Settled, Returns, Discards}} ->
488+
add_command(Cid, settle, Settled,
489+
add_command(Cid, return, Returns,
490+
add_command(Cid, discard,
491+
Discards, [])));
492+
_ ->
493+
[]
494+
end,
495+
ServerId = pick_server(State0),
496+
%% send all the settlements, discards and returns
497+
State1 = lists:foldl(fun (C, S0) ->
498+
send_command(ServerId, undefined, C,
499+
normal, S0)
500+
end, State0, Commands),
501+
Cmd = rabbit_fifo:make_checkout(ConsumerId, Reason, #{}),
502+
State = State1#state{consumers = maps:remove(ConsumerTag, Consumers)},
503+
case try_process_command(Servers, Cmd, State) of
483504
{ok, _, Leader} ->
484505
{ok, State#state{leader = Leader}};
485506
Err ->

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
%% stateful client API
4040
new/2,
4141
consume/3,
42-
cancel/5,
42+
cancel/3,
4343
handle_down/4,
4444
handle_event/3,
4545
module/2,
@@ -124,7 +124,11 @@
124124
exclusive_consume => boolean(),
125125
args => rabbit_framing:amqp_table(),
126126
ok_msg := term(),
127-
acting_user := rabbit_types:username()}.
127+
acting_user := rabbit_types:username()}.
128+
-type cancel_spec() :: #{consumer_tag := rabbit_types:ctag(),
129+
reason => cancel | remove,
130+
ok_msg => term(),
131+
user := rabbit_types:username()}.
128132

129133
-type delivery_options() :: #{correlation => correlation(),
130134
atom() => term()}.
@@ -134,6 +138,7 @@
134138
-export_type([state/0,
135139
consume_mode/0,
136140
consume_spec/0,
141+
cancel_spec/0,
137142
delivery_options/0,
138143
action/0,
139144
actions/0,
@@ -194,9 +199,7 @@
194199
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
195200

196201
-callback cancel(amqqueue:amqqueue(),
197-
rabbit_types:ctag(),
198-
term(),
199-
rabbit_types:username(),
202+
cancel_spec(),
200203
queue_state()) ->
201204
{ok, queue_state()} | {error, term()}.
202205

@@ -453,17 +456,15 @@ consume(Q, Spec, State) ->
453456
Err
454457
end.
455458

456-
%% TODO switch to cancel spec api
459+
%% TODO: switch to cancel spec api
457460
-spec cancel(amqqueue:amqqueue(),
458-
rabbit_types:ctag(),
459-
term(),
460-
rabbit_types:username(),
461+
cancel_spec(),
461462
state()) ->
462463
{ok, state()} | {error, term()}.
463-
cancel(Q, Tag, OkMsg, ActiveUser, Ctxs) ->
464+
cancel(Q, Spec, Ctxs) ->
464465
#ctx{state = State0} = Ctx = get_ctx(Q, Ctxs),
465466
Mod = amqqueue:get_type(Q),
466-
case Mod:cancel(Q, Tag, OkMsg, ActiveUser, State0) of
467+
case Mod:cancel(Q, Spec, State0) of
467468
{ok, State} ->
468469
{ok, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)};
469470
Err ->

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
delete/4,
2525
delete_immediately/1]).
2626
-export([state_info/1, info/2, stat/1, infos/1, infos/2]).
27-
-export([settle/5, dequeue/5, consume/3, cancel/5]).
27+
-export([settle/5, dequeue/5, consume/3, cancel/3]).
2828
-export([credit_v1/5, credit/7]).
2929
-export([purge/1]).
3030
-export([stateless_deliver/2, deliver/3]).
@@ -130,7 +130,7 @@
130130
-define(DELETE_TIMEOUT, 5000).
131131
-define(MEMBER_CHANGE_TIMEOUT, 20_000).
132132
-define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096
133-
-define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra
133+
% -define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra
134134

135135
%%----------- QQ policies ---------------------------------------------------
136136

@@ -906,8 +906,8 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
906906
{ok, QState}
907907
end.
908908

909-
cancel(_Q, ConsumerTag, OkMsg, _ActingUser, State) ->
910-
maybe_send_reply(self(), OkMsg),
909+
cancel(_Q, #{consumer_tag := ConsumerTag} = Spec, State) ->
910+
maybe_send_reply(self(), maps:get(ok_msg, Spec, undefined)),
911911
rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), State).
912912

913913
emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, PrefetchCount, Args, Ref, ActingUser) ->

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
recover/2,
2020
is_recoverable/1,
2121
consume/3,
22-
cancel/5,
22+
cancel/3,
2323
handle_event/3,
2424
deliver/3,
2525
settle/5,
@@ -446,8 +446,10 @@ begin_stream(#stream_client{name = QName,
446446
reader_options = Options},
447447
{ok, State#stream_client{readers = Readers0#{Tag => Str0}}}.
448448

449-
cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
450-
name = QName} = State) ->
449+
cancel(_Q, #{consumer_tag := ConsumerTag,
450+
user := ActingUser} = Spec,
451+
#stream_client{readers = Readers0,
452+
name = QName} = State) ->
451453
case maps:take(ConsumerTag, Readers0) of
452454
{#stream{log = Log}, Readers} ->
453455
ok = close_log(Log),
@@ -457,7 +459,7 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
457459
{channel, self()},
458460
{queue, QName},
459461
{user_who_performed_action, ActingUser}]),
460-
maybe_send_reply(self(), OkMsg),
462+
maybe_send_reply(self(), maps:get(ok_msg, Spec, undefined)),
461463
{ok, State#stream_client{readers = Readers}};
462464
error ->
463465
{ok, State}

0 commit comments

Comments
 (0)