Skip to content

Commit

Permalink
Enable AMQP 1.0 clients to manage topologies
Browse files Browse the repository at this point in the history
 ## What?

* Allow AMQP 1.0 clients to dynamically create and delete RabbitMQ
  topologies (exchanges, queues, bindings).
* Provide an Erlang AMQP 1.0 client that manages topologies.

 ## Why?

Today, RabbitMQ topologies can be created via:
* [Management HTTP API](https://www.rabbitmq.com/docs/management#http-api)
  (including Management UI and
  [messaging-topology-operator](https://github.com/rabbitmq/messaging-topology-operator))
* [Definition Import](https://www.rabbitmq.com/docs/definitions#import)
* AMQP 0.9.1 clients

Up to RabbitMQ 3.13 the RabbitMQ AMQP 1.0 plugin auto creates queues
and bindings depending on the terminus [address
format](https://github.com/rabbitmq/rabbitmq-server/tree/v3.13.x/deps/rabbitmq_amqp1_0#routing-and-addressing).

Such implicit creation of topologies is limiting and obscure.
For some address formats, queues will be created, but not deleted.

Some of RabbitMQ's success is due to its flexible routing topologies
that AMQP 0.9.1 clients can create and delete dynamically.

This commit allows dynamic management of topologies for AMQP 1.0 clients.
This commit builds on top of Native AMQP 1.0 (PR #9022) and will be
available in RabbitMQ 4.0.

 ## How?

This commits adds the following management operations for AMQP 1.0 clients:
* declare queue
* delete queue
* purge queue
* bind queue to exchange
* unbind queue from exchange
* declare exchange
* delete exchange
* bind exchange to exchange
* unbind exchange from exchange

Hence, at least the AMQP 0.9.1 management operations are supported for
AMQP 1.0 clients.

In addition the operation
* get queue

is provided which - similar to `declare queue` - returns queue
information including the current leader and replicas.
This allows clients to publish or consume locally on the node that hosts
the queue.

Compared to AMQP 0.9.1 whose commands and command fields are fixed, the
new AMQP Management API is extensible: New operations and new fields can
easily be added in the future.

There are different design options how management operations could be
supported for AMQP 1.0 clients:
1. Use a special exchange type as done in https://github.com/rabbitmq/rabbitmq-management-exchange
  This has the advantage that any protocol client (e.g. also STOMP clients) could
  dynamically manage topologies. However, a special exchange type is the wrong abstraction.
2. Clients could send "special" messages with special headers that the broker interprets.

This commit decided for a variation of the 2nd option using a more
standardized way by re-using a subest of the following latest AMQP 1.0 extension
specifications:
* [AMQP Request-Response Messaging with Link Pairing Version 1.0 - Committee Specification 01](https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html) (February 2021)
* [HTTP Semantics and Content over AMQP Version 1.0 - Working Draft 06](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=65571) (July 2019)
* [AMQP Management Version 1.0 - Working Draft 16](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=65575) (July 2019)

An important goal is to keep the interaction between AMQP 1.0 client and RabbitMQ
simple to increase usage, development and adoptability of future RabbitMQ AMQP 1.0
client library wrappers.

The AMQP 1.0 client has to create a link pair to the special `/management` node.
This allows the client to send and receive from the management node.
Similar to AMQP 0.9.1, there is no need for a reply queue since the reply
will be sent directly to the client.

Requests and responses are modelled via HTTP, but sent via AMQP using
the `HTTP Semantics and Content over AMQP` extension (henceforth `HTTP
over AMQP` extension).

This commit tries to follow the `HTTP over AMQP` extension as much as
possible but deviates where this draft spec doesn't make sense.

The projected mode §4.1 is used as opposed to tunneled mode §4.2.
A named relay `/management` is used (§6.3) where the message field `to` is the URL.

Deviations are
* §3.1 mandates that URIs are not encoded in an AMQP message.
  However, we percent encode URIs in the AMQP message. Otherwise there
  is for example no way to distinguish a `/` in a queue name from the
  URI path separator `/`.
* §4.1.4 mandates a data section. This commit uses an amqp-value section
  as it's a better fit given that the content is AMQP encoded data.

Using an HTTP API allows for a common well understood interface and future extensibility.
Instead of re-using the current RabbitMQ HTTP API, this commit uses a
new HTTP API (let's call it v2) which could be used as a future API for
plain HTTP clients.

 ### HTTP API v1

The current HTTP API (let's call it v1) is **not** used since v1
comes with a couple of weaknesses:

1. Deep level of nesting becomes confusing and difficult to manage.

Examples of deep nesting in v1:
```
/api/bindings/vhost/e/source/e/destination/props
/api/bindings/vhost/e/exchange/q/queue/props
```

2. Redundant endpoints returning the same resources

v1 has 9 endpoints to list binding(s):
```
/api/exchanges/vhost/name/bindings/source
/api/exchanges/vhost/name/bindings/destination
/api/queues/vhost/name/bindings
/api/bindings
/api/bindings/vhost
/api/bindings/vhost/e/exchange/q/queue
/api/bindings/vhost/e/exchange/q/queue/props
/api/bindings/vhost/e/source/e/destination
/api/bindings/vhost/e/source/e/destination/props
```

3. Verbs in path names
Path names should be nouns instead.
v1 contains verbs:
```
/api/queues/vhost/name/get
/api/exchanges/vhost/name/publish
```

 ### AMQP Management extension

Only few aspects of the AMQP Management extension are used.

The central idea of the AMQP management spec is **dynamic discovery** such that broker independent AMQP 1.0
clients can discover objects, types, operations, and HTTP endpoints of specific brokers.
In fact, clients are only conformant if:
> All request addresses are dynamically discovered starting from the discovery document.
> A requesting container MUST NOT use fixed assumptions about the addressing structure of the management API.

While this is a nice and powerful idea, no AMQP 1.0 client and no AMQP 1.0 server implement the
latest AMQP 1.0 management spec from 2019, partly presumably due to its complexity.
Therefore, the idea of such dynamic discovery has failed to be implemented in practice.

The AMQP management spec mandates that the management endpoint returns a discovery document containing
broker specific collections, types, configuration, and operations including their endpoints.

The API endpoints of the AMQP management spec are therefore all designed around dynamic discovery.

For example, to create either a queue or an exchange, the client has to
```
POST /$management/entities
```
which shows that the entities collection acts as a generic factory, see section 2.2.
The server will then create the resource and reply with a location header containing a URI pointing to the resource.
For RabbitMQ, we don’t need such a generic factory to create queues or exchanges.

To list bindings for a queue Q1, the spec suggests
```
GET /$management/Queues/Q1/$management/entities
```
which again shows the generic entities endpoint as well as a `$management` endpoint under Q1 to
allow a queue to return a discovery document.
For RabbitMQ, we don’t need such generic endpoints and discovery documents.

Given we aim for our own thin RabbitMQ AMQP 1.0 client wrapper libraries which expose
the RabbitMQ model to the developer, we can directly use fixed HTTP endpoint assumptions
in our RabbitMQ specific libraries.

This is by far simpler than using the dynamic endpoints of the management spec.
Simplicity leads to higher adoption and enables more developers to write RabbitMQ AMQP 1.0 client
library wrappers.

The AMQP Management extension also suffers from deep level of nesting in paths
Examples:
```
/$management/Queues/Q1/$management/entities
/$management/Queues/Q1/Bindings/Binding1
```
as well as verbs in path names: Section 7.1.4 suggests using verbs in path names,
for example “purge”, due to the dynamic operations discovery document.

 ### HTTP API v2

This commit introduces a new HTTP API v2 following best practices.
It could serve as a future API for plain HTTP clients.

This commit and RabbitMQ 4.0 will only implement a minimal set of
HTTP API v2 endpoints and only for HTTP over AMQP.
In other words, the existing HTTP API v1 Cowboy handlers will continue to be
used for all plain HTTP requests in RabbitMQ 4.0 and will remain untouched for RabbitMQ 4.0.
Over time, after 4.0 shipped, we could ship a pure HTTP API implementation for HTTP API v2.
Hence, the new HTTP API v2 endpoints for HTTP over AMQP should be designed such that they
can be re-used in the future for a pure HTTP implementation.

The minimal set of endpoints for RabbitMQ 4.0 are:

``
GET / PUT / DELETE
/vhosts/:vhost/queues/:queue
```
read, create, delete a queue

```
DELETE
/vhosts/:vhost/queues/:queue/messages
```
purges a queue

```
GET / DELETE
/vhosts/:vhost/bindings/:binding
```
read, delete bindings
where `:binding` is a binding ID of the following path segment:
```
src=e1;dstq=q2;key=my-key;args=
```
Binding arguments `args` has an empty value by default, i.e. there are no binding arguments.
If the binding includes binding arguments, `args` will be an Erlang portable term hash
provided by the server similar to what’s provided in HTTP API v1 today.
Alternatively, we could use an arguments scheme of:
```
args=k1,utf8,v1&k2,uint,3
```
However, such a scheme leads to long URIs when there are many binding arguments.
Note that it’s perfectly fine for URI producing applications to include URI
reserved characters `=` / `;` / `,` / `$` in a path segment.

To create a binding, the client therefore needs to POST to a bindings factory URI:
```
POST
/vhosts/:vhost/bindings
```

To list all bindings between a source exchange e1 and destination exchange e2 with binding key k1:
```
GET
/vhosts/:vhost/bindings?src=e1&dste=e2&key=k1
```

This endpoint will be called by the RabbitMQ AMQP 1.0 client library to unbind a
binding with non-empty binding arguments to get the binding ID before invoking a
```
DELETE
/vhosts/:vhost/bindings/:binding
```

In future, after RabbitMQ 4.0 shipped, new API endpoints could be added.
The following is up for discussion and is only meant to show the clean and simple design of HTTP API v2.

Bindings endpoint can be queried as follows:

to list all bindings for a given source exchange e1:
```
GET
/vhosts/:vhost/bindings?src=e1
```

to list all bindings for a given destination queue q1:
```
GET
/vhosts/:vhost/bindings?dstq=q1
```

to list all bindings between a source exchange e1 and destination queue q1:
```
GET
/vhosts/:vhost/bindings?src=e1&dstq=q1
```

multiple bindings between source exchange e1 and destination queue q1 could be deleted at once as follows:
```
DELETE /vhosts/:vhost/bindings?src=e1&dstq=q1
```

GET could be supported globally across all vhosts:
```
/exchanges
/queues
/bindings
```

Publish a message:
```
POST
/vhosts/:vhost/queues/:queue/messages
```

Consume or peek a message (depending on query parameters):
```
GET
/vhosts/:vhost/queues/:queue/messages
```

Note that the AMQP 1.0 client omits the `/vhost/:vhost` path prefix.
Since an AMQP connection belongs to a single vhost, there is no need to
additionally include the vhost in every HTTP request.

Pros of HTTP API v2:

1. Low level of nesting

Queues, exchanges, bindings are top level entities directly under vhosts.
Although the HTTP API doesn’t have to reflect how resources are stored in the database,
v2 does nicely reflect the Khepri tree structure.

2. Nouns instead of verbs
HTTP API v2 is very simple to read and understand as shown by
```
POST    /vhosts/:vhost/queues/:queue/messages	to post messages, i.e. publish to a queue.
GET     /vhosts/:vhost/queues/:queue/messages	to get messages, i.e. consume or peek from a queue.
DELETE  /vhosts/:vhost/queues/:queue/messages	to delete messages, i.e. purge a queue.
```

A separate new HTTP API v2 allows us to ship only handlers for HTTP over AMQP for RabbitMQ 4.0
and therefore move faster while still keeping the option on the table to re-use the new v2 API
for pure HTTP in the future.
In contrast, re-using the HTTP API v1 for HTTP over AMQP is possible, but dirty because separate handlers
(HTTP over AMQP and pure HTTP) replying differently will be needed for the same v1 endpoints.
  • Loading branch information
ansd committed Mar 25, 2024
1 parent 524d688 commit f72be04
Show file tree
Hide file tree
Showing 43 changed files with 3,938 additions and 400 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
!/deps/amqp10_common/
!/deps/oauth2_client/
!/deps/rabbitmq_amqp1_0/
!/deps/rabbitmq_amqp_client/
!/deps/rabbitmq_auth_backend_cache/
!/deps/rabbitmq_auth_backend_http/
!/deps/rabbitmq_auth_backend_ldap/
Expand Down
2 changes: 1 addition & 1 deletion deps/amqp10_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ load(

APP_NAME = "amqp10_client"

APP_DESCRIPTION = "AMQP 1.0 client from the RabbitMQ Project"
APP_DESCRIPTION = "AMQP 1.0 client"

APP_MODULE = "amqp10_client_app"

Expand Down
2 changes: 1 addition & 1 deletion deps/amqp10_client/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
PROJECT = amqp10_client
PROJECT_DESCRIPTION = AMQP 1.0 client from the RabbitMQ Project
PROJECT_DESCRIPTION = AMQP 1.0 client
PROJECT_MOD = amqp10_client_app

define PROJECT_APP_EXTRA_KEYS
Expand Down
10 changes: 4 additions & 6 deletions deps/amqp10_client/src/amqp10_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
parse_uri/1
]).

-define(DEFAULT_TIMEOUT, 5000).

-type snd_settle_mode() :: amqp10_client_session:snd_settle_mode().
-type rcv_settle_mode() :: amqp10_client_session:rcv_settle_mode().

Expand Down Expand Up @@ -134,7 +132,7 @@ begin_session(Connection) when is_pid(Connection) ->
-spec begin_session_sync(pid()) ->
supervisor:startchild_ret() | session_timeout.
begin_session_sync(Connection) when is_pid(Connection) ->
begin_session_sync(Connection, ?DEFAULT_TIMEOUT).
begin_session_sync(Connection, ?TIMEOUT).

%% @doc Synchronously begins an amqp10 session using 'Connection'.
%% This is a convenience function that awaits the 'begun' event
Expand Down Expand Up @@ -191,7 +189,7 @@ attach_sender_link_sync(Session, Name, Target, SettleMode, Durability) ->
{ok, Ref};
{amqp10_event, {link, Ref, {detached, Err}}} ->
{error, Err}
after ?DEFAULT_TIMEOUT -> link_timeout
after ?TIMEOUT -> link_timeout
end.

%% @doc Attaches a sender link to a target.
Expand Down Expand Up @@ -357,7 +355,7 @@ stop_receiver_link(#link_ref{role = receiver,
send_msg(#link_ref{role = sender, session = Session,
link_handle = Handle}, Msg0) ->
Msg = amqp10_msg:set_handle(Handle, Msg0),
amqp10_client_session:transfer(Session, Msg, ?DEFAULT_TIMEOUT).
amqp10_client_session:transfer(Session, Msg, ?TIMEOUT).

%% @doc Accept a message on a the link referred to be the 'LinkRef'.
-spec accept_msg(link_ref(), amqp10_msg:amqp10_msg()) -> ok.
Expand All @@ -376,7 +374,7 @@ settle_msg(LinkRef, Msg, Settlement) ->
%% Flows a single link credit then awaits delivery or timeout.
-spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}.
get_msg(LinkRef) ->
get_msg(LinkRef, ?DEFAULT_TIMEOUT).
get_msg(LinkRef, ?TIMEOUT).

%% @doc Get a single message from a link.
%% Flows a single link credit then awaits delivery or timeout.
Expand Down
74 changes: 43 additions & 31 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
diff/2]).

-define(MAX_SESSION_WINDOW_SIZE, 65535).
-define(DEFAULT_TIMEOUT, 5000).
-define(UINT_OUTGOING_WINDOW, {uint, ?UINT_MAX}).
-define(INITIAL_OUTGOING_DELIVERY_ID, ?UINT_MAX).
%% "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
Expand Down Expand Up @@ -149,7 +148,7 @@
reader :: pid(),
socket :: amqp10_client_connection:amqp10_socket() | undefined,
links = #{} :: #{output_handle() => #link{}},
link_index = #{} :: #{link_name() => output_handle()},
link_index = #{} :: #{{link_role(), link_name()} => output_handle()},
link_handle_index = #{} :: #{input_handle() => output_handle()},
next_link_handle = 0 :: output_handle(),
early_attach_requests :: [term()],
Expand All @@ -172,7 +171,7 @@

-spec begin_sync(pid()) -> supervisor:startchild_ret().
begin_sync(Connection) ->
begin_sync(Connection, ?DEFAULT_TIMEOUT).
begin_sync(Connection, ?TIMEOUT).

-spec begin_sync(pid(), non_neg_integer()) ->
supervisor:startchild_ret() | session_timeout.
Expand Down Expand Up @@ -302,33 +301,37 @@ mapped(cast, #'v1_0.end'{error = Err}, State) ->
mapped(cast, #'v1_0.attach'{name = {utf8, Name},
initial_delivery_count = IDC,
handle = {uint, InHandle},
role = PeerRoleBool,
max_message_size = MaybeMaxMessageSize},
#state{links = Links, link_index = LinkIndex,
link_handle_index = LHI} = State0) ->

#{Name := OutHandle} = LinkIndex,
OurRoleBool = not PeerRoleBool,
OurRole = boolean_to_role(OurRoleBool),
LinkIndexKey = {OurRole, Name},
#{LinkIndexKey := OutHandle} = LinkIndex,
#{OutHandle := Link0} = Links,
ok = notify_link_attached(Link0),

{DeliveryCount, MaxMessageSize} =
case Link0 of
#link{role = sender,
#link{role = sender = OurRole,
delivery_count = DC} ->
MSS = case MaybeMaxMessageSize of
{ulong, S} when S > 0 -> S;
_ -> undefined
end,
{DC, MSS};
#link{role = receiver,
#link{role = receiver = OurRole,
max_message_size = MSS} ->
{unpack(IDC), MSS}
end,
Link = Link0#link{state = attached,
input_handle = InHandle,
delivery_count = DeliveryCount,
max_message_size = MaxMessageSize},
State = State0#state{links = Links#{OutHandle => Link},
link_index = maps:remove(Name, LinkIndex),
State = State0#state{links = Links#{OutHandle := Link},
link_index = maps:remove(LinkIndexKey, LinkIndex),
link_handle_index = LHI#{InHandle => OutHandle}},
{keep_state, State};
mapped(cast, #'v1_0.detach'{handle = {uint, InHandle},
Expand Down Expand Up @@ -648,8 +651,8 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->

make_source(#{role := {sender, _}}) ->
#'v1_0.source'{};
make_source(#{role := {receiver, #{address := Address} = Target, _Pid}, filter := Filter}) ->
Durable = translate_terminus_durability(maps:get(durable, Target, none)),
make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) ->
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
TranslatedFilter = translate_filters(Filter),
#'v1_0.source'{address = {utf8, Address},
durable = {uint, Durable},
Expand Down Expand Up @@ -743,35 +746,34 @@ detach_with_error_cond(Link = #link{output_handle = OutHandle}, State, Cond) ->
ok = send(Detach, State),
Link#link{state = detach_sent}.

send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
#state{next_link_handle = OutHandle0, links = Links,
send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _},
#state{next_link_handle = OutHandle0, links = Links,
link_index = LinkIndex} = State) ->

Source = make_source(Args),
Target = make_target(Args),
Properties = amqp10_client_types:make_properties(Args),

{LinkTarget, RoleAsBool, InitialDeliveryCount, MaxMessageSize} =
case Role of
{LinkTarget, InitialDeliveryCount, MaxMessageSize} =
case RoleTuple of
{receiver, _, Pid} ->
{{pid, Pid}, true, undefined, max_message_size(Args)};
{{pid, Pid}, undefined, max_message_size(Args)};
{sender, #{address := TargetAddr}} ->
{TargetAddr, false, uint(?INITIAL_DELIVERY_COUNT), undefined}
end,

{OutHandle, NextLinkHandle} =
case Args of
#{handle := Handle} ->
%% Client app provided link handle.
%% Really only meant for integration tests.
{Handle, OutHandle0};
_ ->
{OutHandle0, OutHandle0 + 1}
{TargetAddr, uint(?INITIAL_DELIVERY_COUNT), undefined}
end,

{OutHandle, NextLinkHandle} = case Args of
#{handle := Handle} ->
%% Client app provided link handle.
%% Really only meant for integration tests.
{Handle, OutHandle0};
_ ->
{OutHandle0, OutHandle0 + 1}
end,
Role = element(1, RoleTuple),
% create attach performative
Attach = #'v1_0.attach'{name = {utf8, Name},
role = RoleAsBool,
role = role_to_boolean(Role),
handle = {uint, OutHandle},
source = Source,
properties = Properties,
Expand All @@ -782,12 +784,12 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
max_message_size = MaxMessageSize},
ok = Send(Attach, State),

LinkRef = make_link_ref(element(1, Role), self(), OutHandle),
Ref = make_link_ref(Role, self(), OutHandle),
Link = #link{name = Name,
ref = LinkRef,
ref = Ref,
output_handle = OutHandle,
state = attach_sent,
role = element(1, Role),
role = Role,
notify = FromPid,
auto_flow = never,
target = LinkTarget,
Expand All @@ -796,7 +798,7 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},

{State#state{links = Links#{OutHandle => Link},
next_link_handle = NextLinkHandle,
link_index = LinkIndex#{Name => OutHandle}}, LinkRef}.
link_index = LinkIndex#{{Role, Name} => OutHandle}}, Ref}.

-spec handle_session_flow(#'v1_0.flow'{}, #state{}) -> #state{}.
handle_session_flow(#'v1_0.flow'{next_incoming_id = MaybeNII,
Expand Down Expand Up @@ -1090,6 +1092,16 @@ sym(B) when is_atom(B) -> {symbol, atom_to_binary(B, utf8)}.
reason(undefined) -> normal;
reason(Other) -> Other.

role_to_boolean(sender) ->
?AMQP_ROLE_SENDER;
role_to_boolean(receiver) ->
?AMQP_ROLE_RECEIVER.

boolean_to_role(?AMQP_ROLE_SENDER) ->
sender;
boolean_to_role(?AMQP_ROLE_RECEIVER) ->
receiver.

format_status(Status = #{data := Data0}) ->
#state{channel = Channel,
remote_channel = RemoteChannel,
Expand Down
3 changes: 1 addition & 2 deletions deps/amqp10_client/src/amqp10_client_types.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
link_event_detail()}.
-type amqp10_event() :: {amqp10_event, amqp10_event_detail()}.

-type properties() :: #{binary() => tuple()}.
-type properties() :: #{binary() => amqp10_binary_generator:amqp10_prim()}.

-export_type([amqp10_performative/0, channel/0,
source/0, target/0, amqp10_msg_record/0,
Expand All @@ -73,7 +73,6 @@
properties/0]).


unpack(undefined) -> undefined;
unpack({_, Value}) -> Value;
unpack(Value) -> Value.

Expand Down
15 changes: 11 additions & 4 deletions deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
%%
-module(amqp10_msg).

-include_lib("amqp10_common/include/amqp10_types.hrl").

-export([from_amqp_records/1,
to_amqp_records/1,
% "read" api
Expand Down Expand Up @@ -256,12 +258,12 @@ body_bin(#amqp10_msg{body = #'v1_0.amqp_value'{} = Body}) ->
new(DeliveryTag, Body, Settled) when is_binary(Body) ->
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
settled = Settled,
message_format = {uint, 0}},
message_format = {uint, ?MESSAGE_FORMAT}},
body = [#'v1_0.data'{content = Body}]};
new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
settled = Settled,
message_format = {uint, 0}},
message_format = {uint, ?MESSAGE_FORMAT}},
body = Body}.

%% @doc Create a new settled amqp10 message using the specified delivery tag
Expand Down Expand Up @@ -322,8 +324,13 @@ set_properties(Props, #amqp10_msg{properties = undefined} = Msg) ->
set_properties(Props, Msg#amqp10_msg{properties = #'v1_0.properties'{}});
set_properties(Props, #amqp10_msg{properties = Current} = Msg) ->
% TODO many fields are `any` types and we need to try to type tag them
P = maps:fold(fun(message_id, V, Acc) when is_binary(V) ->
% message_id can be any type but we restrict it here
P = maps:fold(fun(message_id, {T, _V} = TypeVal, Acc) when T =:= ulong orelse
T =:= uuid orelse
T =:= binary orelse
T =:= uf8 ->
Acc#'v1_0.properties'{message_id = TypeVal};
(message_id, V, Acc) when is_binary(V) ->
%% backward compat clause
Acc#'v1_0.properties'{message_id = utf8(V)};
(user_id, V, Acc) when is_binary(V) ->
Acc#'v1_0.properties'{user_id = {binary, V}};
Expand Down
7 changes: 7 additions & 0 deletions deps/amqp10_common/include/amqp10_types.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,10 @@
-type transfer_number() :: sequence_no().
% [2.8.10]
-type sequence_no() :: uint().

% [2.8.1]
-define(AMQP_ROLE_SENDER, false).
-define(AMQP_ROLE_RECEIVER, true).

% [3.2.16]
-define(MESSAGE_FORMAT, 0).
1 change: 1 addition & 0 deletions deps/amqp10_common/src/amqp10_binary_generator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
-export_type([
amqp10_ctor/0,
amqp10_type/0,
amqp10_prim/0,
amqp10_described/0
]).

Expand Down
12 changes: 8 additions & 4 deletions deps/amqp10_common/src/amqp10_framing.erl
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,16 @@ symbolify(FieldName) when is_atom(FieldName) ->

%% A sequence comes as an arbitrary list of values; it's not a
%% composite type.
decode({described, Descriptor, {list, Fields}}) ->
decode({described, Descriptor, {list, Fields} = Type}) ->
case amqp10_framing0:record_for(Descriptor) of
#'v1_0.amqp_sequence'{} ->
#'v1_0.amqp_sequence'{content = [decode(F) || F <- Fields]};
#'v1_0.amqp_value'{} ->
#'v1_0.amqp_value'{content = Type};
Else ->
fill_from_list(Else, Fields)
end;
decode({described, Descriptor, {map, Fields}}) ->
decode({described, Descriptor, {map, Fields} = Type}) ->
case amqp10_framing0:record_for(Descriptor) of
#'v1_0.application_properties'{} ->
#'v1_0.application_properties'{content = decode_map(Fields)};
Expand All @@ -117,13 +119,15 @@ decode({described, Descriptor, {map, Fields}}) ->
#'v1_0.message_annotations'{content = decode_map(Fields)};
#'v1_0.footer'{} ->
#'v1_0.footer'{content = decode_map(Fields)};
#'v1_0.amqp_value'{} ->
#'v1_0.amqp_value'{content = Type};
Else ->
fill_from_map(Else, Fields)
end;
decode({described, Descriptor, {binary, Field}}) ->
decode({described, Descriptor, {binary, Field} = Type}) ->
case amqp10_framing0:record_for(Descriptor) of
#'v1_0.amqp_value'{} ->
#'v1_0.amqp_value'{content = {binary, Field}};
#'v1_0.amqp_value'{content = Type};
#'v1_0.data'{} ->
#'v1_0.data'{content = Field}
end;
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1252,7 +1252,7 @@ rabbitmq_integration_suite(
],
shard_count = 3,
runtime_deps = [
"//deps/amqp10_client:erlang_app",
"//deps/rabbitmq_amqp_client:erlang_app",
],
)

Expand All @@ -1279,7 +1279,7 @@ rabbitmq_integration_suite(
":test_event_recorder_beam",
],
runtime_deps = [
"//deps/amqp10_client:erlang_app",
"//deps/rabbitmq_amqp_client:erlang_app",
],
)

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ LOCAL_DEPS = sasl os_mon inets compiler public_key crypto ssl syntax_tools xmerl

BUILD_DEPS = rabbitmq_cli
DEPS = ranch rabbit_common amqp10_common rabbitmq_prelaunch ra sysmon_handler stdout_formatter recon redbug observer_cli osiris syslog systemd seshat khepri khepri_mnesia_migration
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers meck proper amqp_client amqp10_client rabbitmq_amqp1_0
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers meck proper amqp_client rabbitmq_amqp_client rabbitmq_amqp1_0

PLT_APPS += mnesia

Expand Down
Loading

0 comments on commit f72be04

Please sign in to comment.