Skip to content

Commit

Permalink
Show session and link details for AMQP 1.0 connection
Browse files Browse the repository at this point in the history
 ## What?

On the connection page in the Management UI, display detailed session and
link information including:
* Link names
* Link target and source addresses
* Link flow control state
* Session flow control state
* Number of unconfirmed and unacknowledged messages

 ## How?

A new HTTP API endpoint is added:
```
/connections/:connection_name/sessions
```

The HTTP handler first queries the Erlang connection process to find out about
all session Pids. The handler then queries each Erlang session process
of this connection.

(The table auto-refreshes by default every 5 seconds. The handler querying a single
connection with 60 idle sessions with each 250 links takes ~100 ms.)

For better user experience in the Management UI, this commit also makes the
session process store and expose link names as well as source/target addresses.
  • Loading branch information
ansd committed Nov 7, 2024
1 parent 8dd49a5 commit 9d0c851
Show file tree
Hide file tree
Showing 13 changed files with 553 additions and 37 deletions.
5 changes: 4 additions & 1 deletion deps/rabbit/src/rabbit_amqp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,8 @@ silent_close_delay() ->
-spec info(rabbit_types:connection(), rabbit_types:info_keys()) ->
rabbit_types:infos().
info(Pid, InfoItems) ->
case InfoItems -- ?INFO_ITEMS of
KnownItems = [session_pids | ?INFO_ITEMS],
case InfoItems -- KnownItems of
[] ->
case gen_server:call(Pid, {info, InfoItems}, infinity) of
{ok, InfoList} ->
Expand Down Expand Up @@ -1065,6 +1066,8 @@ i(client_properties, #v1{connection = #v1_connection{properties = Props}}) ->
end;
i(channels, #v1{tracked_channels = Channels}) ->
maps:size(Channels);
i(session_pids, #v1{tracked_channels = Map}) ->
maps:values(Map);
i(channel_max, #v1{connection = #v1_connection{channel_max = Max}}) ->
Max;
i(reductions = Item, _State) ->
Expand Down
139 changes: 134 additions & 5 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@
conserve_resources/3,
check_resource_access/4,
check_read_permitted_on_topic/4,
reset_authz/2
reset_authz/2,
info/1
]).

-export([init/1,
Expand Down Expand Up @@ -148,7 +149,9 @@
}).

-record(incoming_link, {
name :: binary(),
snd_settle_mode :: snd_settle_mode(),
target_address :: null | binary(),
%% The exchange is either defined in the ATTACH frame and static for
%% the life time of the link or dynamically provided in each message's
%% "to" field (address v2).
Expand Down Expand Up @@ -197,6 +200,8 @@
}).

-record(outgoing_link, {
name :: binary(),
source_address :: binary(),
%% Although the source address of a link might be an exchange name and binding key
%% or a topic filter, an outgoing link will always consume from a queue.
queue_name :: rabbit_amqqueue:name(),
Expand Down Expand Up @@ -490,6 +495,8 @@ conserve_resources(Pid, Source, {_, Conserve, _}) ->
reset_authz(Pid, User) ->
gen_server:cast(Pid, {reset_authz, User}).

handle_call(infos, _From, State) ->
reply(infos(State), State);
handle_call(Msg, _From, State) ->
Reply = {error, {not_understood, Msg}},
reply(Reply, State).
Expand Down Expand Up @@ -1262,11 +1269,11 @@ handle_attach(#'v1_0.attach'{
reply_frames([Reply], State);

handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
name = LinkName,
name = LinkName = {utf8, LinkName0},
handle = Handle = ?UINT(HandleInt),
source = Source,
snd_settle_mode = MaybeSndSettleMode,
target = Target,
target = Target = #'v1_0.target'{address = TargetAddress},
initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt)
},
State0 = #state{incoming_links = IncomingLinks0,
Expand All @@ -1279,7 +1286,9 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
SndSettleMode = snd_settle_mode(MaybeSndSettleMode),
MaxMessageSize = persistent_term:get(max_message_size),
IncomingLink = #incoming_link{
name = LinkName0,
snd_settle_mode = SndSettleMode,
target_address = address(TargetAddress),
exchange = Exchange,
routing_key = RoutingKey,
queue_name_bin = QNameBin,
Expand Down Expand Up @@ -1316,9 +1325,10 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
end;

handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
name = LinkName,
name = LinkName = {utf8, LinkName0},
handle = Handle = ?UINT(HandleInt),
source = Source = #'v1_0.source'{filter = DesiredFilter},
source = Source = #'v1_0.source'{address = SourceAddress,
filter = DesiredFilter},
snd_settle_mode = SndSettleMode,
rcv_settle_mode = RcvSettleMode,
max_message_size = MaybeMaxMessageSize,
Expand Down Expand Up @@ -1431,6 +1441,8 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
offered_capabilities = OfferedCaps},
MaxMessageSize = max_message_size(MaybeMaxMessageSize),
Link = #outgoing_link{
name = LinkName0,
source_address = address(SourceAddress),
queue_name = queue_resource(Vhost, QNameBin),
queue_type = QType,
send_settled = SndSettled,
Expand Down Expand Up @@ -2672,6 +2684,11 @@ ensure_source_v1(Address,
Err
end.

address(undefined) ->
null;
address({utf8, String}) ->
String.

-spec ensure_target(#'v1_0.target'{},
rabbit_types:vhost(),
rabbit_types:user(),
Expand Down Expand Up @@ -3702,6 +3719,118 @@ format_status(
topic_permission_cache => TopicPermissionCache},
maps:update(state, State, Status).

-spec info(pid()) ->
{ok, rabbit_types:infos()} | {error, term()}.
info(Pid) ->
try gen_server:call(Pid, infos) of
Infos ->
{ok, Infos}
catch _:Reason ->
{error, Reason}
end.

infos(#state{cfg = #cfg{channel_num = ChannelNum,
max_handle = MaxHandle},
next_incoming_id = NextIncomingId,
incoming_window = IncomingWindow,
next_outgoing_id = NextOutgoingId,
remote_incoming_window = RemoteIncomingWindow,
remote_outgoing_window = RemoteOutgoingWindow,
outgoing_unsettled_map = OutgoingUnsettledMap,
incoming_links = IncomingLinks,
outgoing_links = OutgoingLinks,
incoming_management_links = IncomingManagementLinks,
outgoing_management_links = OutgoingManagementLinks
}) ->
[
{channel_number, ChannelNum},
{handle_max, MaxHandle},
{next_incoming_id, NextIncomingId},
{incoming_window, IncomingWindow},
{next_outgoing_id, NextOutgoingId},
{remote_incoming_window, RemoteIncomingWindow},
{remote_outgoing_window, RemoteOutgoingWindow},
{outgoing_unsettled_deliveries, maps:size(OutgoingUnsettledMap)},
{incoming_links,
info_incoming_management_links(IncomingManagementLinks) ++
info_incoming_links(IncomingLinks)},
{outgoing_links,
info_outgoing_management_links(OutgoingManagementLinks) ++
info_outgoing_links(OutgoingLinks)}
].

info_incoming_management_links(Links) ->
[info_incoming_link(Handle, Name, settled, ?MANAGEMENT_NODE_ADDRESS,
MaxMessageSize, DeliveryCount, Credit, 0)
|| Handle := #management_link{
name = Name,
max_message_size = MaxMessageSize,
delivery_count = DeliveryCount,
credit = Credit} <- Links].

info_incoming_links(Links) ->
[info_incoming_link(Handle, Name, SndSettleMode, TargetAddress, MaxMessageSize,
DeliveryCount, Credit, maps:size(IncomingUnconfirmedMap))
|| Handle := #incoming_link{
name = Name,
snd_settle_mode = SndSettleMode,
target_address = TargetAddress,
max_message_size = MaxMessageSize,
delivery_count = DeliveryCount,
credit = Credit,
incoming_unconfirmed_map = IncomingUnconfirmedMap} <- Links].

info_incoming_link(Handle, LinkName, SndSettleMode, TargetAddress,
MaxMessageSize, DeliveryCount, Credit, UnconfirmedMessages) ->
[{handle, Handle},
{link_name, LinkName},
{snd_settle_mode, SndSettleMode},
{target_address, TargetAddress},
{max_message_size, MaxMessageSize},
{delivery_count, DeliveryCount},
{credit, Credit},
{unconfirmed_messages, UnconfirmedMessages}].

info_outgoing_management_links(Links) ->
[info_outgoing_link(Handle, Name, ?MANAGEMENT_NODE_ADDRESS, <<>>,
true, MaxMessageSize, DeliveryCount, Credit)
|| Handle := #management_link{
name = Name,
max_message_size = MaxMessageSize,
delivery_count = DeliveryCount,
credit = Credit} <- Links].

info_outgoing_links(Links) ->
[begin
{DeliveryCount, Credit} = case ClientFlowCtl of
#client_flow_ctl{delivery_count = DC,
credit = C} ->
{DC, C};
credit_api_v1 ->
{'', ''}
end,
info_outgoing_link(Handle, Name, SourceAddress, QueueName#resource.name,
SendSettled, MaxMessageSize, DeliveryCount, Credit)

end
|| Handle := #outgoing_link{
name = Name,
source_address = SourceAddress,
queue_name = QueueName,
max_message_size = MaxMessageSize,
send_settled = SendSettled,
client_flow_ctl = ClientFlowCtl} <- Links].

info_outgoing_link(Handle, LinkName, SourceAddress, QueueNameBin, SendSettled,
MaxMessageSize, DeliveryCount, Credit) ->
[{handle, Handle},
{link_name, LinkName},
{source_address, SourceAddress},
{queue_name, QueueNameBin},
{send_settled, SendSettled},
{max_message_size, MaxMessageSize},
{delivery_count, DeliveryCount},
{credit, Credit}].

unwrap_simple_type(V = {list, _}) ->
V;
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_management/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ define PROJECT_APP_EXTRA_KEYS
endef

DEPS = rabbit_common rabbit amqp_client cowboy cowlib rabbitmq_web_dispatch rabbitmq_management_agent oauth2_client
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers proper amqp10_client
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers proper rabbitmq_amqp_client
LOCAL_DEPS += ranch ssl crypto public_key

# FIXME: Add Ranch as a BUILD_DEPS to be sure the correct version is picked.
Expand Down
4 changes: 4 additions & 0 deletions deps/rabbitmq_management/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_mgmt_wm_cluster_name.erl",
"src/rabbit_mgmt_wm_connection.erl",
"src/rabbit_mgmt_wm_connection_channels.erl",
"src/rabbit_mgmt_wm_connection_sessions.erl",
"src/rabbit_mgmt_wm_connection_user_name.erl",
"src/rabbit_mgmt_wm_connections.erl",
"src/rabbit_mgmt_wm_connections_vhost.erl",
Expand Down Expand Up @@ -182,6 +183,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_mgmt_wm_cluster_name.erl",
"src/rabbit_mgmt_wm_connection.erl",
"src/rabbit_mgmt_wm_connection_channels.erl",
"src/rabbit_mgmt_wm_connection_sessions.erl",
"src/rabbit_mgmt_wm_connection_user_name.erl",
"src/rabbit_mgmt_wm_connections.erl",
"src/rabbit_mgmt_wm_connections_vhost.erl",
Expand Down Expand Up @@ -361,6 +363,7 @@ def all_srcs(name = "all_srcs"):
"priv/www/js/tmpl/queues.ejs",
"priv/www/js/tmpl/rate-options.ejs",
"priv/www/js/tmpl/registry.ejs",
"priv/www/js/tmpl/sessions-list.ejs",
"priv/www/js/tmpl/status.ejs",
"priv/www/js/tmpl/topic-permissions.ejs",
"priv/www/js/tmpl/user.ejs",
Expand Down Expand Up @@ -407,6 +410,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_mgmt_wm_cluster_name.erl",
"src/rabbit_mgmt_wm_connection.erl",
"src/rabbit_mgmt_wm_connection_channels.erl",
"src/rabbit_mgmt_wm_connection_sessions.erl",
"src/rabbit_mgmt_wm_connection_user_name.erl",
"src/rabbit_mgmt_wm_connections.erl",
"src/rabbit_mgmt_wm_connections_vhost.erl",
Expand Down
19 changes: 15 additions & 4 deletions deps/rabbitmq_management/priv/www/js/dispatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,21 @@ dispatcher_add(function(sammy) {
});
sammy.get('#/connections/:name', function() {
var name = esc(this.params['name']);
render({'connection': {path: '/connections/' + name,
options: {ranges: ['data-rates-conn']}},
'channels': '/connections/' + name + '/channels'},
'connection', '#/connections');
var connectionPath = '/connections/' + name;
var reqs = {
'connection': {
path: connectionPath,
options: { ranges: ['data-rates-conn'] }
}
};
// First, get the connection details to check the protocol
var connectionDetails = JSON.parse(sync_get(connectionPath));
if (connectionDetails.protocol === 'AMQP 1-0') {
reqs['sessions'] = connectionPath + '/sessions';
} else {
reqs['channels'] = connectionPath + '/channels';
}
render(reqs, 'connection', '#/connections');
});
sammy.del('#/connections', function() {
var options = {headers: {
Expand Down
28 changes: 27 additions & 1 deletion deps/rabbitmq_management/priv/www/js/global.js
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,34 @@ var HELP = {
</dl> ',

'container-id':
'Name of the client application as sent from client to RabbitMQ in the "container-id" field of the AMQP 1.0 <a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-open">open</a> frame.'
'Name of the client application as sent from client to RabbitMQ in the "container-id" field of the AMQP 1.0 <a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-open">open</a> frame.',

'incoming-links':
'<a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#section-links">Links</a> where the client is the sender/publisher and RabbitMQ is the receiver of messages.',

'outgoing-links':
'<a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#section-links">Links</a> where the client is the receiver/consumer and RabbitMQ is the sender of messages.',

'target-address':
'The "address" field of the link <a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-target">target</a>.',

'source-address':
'The "address" field of the link <a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-source">source</a>.',

'amqp-source-queue':
'The client receives messages from this queue.',

'amqp-unconfirmed-messages':
'Number of messages that have been sent to queues but have not been confirmed by all queues.',

'snd-settle-mode':
'<a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-sender-settle-mode">Sender Settle Mode</a>',

'sender-settles':
'"true" if the sender sends all deliveries settled to the receiver. "false" if the sender sends all deliveries initially unsettled to the receiver.',

'outgoing-unsettled-deliveries':
'Number of messages that have been sent to consumers but have not yet been settled/acknowledged.'
};

///////////////////////////////////////////////////////////////////////////
Expand Down
13 changes: 13 additions & 0 deletions deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,26 @@
</div>
</div>
<% if (connection.protocol === 'AMQP 1-0') { %>
<div class="section">
<h2 class="updatable" >Sessions (<%=(sessions.length)%>)</h2>
<div class="hider updatable">
<%= format('sessions-list', {'sessions': sessions}) %>
</div>
</div>
<% } else { %>
<div class="section">
<h2 class="updatable" >Channels (<%=(channels.length)%>) </h2>
<div class="hider updatable">
<%= format('channels-list', {'channels': channels, 'mode': 'connection'}) %>
</div>
</div>
<% } %>
<% if (connection.ssl) { %>
<div class="section">
<h2>SSL</h2>
Expand Down
Loading

0 comments on commit 9d0c851

Please sign in to comment.