Skip to content

Commit 76c3153

Browse files
committed
Fix AMQP crashes for approximate numbers
This commit fixes several crashes: 1. Serialising IEEE 754-2008 decimals as well as NaN and +-Inf for float and doubles crashed 2. Converting IEEE 754-2008 decimals as well as NaN and +-Inf for float and dobules from amqp to amqpl crashed The 2nd crash looks as follows: ``` exception exit: {function_clause, [{mc_amqpl,to_091, [<<"decimal-32">>,{as_is,116,<<124,0,0,0>>}], [{file,"mc_amqpl.erl"},{line,747}]}, {mc_amqpl,'-convert_from/3-lc$^2/1-2-',1, [{file,"mc_amqpl.erl"},{line,155}]}, {mc_amqpl,convert_from,3, [{file,"mc_amqpl.erl"},{line,155}]}, {mc,convert,3,[{file,"mc.erl"},{line,358}]}, {rabbit_channel,outgoing_content,2, [{file,"rabbit_channel.erl"},{line,2649}]}, {rabbit_channel,handle_basic_get,7, [{file,"rabbit_channel.erl"},{line,2636}]}, {rabbit_channel,handle_cast,2, [{file,"rabbit_channel.erl"},{line,617}]}, {gen_server2,handle_msg,2, [{file,"gen_server2.erl"},{line,1056}]}]} ``` The 2nd crash is fixed by omitting any `{as_is, _TypeCode, _Binary}` values during AMQP 1.0 -> AMQP 0.9.1 conversion. This will be documented in the conversion table. In addition to fixing these crashes, this commit adds tests that RabbitMQ is able to store and forward IEEE 754-2008 decimals. IEEE 754-2008 decimals can be parsed and serialsed by RabbitMQ. However, RabbitMQ doesn't support interpreting this values. For example, they can't be used on the headers exchange or for AMQP filter expressions.
1 parent 6acc2b1 commit 76c3153

File tree

6 files changed

+133
-27
lines changed

6 files changed

+133
-27
lines changed

deps/amqp10_common/src/amqp10_binary_generator.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,8 @@ generate1({array, Type, List}) ->
177177
[16#e0, S + 1, Count, Array]
178178
end;
179179

180-
generate1({as_is, TypeCode, Bin}) ->
181-
<<TypeCode, Bin>>.
180+
generate1({as_is, TypeCode, Bin}) when is_binary(Bin) ->
181+
[TypeCode, Bin].
182182

183183
constructor(symbol) -> 16#b3;
184184
constructor(ubyte) -> 16#50;

deps/amqp10_common/src/amqp10_binary_parser.erl

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -101,17 +101,17 @@ parse(<<16#e0, S:8,CountAndV:S/binary,_/binary>>, B) ->
101101
parse(<<16#f0, S:32,CountAndV:S/binary,_/binary>>, B) ->
102102
{parse_array(32, CountAndV), B+5+S};
103103
%% NaN or +-inf
104-
parse(<<16#72, V:32, _/binary>>, B) ->
105-
{{as_is, 16#72, <<V:32>>}, B+5};
106-
parse(<<16#82, V:64, _/binary>>, B) ->
107-
{{as_is, 16#82, <<V:64>>}, B+9};
104+
parse(<<16#72, V:4/binary, _/binary>>, B) ->
105+
{{as_is, 16#72, V}, B+5};
106+
parse(<<16#82, V:8/binary, _/binary>>, B) ->
107+
{{as_is, 16#82, V}, B+9};
108108
%% decimals
109-
parse(<<16#74, V:32, _/binary>>, B) ->
110-
{{as_is, 16#74, <<V:32>>}, B+5};
111-
parse(<<16#84, V:64, _/binary>>, B) ->
112-
{{as_is, 16#84, <<V:64>>}, B+9};
113-
parse(<<16#94, V:128, _/binary>>, B) ->
114-
{{as_is, 16#94, <<V:128>>}, B+17};
109+
parse(<<16#74, V:4/binary, _/binary>>, B) ->
110+
{{as_is, 16#74, V}, B+5};
111+
parse(<<16#84, V:8/binary, _/binary>>, B) ->
112+
{{as_is, 16#84, V}, B+9};
113+
parse(<<16#94, V:16/binary, _/binary>>, B) ->
114+
{{as_is, 16#94, V}, B+17};
115115
parse(<<Type, _/binary>>, B) ->
116116
throw({primitive_type_unsupported, Type, {position, B}}).
117117

@@ -317,17 +317,17 @@ pm(<<16#e0, S:8,CountAndV:S/binary,R/binary>>, O, B) ->
317317
pm(<<16#f0, S:32,CountAndV:S/binary,R/binary>>, O, B) ->
318318
[parse_array(32, CountAndV) | pm(R, O, B+5+S)];
319319
%% NaN or +-inf
320-
pm(<<16#72, V:32, R/binary>>, O, B) ->
321-
[{as_is, 16#72, <<V:32>>} | pm(R, O, B+5)];
322-
pm(<<16#82, V:64, R/binary>>, O, B) ->
323-
[{as_is, 16#82, <<V:64>>} | pm(R, O, B+9)];
320+
pm(<<16#72, V:4/binary, R/binary>>, O, B) ->
321+
[{as_is, 16#72, V} | pm(R, O, B+5)];
322+
pm(<<16#82, V:8/binary, R/binary>>, O, B) ->
323+
[{as_is, 16#82, V} | pm(R, O, B+9)];
324324
%% decimals
325-
pm(<<16#74, V:32, R/binary>>, O, B) ->
326-
[{as_is, 16#74, <<V:32>>} | pm(R, O, B+5)];
327-
pm(<<16#84, V:64, R/binary>>, O, B) ->
328-
[{as_is, 16#84, <<V:64>>} | pm(R, O, B+9)];
329-
pm(<<16#94, V:128, R/binary>>, O, B) ->
330-
[{as_is, 16#94, <<V:128>>} | pm(R, O, B+17)];
325+
pm(<<16#74, V:4/binary, R/binary>>, O, B) ->
326+
[{as_is, 16#74, V} | pm(R, O, B+5)];
327+
pm(<<16#84, V:8/binary, R/binary>>, O, B) ->
328+
[{as_is, 16#84, V} | pm(R, O, B+9)];
329+
pm(<<16#94, V:16/binary, R/binary>>, O, B) ->
330+
[{as_is, 16#94, V} | pm(R, O, B+17)];
331331
pm(<<Type, _Bin/binary>>, _O, B) ->
332332
throw({primitive_type_unsupported, Type, {position, B}}).
333333

deps/amqp10_common/test/binary_generator_SUITE.erl

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,34 @@ numerals(_Config) ->
9999
roundtrip({long, 0}),
100100
roundtrip({long, 16#7FFFFFFFFFFFFFFF}),
101101
roundtrip({long, -16#8000000000000000}),
102+
102103
roundtrip({float, 0.0}),
103104
roundtrip({float, 1.0}),
104105
roundtrip({float, -1.0}),
105106
roundtrip({double, 0.0}),
106107
roundtrip({double, 1.0}),
107108
roundtrip({double, -1.0}),
109+
110+
%% float +Inf
111+
roundtrip({as_is, 16#72, <<16#7F, 16#80, 16#00, 16#00>>}),
112+
%% double +Inf
113+
roundtrip({as_is, 16#82, <<16#7F, 16#F0, 16#00, 16#00,
114+
16#00, 16#00, 16#00, 16#00>>}),
115+
116+
%% decimal32
117+
roundtrip({as_is, 16#74, <<16#22, 16#50, 16#00, 16#00>>}), % 0
118+
roundtrip({as_is, 16#74, <<16#22, 16#50, 16#00, 16#2A>>}), % 42
119+
roundtrip({as_is, 16#74, <<16#A2, 16#40, 16#00, 16#48>>}), % -123.45
120+
roundtrip({as_is, 16#74, <<16#78, 16#00, 16#00, 16#00>>}), % +Infinity
121+
roundtrip({as_is, 16#74, <<16#7C, 16#00, 16#00, 16#00>>}), % NaN
122+
%% decimal64
123+
roundtrip({as_is, 16#84, <<16#22, 16#34, 16#00, 16#00,
124+
16#00, 16#00, 16#00, 16#00>>}), % 0
125+
%% decimal128
126+
roundtrip({as_is, 16#94, <<16#22, 16#08, 16#00, 16#00,
127+
16#00, 16#00, 16#00, 16#00,
128+
16#00, 16#00, 16#00, 16#00,
129+
16#00, 16#00, 16#00, 16#00>>}), % 0
108130
ok.
109131

110132
utf8(_Config) ->

deps/rabbit/src/mc.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
{list, [tagged_value()]} |
9090
{map, [{tagged_value(), tagged_value()}]} |
9191
{array, atom(), [tagged_value()]} |
92+
{as_is, TypeCode :: non_neg_integer(), binary()} |
9293
null |
9394
undefined.
9495

deps/rabbit/src/mc_amqpl.erl

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,15 +152,23 @@ convert_from(mc_amqp, Sections, Env) ->
152152
Type0
153153
end,
154154

155-
Headers0 = [to_091(K, V) || {{utf8, K}, V} <- AP,
156-
?IS_SHORTSTR_LEN(K)],
155+
Headers0 = lists:filtermap(fun({_Key, {as_is, _Code, _Bin}}) ->
156+
false;
157+
({{utf8, K}, V})
158+
when ?IS_SHORTSTR_LEN(K) ->
159+
{true, to_091(K, V)};
160+
(_) ->
161+
false
162+
end, AP),
157163
%% Add remaining x- message annotations as headers
158164
XHeaders = lists:filtermap(fun({{symbol, <<"x-cc">>}, V}) ->
159165
{true, to_091(<<"CC">>, V)};
160166
({{symbol, <<"x-opt-rabbitmq-received-time">>}, {timestamp, Ts}}) ->
161167
{true, {<<"timestamp_in_ms">>, long, Ts}};
162168
({{symbol, <<"x-opt-deaths">>}, V}) ->
163169
convert_from_amqp_deaths(V);
170+
({_Key, {as_is, _Code, _Bin}}) ->
171+
false;
164172
({{symbol, <<"x-", _/binary>> = K}, V})
165173
when ?IS_SHORTSTR_LEN(K) ->
166174
case is_internal_header(K) of
@@ -766,12 +774,23 @@ to_091(Key, null) -> {Key, void, undefined};
766774
to_091(Key, {list, L}) ->
767775
to_091_array(Key, L);
768776
to_091(Key, {map, M}) ->
769-
{Key, table, [to_091(unwrap(K), V) || {K, V} <- M]};
777+
T = lists:filtermap(fun({K, V}) when element(1, K) =:= as_is orelse
778+
element(1, V) =:= as_is ->
779+
false;
780+
({K, V}) ->
781+
{true, to_091(unwrap(K), V)}
782+
end, M),
783+
{Key, table, T};
770784
to_091(Key, {array, _T, L}) ->
771785
to_091_array(Key, L).
772786

773787
to_091_array(Key, L) ->
774-
{Key, array, [to_091(V) || V <- L]}.
788+
A = lists:filtermap(fun({as_is, _, _}) ->
789+
false;
790+
(V) ->
791+
{true, to_091(V)}
792+
end, L),
793+
{Key, array, A}.
775794

776795
to_091({utf8, V}) -> {longstr, V};
777796
to_091({symbol, V}) -> {longstr, V};

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,8 @@ groups() ->
175175
x_cc_annotation_exchange_routing_key_empty,
176176
x_cc_annotation_queue,
177177
x_cc_annotation_null,
178-
bad_x_cc_annotation_exchange
178+
bad_x_cc_annotation_exchange,
179+
decimal_types
179180
]},
180181

181182
{cluster_size_3, [shuffle],
@@ -6685,6 +6686,69 @@ bad_x_cc_annotation_exchange(Config) ->
66856686
ok = end_session_sync(Session),
66866687
ok = close_connection_sync(Connection).
66876688

6689+
%% Test that RabbitMQ can store and forward AMQP decimal types.
6690+
decimal_types(Config) ->
6691+
QName = atom_to_binary(?FUNCTION_NAME),
6692+
Address = rabbitmq_amqp_address:queue(QName),
6693+
{_, Session, LinkPair} = Init = init(Config),
6694+
{ok, _} = rabbitmq_amqp_client:declare_queue(
6695+
LinkPair, QName,
6696+
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}),
6697+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
6698+
ok = wait_for_credit(Sender),
6699+
6700+
Decimal32Zero = <<16#22, 16#50, 0, 0>>,
6701+
Decimal64Zero = <<16#22, 16#34, 0, 0, 0, 0, 0, 0>>,
6702+
Decimal128Zero = <<16#22, 16#08, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>,
6703+
Decimal3242 = <<16#22, 16#50, 16#00, 16#2A>>, % 42
6704+
Decimal32NaN = <<16#7C, 0, 0, 0>>,
6705+
Body = #'v1_0.amqp_value'{content = {list, [{as_is, 16#74, Decimal32Zero},
6706+
{as_is, 16#84, Decimal64Zero},
6707+
{as_is, 16#94, Decimal128Zero}]}},
6708+
MsgAnns = #{<<"x-decimal-32">> => {as_is, 16#74, Decimal3242},
6709+
<<"x-decimal-64">> => {as_is, 16#84, Decimal64Zero},
6710+
<<"x-decimal-128">> => {as_is, 16#94, Decimal128Zero},
6711+
<<"x-list">> => {list, [{as_is, 16#94, Decimal128Zero}]},
6712+
<<"x-map">> => {map, [{{utf8, <<"key-1">>},
6713+
{as_is, 16#94, Decimal128Zero}}]}},
6714+
AppProps = #{<<"decimal-32">> => {as_is, 16#74, Decimal32NaN}},
6715+
Msg0 = amqp10_msg:set_message_annotations(
6716+
MsgAnns,
6717+
amqp10_msg:set_application_properties(
6718+
AppProps,
6719+
amqp10_msg:new(<<"tag">>, Body))),
6720+
ok = amqp10_client:send_msg(Sender, Msg0),
6721+
ok = wait_for_accepted(<<"tag">>),
6722+
ok = amqp10_client:send_msg(Sender, Msg0),
6723+
ok = wait_for_accepted(<<"tag">>),
6724+
ok = detach_link_sync(Sender),
6725+
6726+
%% Consume the first message via AMQP 1.0
6727+
{ok, Receiver} = amqp10_client:attach_receiver_link(
6728+
Session, <<"receiver">>, Address, unsettled),
6729+
{ok, Msg} = amqp10_client:get_msg(Receiver),
6730+
?assertEqual(Body, amqp10_msg:body(Msg)),
6731+
?assertMatch(#{<<"x-decimal-32">> := {as_is, 16#74, Decimal3242},
6732+
<<"x-decimal-64">> := {as_is, 16#84, Decimal64Zero},
6733+
<<"x-decimal-128">> := {as_is, 16#94, Decimal128Zero},
6734+
<<"x-list">> := [{as_is, 16#94, Decimal128Zero}],
6735+
<<"x-map">> := [{{utf8, <<"key-1">>},
6736+
{as_is, 16#94, Decimal128Zero}}]},
6737+
amqp10_msg:message_annotations(Msg)),
6738+
?assertEqual(AppProps, amqp10_msg:application_properties(Msg)),
6739+
ok = amqp10_client:accept_msg(Receiver, Msg),
6740+
ok = detach_link_sync(Receiver),
6741+
6742+
%% Consume the second message via AMQP 0.9.1
6743+
%% We expect to receive the message without any crashes.
6744+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
6745+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{}},
6746+
amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true})),
6747+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
6748+
6749+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
6750+
ok = close(Init).
6751+
66886752
%% Attach a receiver to an unavailable quorum queue.
66896753
attach_to_down_quorum_queue(Config) ->
66906754
QName = <<"q-down">>,

0 commit comments

Comments
 (0)