Skip to content

Commit 4bbb0c6

Browse files
committed
Change modified outcome behaviour
With the new quorum queue v4 improvements where a requeue counter was added in addition to the quorum queue delivery counter, the following sentence from #6292 (comment) doesn't apply anymore: > Also the case where delivery_failed=false|undefined requires the release of the > message without incrementing the delivery_count. Again this is not something > that our queues are able to do so again we have to reject without requeue. Therefore, we simplify the modified outcome behaviour: RabbitMQ will from now on only discard the message if the modified's undeliverable-here field is true.
1 parent 2b1d10b commit 4bbb0c6

File tree

3 files changed

+67
-14
lines changed

3 files changed

+67
-14
lines changed

deps/amqp10_client/src/amqp10_msg.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ header(first_acquirer = K,
193193
header(delivery_count = K,
194194
#amqp10_msg{header = #'v1_0.header'{delivery_count = D}}) ->
195195
header_value(K, D);
196-
header(K, #amqp10_msg{header = undefined}) -> header_value(K, undefined).
196+
header(K, #amqp10_msg{header = undefined}) ->
197+
header_value(K, undefined).
197198

198199
-spec delivery_annotations(amqp10_msg()) -> #{annotations_key() => any()}.
199200
delivery_annotations(#amqp10_msg{delivery_annotations = undefined}) ->

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1834,20 +1834,21 @@ settle_op_from_outcome(#'v1_0.rejected'{}) ->
18341834
discard;
18351835
settle_op_from_outcome(#'v1_0.released'{}) ->
18361836
requeue;
1837-
%% Keep the same Modified behaviour as in RabbitMQ 3.x
1838-
settle_op_from_outcome(#'v1_0.modified'{delivery_failed = true,
1839-
undeliverable_here = UndelHere})
1840-
when UndelHere =/= true ->
1841-
requeue;
1842-
settle_op_from_outcome(#'v1_0.modified'{}) ->
1843-
%% If delivery_failed is not true, we can't increment its delivery_count.
1844-
%% So, we will have to reject without requeue.
1845-
%%
1846-
%% If undeliverable_here is true, this is not quite correct because
1847-
%% undeliverable_here refers to the link, and not the message in general.
1848-
%% However, we cannot filter messages from being assigned to individual consumers.
1849-
%% That's why we will have to reject it without requeue.
1837+
1838+
%% RabbitMQ does not support any of the modified outcome fields correctly.
1839+
%% However, we still allow the client to settle with the modified outcome
1840+
%% because some client libraries such as Apache QPid make use of it:
1841+
%% https://github.com/apache/qpid-jms/blob/90eb60f59cb59b7b9ad8363ee8a843d6903b8e77/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java#L464
1842+
%% In such cases, it's better when RabbitMQ does not end the session.
1843+
%% See https://github.com/rabbitmq/rabbitmq-server/issues/6121
1844+
settle_op_from_outcome(#'v1_0.modified'{undeliverable_here = true}) ->
1845+
%% This is not quite correct because undeliverable_here refers to the link,
1846+
%% and not the message in general. However, RabbitMQ cannot filter messages from
1847+
%% being assigned to individual consumers. That's why we discard.
18501848
discard;
1849+
settle_op_from_outcome(#'v1_0.modified'{}) ->
1850+
requeue;
1851+
18511852
settle_op_from_outcome(Outcome) ->
18521853
protocol_error(
18531854
?V_1_0_AMQP_ERROR_INVALID_FIELD,

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ groups() ->
4141
[
4242
reliable_send_receive_with_outcomes_classic_queue,
4343
reliable_send_receive_with_outcomes_quorum_queue,
44+
modified,
4445
sender_settle_mode_unsettled,
4546
sender_settle_mode_unsettled_fanout,
4647
sender_settle_mode_mixed,
@@ -387,6 +388,56 @@ reliable_send_receive(QType, Outcome, Config) ->
387388
ok = end_session_sync(Session2),
388389
ok = amqp10_client:close_connection(Connection2).
389390

391+
%% This test case doesn't expect the correct AMQP spec behavivour.
392+
%% We know that RabbitMQ doesn't implement the modified outcome correctly.
393+
%% Here, we test RabbitMQ's workaround behaviour:
394+
%% RabbitMQ discards if undeliverable-here is true. Otherwise, RabbitMQ requeues.
395+
modified(Config) ->
396+
QName = atom_to_binary(?FUNCTION_NAME),
397+
{Connection, Session, LinkPair} = init(Config),
398+
{ok, #{type := <<"quorum">>}} = rabbitmq_amqp_client:declare_queue(
399+
LinkPair, QName,
400+
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}),
401+
Address = rabbitmq_amqp_address:queue(QName),
402+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
403+
ok = wait_for_credit(Sender),
404+
405+
Msg1 = amqp10_msg:new(<<"tag1">>, <<"m1">>, true),
406+
Msg2 = amqp10_msg:new(<<"tag2">>, <<"m2">>, true),
407+
ok = amqp10_client:send_msg(Sender, Msg1),
408+
ok = amqp10_client:send_msg(Sender, Msg2),
409+
ok = amqp10_client:detach_link(Sender),
410+
411+
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled),
412+
413+
{ok, M1} = amqp10_client:get_msg(Receiver),
414+
?assertEqual([<<"m1">>], amqp10_msg:body(M1)),
415+
ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, _UndeliverableHere = true, #{}}),
416+
417+
{ok, M2a} = amqp10_client:get_msg(Receiver),
418+
?assertEqual([<<"m2">>], amqp10_msg:body(M2a)),
419+
ok = amqp10_client:settle_msg(Receiver, M2a, {modified, false, false, #{}}),
420+
421+
{ok, M2b} = amqp10_client:get_msg(Receiver),
422+
?assertEqual([<<"m2">>], amqp10_msg:body(M2b)),
423+
ok = amqp10_client:settle_msg(Receiver, M2b, {modified, true, false, #{}}),
424+
425+
{ok, M2c} = amqp10_client:get_msg(Receiver),
426+
?assertEqual([<<"m2">>], amqp10_msg:body(M2c)),
427+
ok = amqp10_client:settle_msg(Receiver, M2c, {modified, true, false, #{<<"key">> => <<"val">>}}),
428+
429+
{ok, M2d} = amqp10_client:get_msg(Receiver),
430+
?assertEqual([<<"m2">>], amqp10_msg:body(M2d)),
431+
?assertEqual(0, amqp10_msg:header(delivery_count, M2d)),
432+
ok = amqp10_client:settle_msg(Receiver, M2d, modified),
433+
434+
ok = amqp10_client:detach_link(Receiver),
435+
?assertMatch({ok, #{message_count := 1}},
436+
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
437+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
438+
ok = end_session_sync(Session),
439+
ok = amqp10_client:close_connection(Connection).
440+
390441
%% Tests that confirmations are returned correctly
391442
%% when sending many messages async to a quorum queue.
392443
sender_settle_mode_unsettled(Config) ->

0 commit comments

Comments
 (0)