1515-include_lib (" common_test/include/ct.hrl" ).
1616-include_lib (" eunit/include/eunit.hrl" ).
1717-include_lib (" amqp_client/include/amqp_client.hrl" ).
18+ -include_lib (" amqp10_common/include/amqp10_framing.hrl" ).
1819-include_lib (" rabbitmq_stomp/include/rabbit_stomp_frame.hrl" ).
1920
2021-import (util ,
@@ -170,8 +171,8 @@ amqp(Config) ->
170171 Correlation = <<" some correlation ID" >>,
171172 ContentType = <<" text/plain" >>,
172173 RequestPayload = <<" my request" >>,
173- UserProperty = [{<<" rabbit 🐇" /utf8 >>, <<" carrot 🥕" /utf8 >>},
174- {<<" x-rabbit 🐇" /utf8 >>, <<" carrot 🥕" /utf8 >>},
174+ UserProperty = [{<<" 🐇" /utf8 >>, <<" 🥕" /utf8 >>},
175+ {<<" x-🐇" /utf8 >>, <<" 🥕" /utf8 >>},
175176 {<<" key" >>, <<" val" >>},
176177 {<<" key" >>, <<" val" >>},
177178 {<<" x-key" >>, <<" val" >>},
@@ -180,21 +181,38 @@ amqp(Config) ->
180181 #{'Content-Type' => ContentType ,
181182 'Correlation-Data' => Correlation ,
182183 'Response-Topic' => MqttResponseTopic ,
183- 'User-Property' => UserProperty },
184+ 'User-Property' => UserProperty ,
185+ 'Payload-Format-Indicator' => 1 },
184186 RequestPayload , [{qos , 1 }]),
185187
186- % % As of 3.13, AMQP 1.0 is proxied via AMQP 0.9.1 and therefore the conversion from
187- % % mc_mqtt to mc_amqpl takes place. We therefore lose MQTT User Property and Response Topic
188- % % which gets converted to AMQP 0.9.1 headers. In the future, Native AMQP 1.0 will convert
189- % % from mc_mqtt to mc_amqp allowing us to do many more assertions here.
190188 {ok , Msg1 } = amqp10_client :get_msg (Receiver ),
191189 ct :pal (" Received AMQP 1.0 message:~n~p " , [Msg1 ]),
192- ? assertEqual ([RequestPayload ], amqp10_msg :body (Msg1 )),
193- ? assertMatch (#{correlation_id := Correlation ,
194- content_type := ContentType }, amqp10_msg :properties (Msg1 )),
190+
195191 ? assert (amqp10_msg :header (durable , Msg1 )),
196192 ? assert (amqp10_msg :header (first_acquirer , Msg1 )),
197193
194+ % % We expect to receive x-headers in message annotations.
195+ % % However, since annotation keys are symbols and symbols are only valid ASCII,
196+ % % we expect header
197+ % % {<<"x-🐇"/utf8>>, <<"🥕"/utf8>>}
198+ % % to be dropped.
199+ ? assertEqual (#{<<" x-key" >> => <<" val" >>,
200+ <<" x-exchange" >> => <<" amq.topic" >>,
201+ <<" x-routing-key" >> => <<" topic.1" >>},
202+ amqp10_msg :message_annotations (Msg1 )),
203+ % % In contrast, application property keys are of type string, and therefore UTF-8 encoded.
204+ ? assertEqual (#{<<" 🐇" /utf8 >> => <<" 🥕" /utf8 >>,
205+ <<" key" >> => <<" val" >>},
206+ amqp10_msg :application_properties (Msg1 )),
207+
208+ #{correlation_id := Correlation ,
209+ content_type := ContentType ,
210+ reply_to := ReplyToAddress } = amqp10_msg :properties (Msg1 ),
211+ ? assertEqual (<<" /topic/response.topic" >>, ReplyToAddress ),
212+
213+ % % Thanks to the 'Payload-Format-Indicator', we get a single utf8 value.
214+ ? assertEqual (# 'v1_0.amqp_value' {content = {utf8 , RequestPayload }}, amqp10_msg :body (Msg1 )),
215+
198216 ok = amqp10_client :settle_msg (Receiver , Msg1 , accepted ),
199217 ok = amqp10_client :detach_link (Receiver ),
200218 ok = amqp10_client :end_session (Session1 ),
@@ -205,15 +223,14 @@ amqp(Config) ->
205223 {ok , Session2 } = amqp10_client :begin_session (Connection2 ),
206224 SenderLinkName = <<" test-sender" >>,
207225 {ok , Sender } = amqp10_client :attach_sender_link (
208- % % With Native AMQP 1.0, address should be read from received reply-to
209- Session2 , SenderLinkName , <<" /topic/response.topic" >>, unsettled ),
226+ Session2 , SenderLinkName , ReplyToAddress , unsettled ),
210227 receive {amqp10_event , {link , Sender , credited }} -> ok
211228 after 1000 -> ct :fail (credited_timeout )
212229 end ,
213230
214231 DTag = <<" my-dtag" >>,
215232 ReplyPayload = <<" my response" >>,
216- Msg2a = amqp10_msg :new (DTag , ReplyPayload ),
233+ Msg2a = amqp10_msg :new (DTag , # 'v1_0.amqp_value' { content = { utf8 , ReplyPayload }} ),
217234 Msg2b = amqp10_msg :set_properties (
218235 #{correlation_id => Correlation ,
219236 content_type => ContentType },
@@ -237,7 +254,9 @@ amqp(Config) ->
237254 payload := ReplyPayload ,
238255 properties := #{'Content-Type' := ContentType ,
239256 'Correlation-Data' := Correlation ,
240- 'Subscription-Identifier' := 999 }},
257+ 'Subscription-Identifier' := 999 ,
258+ % % since the AMQP 1.0 client sent UTF-8
259+ 'Payload-Format-Indicator' := 1 }},
241260 MqttMsg )
242261 after 1000 -> ct :fail (" did not receive reply" )
243262 end ,
0 commit comments