@@ -50,8 +50,10 @@ all_tests() ->
5050 bind_to_unknown_queue ,
5151 binding_args_direct_exchange ,
5252 binding_args_fanout_exchange ,
53+
5354 % % Exchange bindings
54- bind_and_unbind_exchange ,
55+ bind_and_unbind_direct_exchange ,
56+ bind_and_unbind_fanout_exchange ,
5557 bind_and_delete_exchange_source ,
5658 bind_and_delete_exchange_destination ,
5759 bind_to_unknown_exchange ,
@@ -754,33 +756,61 @@ binding_args(Exchange, Config) ->
754756 ? assertMatch ({# 'basic.get_ok' {}, # amqp_msg {payload = <<" m2" >>}},
755757 amqp_channel :call (Ch , # 'basic.get' {queue = Q , no_ack = true })).
756758
757- bind_and_unbind_exchange (Config ) ->
759+ bind_and_unbind_direct_exchange (Config ) ->
760+ bind_and_unbind_exchange (<<" direct" >>, Config ).
761+
762+ bind_and_unbind_fanout_exchange (Config ) ->
763+ bind_and_unbind_exchange (<<" fanout" >>, Config ).
764+
765+ bind_and_unbind_exchange (Type , Config ) ->
758766 Server = rabbit_ct_broker_helpers :get_node_config (Config , 0 , nodename ),
759767
760768 Ch = rabbit_ct_client_helpers :open_channel (Config , Server ),
761769 X = ? config (exchange_name , Config ),
770+ Q = ? config (queue_name , Config ),
771+ RoutingKey = <<" some key" >>,
772+ SourceExchange = <<" amq." , Type /binary >>,
762773
763774 ? assertEqual ([],
764775 rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_binding , list , [<<" /" >>])),
765776
766- # 'exchange.declare_ok' {} = amqp_channel :call (Ch , # 'exchange.declare' {exchange = X }),
777+ # 'exchange.declare_ok' {} = amqp_channel :call (Ch , # 'exchange.declare' {exchange = X ,
778+ type = Type }),
767779 % % Let's bind to other exchange
768780 # 'exchange.bind_ok' {} = amqp_channel :call (Ch , # 'exchange.bind' {destination = X ,
769- source = << " amq.direct " >> ,
770- routing_key = << " key " >> }),
781+ source = SourceExchange ,
782+ routing_key = RoutingKey }),
771783
772- DirectBinding = binding_record (rabbit_misc :r (<<" /" >>, exchange , << " amq.direct " >> ),
773- rabbit_misc :r (<<" /" >>, exchange , X ),
774- << " key " >> , []),
784+ Binding = binding_record (rabbit_misc :r (<<" /" >>, exchange , SourceExchange ),
785+ rabbit_misc :r (<<" /" >>, exchange , X ),
786+ RoutingKey , []),
775787
776- ? assertEqual ([DirectBinding ],
788+ ? assertEqual ([Binding ],
777789 lists :sort (
778790 rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_binding , list , [<<" /" >>]))),
779791
792+ % % Test that a message gets routed:
793+ % % exchange -> exchange -> queue
794+ ? assertEqual ({'queue.declare_ok' , Q , 0 , 0 }, declare (Ch , Q , [])),
795+ # 'queue.bind_ok' {} = amqp_channel :call (Ch , # 'queue.bind' {exchange = X ,
796+ routing_key = RoutingKey ,
797+ queue = Q }),
798+ # 'confirm.select_ok' {} = amqp_channel :call (Ch , # 'confirm.select' {}),
799+ amqp_channel :register_confirm_handler (Ch , self ()),
800+ ok = amqp_channel :cast (Ch ,
801+ # 'basic.publish' {exchange = SourceExchange ,
802+ routing_key = RoutingKey },
803+ # amqp_msg {payload = <<" m1" >>}),
804+ receive # 'basic.ack' {} -> ok
805+ after 9000 -> ct :fail (confirm_timeout )
806+ end ,
807+ ? assertEqual (# 'queue.delete_ok' {message_count = 1 },
808+ amqp_channel :call (Ch , # 'queue.delete' {queue = Q })),
809+
780810 # 'exchange.unbind_ok' {} = amqp_channel :call (Ch ,
781811 # 'exchange.unbind' {destination = X ,
782- source = << " amq.direct " >> ,
783- routing_key = << " key " >> }),
812+ source = SourceExchange ,
813+ routing_key = RoutingKey }),
784814
785815 ? assertEqual ([],
786816 rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_binding , list , [<<" /" >>])),
0 commit comments