|
13 | 13 | -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). |
14 | 14 |
|
15 | 15 | -compile([nowarn_export_all, export_all]). |
16 | | --compile(export_all). |
17 | 16 |
|
18 | 17 | suite() -> |
19 | 18 | [{timetrap, 5 * 60000}]. |
@@ -49,8 +48,12 @@ all_tests() -> |
49 | 48 | list_with_multiple_vhosts, |
50 | 49 | list_with_multiple_arguments, |
51 | 50 | bind_to_unknown_queue, |
| 51 | + binding_args_direct_exchange, |
| 52 | + binding_args_fanout_exchange, |
| 53 | + |
52 | 54 | %% Exchange bindings |
53 | | - bind_and_unbind_exchange, |
| 55 | + bind_and_unbind_direct_exchange, |
| 56 | + bind_and_unbind_fanout_exchange, |
54 | 57 | bind_and_delete_exchange_source, |
55 | 58 | bind_and_delete_exchange_destination, |
56 | 59 | bind_to_unknown_exchange, |
@@ -116,6 +119,7 @@ end_per_testcase(Testcase, Config) -> |
116 | 119 | %% ------------------------------------------------------------------- |
117 | 120 | %% Testcases. |
118 | 121 | %% ------------------------------------------------------------------- |
| 122 | + |
119 | 123 | bind_and_unbind(Config) -> |
120 | 124 | Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), |
121 | 125 |
|
@@ -697,33 +701,116 @@ bind_to_unknown_queue(Config) -> |
697 | 701 | rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])), |
698 | 702 | ok. |
699 | 703 |
|
700 | | -bind_and_unbind_exchange(Config) -> |
| 704 | +%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/14533 |
| 705 | +binding_args_direct_exchange(Config) -> |
| 706 | + binding_args(<<"amq.direct">>, Config). |
| 707 | + |
| 708 | +binding_args_fanout_exchange(Config) -> |
| 709 | + binding_args(<<"amq.fanout">>, Config). |
| 710 | + |
| 711 | +binding_args(Exchange, Config) -> |
| 712 | + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), |
| 713 | + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), |
| 714 | + Q = ?config(queue_name, Config), |
| 715 | + ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])), |
| 716 | + |
| 717 | + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), |
| 718 | + amqp_channel:register_confirm_handler(Ch, self()), |
| 719 | + |
| 720 | + %% Create two bindings that differ only in their binding arguments. |
| 721 | + RoutingKey = <<"some-key">>, |
| 722 | + BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}], |
| 723 | + BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}], |
| 724 | + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange, |
| 725 | + routing_key = RoutingKey, |
| 726 | + queue = Q, |
| 727 | + arguments = BindingArgs1}), |
| 728 | + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange, |
| 729 | + routing_key = RoutingKey, |
| 730 | + queue = Q, |
| 731 | + arguments = BindingArgs2}), |
| 732 | + ok = amqp_channel:cast(Ch, |
| 733 | + #'basic.publish'{exchange = Exchange, |
| 734 | + routing_key = RoutingKey}, |
| 735 | + #amqp_msg{payload = <<"m1">>}), |
| 736 | + receive #'basic.ack'{} -> ok |
| 737 | + after 9000 -> ct:fail(confirm_timeout) |
| 738 | + end, |
| 739 | + |
| 740 | + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}}, |
| 741 | + amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})), |
| 742 | + |
| 743 | + %% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding. |
| 744 | + #'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange, |
| 745 | + routing_key = RoutingKey, |
| 746 | + queue = Q, |
| 747 | + arguments = BindingArgs1}), |
| 748 | + ok = amqp_channel:cast(Ch, |
| 749 | + #'basic.publish'{exchange = Exchange, |
| 750 | + routing_key = RoutingKey}, |
| 751 | + #amqp_msg{payload = <<"m2">>}), |
| 752 | + receive #'basic.ack'{} -> ok |
| 753 | + after 9000 -> ct:fail(confirm_timeout) |
| 754 | + end, |
| 755 | + |
| 756 | + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}}, |
| 757 | + amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})). |
| 758 | + |
| 759 | +bind_and_unbind_direct_exchange(Config) -> |
| 760 | + bind_and_unbind_exchange(<<"direct">>, Config). |
| 761 | + |
| 762 | +bind_and_unbind_fanout_exchange(Config) -> |
| 763 | + bind_and_unbind_exchange(<<"fanout">>, Config). |
| 764 | + |
| 765 | +bind_and_unbind_exchange(Type, Config) -> |
701 | 766 | Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), |
702 | 767 |
|
703 | 768 | Ch = rabbit_ct_client_helpers:open_channel(Config, Server), |
704 | 769 | X = ?config(exchange_name, Config), |
| 770 | + Q = ?config(queue_name, Config), |
| 771 | + RoutingKey = <<"some key">>, |
| 772 | + SourceExchange = <<"amq.", Type/binary>>, |
705 | 773 |
|
706 | 774 | ?assertEqual([], |
707 | 775 | rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])), |
708 | 776 |
|
709 | | - #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X}), |
| 777 | + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X, |
| 778 | + type = Type}), |
710 | 779 | %% Let's bind to other exchange |
711 | 780 | #'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = X, |
712 | | - source = <<"amq.direct">>, |
713 | | - routing_key = <<"key">>}), |
| 781 | + source = SourceExchange, |
| 782 | + routing_key = RoutingKey}), |
714 | 783 |
|
715 | | - DirectBinding = binding_record(rabbit_misc:r(<<"/">>, exchange, <<"amq.direct">>), |
716 | | - rabbit_misc:r(<<"/">>, exchange, X), |
717 | | - <<"key">>, []), |
| 784 | + Binding = binding_record(rabbit_misc:r(<<"/">>, exchange, SourceExchange), |
| 785 | + rabbit_misc:r(<<"/">>, exchange, X), |
| 786 | + RoutingKey, []), |
718 | 787 |
|
719 | | - ?assertEqual([DirectBinding], |
| 788 | + ?assertEqual([Binding], |
720 | 789 | lists:sort( |
721 | 790 | rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>]))), |
722 | 791 |
|
| 792 | + %% Test that a message gets routed: |
| 793 | + %% exchange -> exchange -> queue |
| 794 | + ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])), |
| 795 | + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = X, |
| 796 | + routing_key = RoutingKey, |
| 797 | + queue = Q}), |
| 798 | + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), |
| 799 | + amqp_channel:register_confirm_handler(Ch, self()), |
| 800 | + ok = amqp_channel:cast(Ch, |
| 801 | + #'basic.publish'{exchange = SourceExchange, |
| 802 | + routing_key = RoutingKey}, |
| 803 | + #amqp_msg{payload = <<"m1">>}), |
| 804 | + receive #'basic.ack'{} -> ok |
| 805 | + after 9000 -> ct:fail(confirm_timeout) |
| 806 | + end, |
| 807 | + ?assertEqual(#'queue.delete_ok'{message_count = 1}, |
| 808 | + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), |
| 809 | + |
723 | 810 | #'exchange.unbind_ok'{} = amqp_channel:call(Ch, |
724 | 811 | #'exchange.unbind'{destination = X, |
725 | | - source = <<"amq.direct">>, |
726 | | - routing_key = <<"key">>}), |
| 812 | + source = SourceExchange, |
| 813 | + routing_key = RoutingKey}), |
727 | 814 |
|
728 | 815 | ?assertEqual([], |
729 | 816 | rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])), |
|
0 commit comments