From 9d0c851df2bfbf15c61f8cc1e27bbcc25eba192c Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 4 Nov 2024 12:28:20 +0100 Subject: [PATCH] Show session and link details for AMQP 1.0 connection ## What? On the connection page in the Management UI, display detailed session and link information including: * Link names * Link target and source addresses * Link flow control state * Session flow control state * Number of unconfirmed and unacknowledged messages ## How? A new HTTP API endpoint is added: ``` /connections/:connection_name/sessions ``` The HTTP handler first queries the Erlang connection process to find out about all session Pids. The handler then queries each Erlang session process of this connection. (The table auto-refreshes by default every 5 seconds. The handler querying a single connection with 60 idle sessions with each 250 links takes ~100 ms.) For better user experience in the Management UI, this commit also makes the session process store and expose link names as well as source/target addresses. --- deps/rabbit/src/rabbit_amqp_reader.erl | 5 +- deps/rabbit/src/rabbit_amqp_session.erl | 139 +++++++++++++++++- deps/rabbitmq_management/Makefile | 2 +- deps/rabbitmq_management/app.bzl | 4 + .../priv/www/js/dispatcher.js | 19 ++- .../rabbitmq_management/priv/www/js/global.js | 28 +++- .../priv/www/js/tmpl/connection.ejs | 13 ++ .../priv/www/js/tmpl/sessions-list.ejs | 112 ++++++++++++++ .../src/rabbit_mgmt_dispatcher.erl | 1 + .../rabbit_mgmt_wm_connection_sessions.erl | 91 ++++++++++++ .../test/rabbit_mgmt_http_SUITE.erl | 129 +++++++++++++++- moduleindex.yaml | 1 + release-notes/4.1.0.md | 46 +++--- 13 files changed, 553 insertions(+), 37 deletions(-) create mode 100644 deps/rabbitmq_management/priv/www/js/tmpl/sessions-list.ejs create mode 100644 deps/rabbitmq_management/src/rabbit_mgmt_wm_connection_sessions.erl diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index 791124d7e2d..e15473eacef 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -967,7 +967,8 @@ silent_close_delay() -> -spec info(rabbit_types:connection(), rabbit_types:info_keys()) -> rabbit_types:infos(). info(Pid, InfoItems) -> - case InfoItems -- ?INFO_ITEMS of + KnownItems = [session_pids | ?INFO_ITEMS], + case InfoItems -- KnownItems of [] -> case gen_server:call(Pid, {info, InfoItems}, infinity) of {ok, InfoList} -> @@ -1065,6 +1066,8 @@ i(client_properties, #v1{connection = #v1_connection{properties = Props}}) -> end; i(channels, #v1{tracked_channels = Channels}) -> maps:size(Channels); +i(session_pids, #v1{tracked_channels = Map}) -> + maps:values(Map); i(channel_max, #v1{connection = #v1_connection{channel_max = Max}}) -> Max; i(reductions = Item, _State) -> diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 8e965aa8c8e..5a15222c0c7 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -92,7 +92,8 @@ conserve_resources/3, check_resource_access/4, check_read_permitted_on_topic/4, - reset_authz/2 + reset_authz/2, + info/1 ]). -export([init/1, @@ -148,7 +149,9 @@ }). -record(incoming_link, { + name :: binary(), snd_settle_mode :: snd_settle_mode(), + target_address :: null | binary(), %% The exchange is either defined in the ATTACH frame and static for %% the life time of the link or dynamically provided in each message's %% "to" field (address v2). @@ -197,6 +200,8 @@ }). -record(outgoing_link, { + name :: binary(), + source_address :: binary(), %% Although the source address of a link might be an exchange name and binding key %% or a topic filter, an outgoing link will always consume from a queue. queue_name :: rabbit_amqqueue:name(), @@ -490,6 +495,8 @@ conserve_resources(Pid, Source, {_, Conserve, _}) -> reset_authz(Pid, User) -> gen_server:cast(Pid, {reset_authz, User}). +handle_call(infos, _From, State) -> + reply(infos(State), State); handle_call(Msg, _From, State) -> Reply = {error, {not_understood, Msg}}, reply(Reply, State). @@ -1262,11 +1269,11 @@ handle_attach(#'v1_0.attach'{ reply_frames([Reply], State); handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, - name = LinkName, + name = LinkName = {utf8, LinkName0}, handle = Handle = ?UINT(HandleInt), source = Source, snd_settle_mode = MaybeSndSettleMode, - target = Target, + target = Target = #'v1_0.target'{address = TargetAddress}, initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt) }, State0 = #state{incoming_links = IncomingLinks0, @@ -1279,7 +1286,9 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, SndSettleMode = snd_settle_mode(MaybeSndSettleMode), MaxMessageSize = persistent_term:get(max_message_size), IncomingLink = #incoming_link{ + name = LinkName0, snd_settle_mode = SndSettleMode, + target_address = address(TargetAddress), exchange = Exchange, routing_key = RoutingKey, queue_name_bin = QNameBin, @@ -1316,9 +1325,10 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, end; handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, - name = LinkName, + name = LinkName = {utf8, LinkName0}, handle = Handle = ?UINT(HandleInt), - source = Source = #'v1_0.source'{filter = DesiredFilter}, + source = Source = #'v1_0.source'{address = SourceAddress, + filter = DesiredFilter}, snd_settle_mode = SndSettleMode, rcv_settle_mode = RcvSettleMode, max_message_size = MaybeMaxMessageSize, @@ -1431,6 +1441,8 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, offered_capabilities = OfferedCaps}, MaxMessageSize = max_message_size(MaybeMaxMessageSize), Link = #outgoing_link{ + name = LinkName0, + source_address = address(SourceAddress), queue_name = queue_resource(Vhost, QNameBin), queue_type = QType, send_settled = SndSettled, @@ -2672,6 +2684,11 @@ ensure_source_v1(Address, Err end. +address(undefined) -> + null; +address({utf8, String}) -> + String. + -spec ensure_target(#'v1_0.target'{}, rabbit_types:vhost(), rabbit_types:user(), @@ -3702,6 +3719,118 @@ format_status( topic_permission_cache => TopicPermissionCache}, maps:update(state, State, Status). +-spec info(pid()) -> + {ok, rabbit_types:infos()} | {error, term()}. +info(Pid) -> + try gen_server:call(Pid, infos) of + Infos -> + {ok, Infos} + catch _:Reason -> + {error, Reason} + end. + +infos(#state{cfg = #cfg{channel_num = ChannelNum, + max_handle = MaxHandle}, + next_incoming_id = NextIncomingId, + incoming_window = IncomingWindow, + next_outgoing_id = NextOutgoingId, + remote_incoming_window = RemoteIncomingWindow, + remote_outgoing_window = RemoteOutgoingWindow, + outgoing_unsettled_map = OutgoingUnsettledMap, + incoming_links = IncomingLinks, + outgoing_links = OutgoingLinks, + incoming_management_links = IncomingManagementLinks, + outgoing_management_links = OutgoingManagementLinks + }) -> + [ + {channel_number, ChannelNum}, + {handle_max, MaxHandle}, + {next_incoming_id, NextIncomingId}, + {incoming_window, IncomingWindow}, + {next_outgoing_id, NextOutgoingId}, + {remote_incoming_window, RemoteIncomingWindow}, + {remote_outgoing_window, RemoteOutgoingWindow}, + {outgoing_unsettled_deliveries, maps:size(OutgoingUnsettledMap)}, + {incoming_links, + info_incoming_management_links(IncomingManagementLinks) ++ + info_incoming_links(IncomingLinks)}, + {outgoing_links, + info_outgoing_management_links(OutgoingManagementLinks) ++ + info_outgoing_links(OutgoingLinks)} + ]. + +info_incoming_management_links(Links) -> + [info_incoming_link(Handle, Name, settled, ?MANAGEMENT_NODE_ADDRESS, + MaxMessageSize, DeliveryCount, Credit, 0) + || Handle := #management_link{ + name = Name, + max_message_size = MaxMessageSize, + delivery_count = DeliveryCount, + credit = Credit} <- Links]. + +info_incoming_links(Links) -> + [info_incoming_link(Handle, Name, SndSettleMode, TargetAddress, MaxMessageSize, + DeliveryCount, Credit, maps:size(IncomingUnconfirmedMap)) + || Handle := #incoming_link{ + name = Name, + snd_settle_mode = SndSettleMode, + target_address = TargetAddress, + max_message_size = MaxMessageSize, + delivery_count = DeliveryCount, + credit = Credit, + incoming_unconfirmed_map = IncomingUnconfirmedMap} <- Links]. + +info_incoming_link(Handle, LinkName, SndSettleMode, TargetAddress, + MaxMessageSize, DeliveryCount, Credit, UnconfirmedMessages) -> + [{handle, Handle}, + {link_name, LinkName}, + {snd_settle_mode, SndSettleMode}, + {target_address, TargetAddress}, + {max_message_size, MaxMessageSize}, + {delivery_count, DeliveryCount}, + {credit, Credit}, + {unconfirmed_messages, UnconfirmedMessages}]. + +info_outgoing_management_links(Links) -> + [info_outgoing_link(Handle, Name, ?MANAGEMENT_NODE_ADDRESS, <<>>, + true, MaxMessageSize, DeliveryCount, Credit) + || Handle := #management_link{ + name = Name, + max_message_size = MaxMessageSize, + delivery_count = DeliveryCount, + credit = Credit} <- Links]. + +info_outgoing_links(Links) -> + [begin + {DeliveryCount, Credit} = case ClientFlowCtl of + #client_flow_ctl{delivery_count = DC, + credit = C} -> + {DC, C}; + credit_api_v1 -> + {'', ''} + end, + info_outgoing_link(Handle, Name, SourceAddress, QueueName#resource.name, + SendSettled, MaxMessageSize, DeliveryCount, Credit) + + end + || Handle := #outgoing_link{ + name = Name, + source_address = SourceAddress, + queue_name = QueueName, + max_message_size = MaxMessageSize, + send_settled = SendSettled, + client_flow_ctl = ClientFlowCtl} <- Links]. + +info_outgoing_link(Handle, LinkName, SourceAddress, QueueNameBin, SendSettled, + MaxMessageSize, DeliveryCount, Credit) -> + [{handle, Handle}, + {link_name, LinkName}, + {source_address, SourceAddress}, + {queue_name, QueueNameBin}, + {send_settled, SendSettled}, + {max_message_size, MaxMessageSize}, + {delivery_count, DeliveryCount}, + {credit, Credit}]. unwrap_simple_type(V = {list, _}) -> V; diff --git a/deps/rabbitmq_management/Makefile b/deps/rabbitmq_management/Makefile index 98998bfcdb4..7bfbee7a688 100644 --- a/deps/rabbitmq_management/Makefile +++ b/deps/rabbitmq_management/Makefile @@ -22,7 +22,7 @@ define PROJECT_APP_EXTRA_KEYS endef DEPS = rabbit_common rabbit amqp_client cowboy cowlib rabbitmq_web_dispatch rabbitmq_management_agent oauth2_client -TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers proper amqp10_client +TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers proper rabbitmq_amqp_client LOCAL_DEPS += ranch ssl crypto public_key # FIXME: Add Ranch as a BUILD_DEPS to be sure the correct version is picked. diff --git a/deps/rabbitmq_management/app.bzl b/deps/rabbitmq_management/app.bzl index fbee1f28610..9db0335b5f5 100644 --- a/deps/rabbitmq_management/app.bzl +++ b/deps/rabbitmq_management/app.bzl @@ -48,6 +48,7 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_mgmt_wm_cluster_name.erl", "src/rabbit_mgmt_wm_connection.erl", "src/rabbit_mgmt_wm_connection_channels.erl", + "src/rabbit_mgmt_wm_connection_sessions.erl", "src/rabbit_mgmt_wm_connection_user_name.erl", "src/rabbit_mgmt_wm_connections.erl", "src/rabbit_mgmt_wm_connections_vhost.erl", @@ -182,6 +183,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_mgmt_wm_cluster_name.erl", "src/rabbit_mgmt_wm_connection.erl", "src/rabbit_mgmt_wm_connection_channels.erl", + "src/rabbit_mgmt_wm_connection_sessions.erl", "src/rabbit_mgmt_wm_connection_user_name.erl", "src/rabbit_mgmt_wm_connections.erl", "src/rabbit_mgmt_wm_connections_vhost.erl", @@ -361,6 +363,7 @@ def all_srcs(name = "all_srcs"): "priv/www/js/tmpl/queues.ejs", "priv/www/js/tmpl/rate-options.ejs", "priv/www/js/tmpl/registry.ejs", + "priv/www/js/tmpl/sessions-list.ejs", "priv/www/js/tmpl/status.ejs", "priv/www/js/tmpl/topic-permissions.ejs", "priv/www/js/tmpl/user.ejs", @@ -407,6 +410,7 @@ def all_srcs(name = "all_srcs"): "src/rabbit_mgmt_wm_cluster_name.erl", "src/rabbit_mgmt_wm_connection.erl", "src/rabbit_mgmt_wm_connection_channels.erl", + "src/rabbit_mgmt_wm_connection_sessions.erl", "src/rabbit_mgmt_wm_connection_user_name.erl", "src/rabbit_mgmt_wm_connections.erl", "src/rabbit_mgmt_wm_connections_vhost.erl", diff --git a/deps/rabbitmq_management/priv/www/js/dispatcher.js b/deps/rabbitmq_management/priv/www/js/dispatcher.js index e0e520715fe..65a7872d72c 100644 --- a/deps/rabbitmq_management/priv/www/js/dispatcher.js +++ b/deps/rabbitmq_management/priv/www/js/dispatcher.js @@ -46,10 +46,21 @@ dispatcher_add(function(sammy) { }); sammy.get('#/connections/:name', function() { var name = esc(this.params['name']); - render({'connection': {path: '/connections/' + name, - options: {ranges: ['data-rates-conn']}}, - 'channels': '/connections/' + name + '/channels'}, - 'connection', '#/connections'); + var connectionPath = '/connections/' + name; + var reqs = { + 'connection': { + path: connectionPath, + options: { ranges: ['data-rates-conn'] } + } + }; + // First, get the connection details to check the protocol + var connectionDetails = JSON.parse(sync_get(connectionPath)); + if (connectionDetails.protocol === 'AMQP 1-0') { + reqs['sessions'] = connectionPath + '/sessions'; + } else { + reqs['channels'] = connectionPath + '/channels'; + } + render(reqs, 'connection', '#/connections'); }); sammy.del('#/connections', function() { var options = {headers: { diff --git a/deps/rabbitmq_management/priv/www/js/global.js b/deps/rabbitmq_management/priv/www/js/global.js index 42d7a8f34e2..a35821ebd71 100644 --- a/deps/rabbitmq_management/priv/www/js/global.js +++ b/deps/rabbitmq_management/priv/www/js/global.js @@ -586,8 +586,34 @@ var HELP = { ', 'container-id': - 'Name of the client application as sent from client to RabbitMQ in the "container-id" field of the AMQP 1.0 open frame.' + 'Name of the client application as sent from client to RabbitMQ in the "container-id" field of the AMQP 1.0 open frame.', + 'incoming-links': + 'Links where the client is the sender/publisher and RabbitMQ is the receiver of messages.', + + 'outgoing-links': + 'Links where the client is the receiver/consumer and RabbitMQ is the sender of messages.', + + 'target-address': + 'The "address" field of the link target.', + + 'source-address': + 'The "address" field of the link source.', + + 'amqp-source-queue': + 'The client receives messages from this queue.', + + 'amqp-unconfirmed-messages': + 'Number of messages that have been sent to queues but have not been confirmed by all queues.', + + 'snd-settle-mode': + 'Sender Settle Mode', + + 'sender-settles': + '"true" if the sender sends all deliveries settled to the receiver. "false" if the sender sends all deliveries initially unsettled to the receiver.', + + 'outgoing-unsettled-deliveries': + 'Number of messages that have been sent to consumers but have not yet been settled/acknowledged.' }; /////////////////////////////////////////////////////////////////////////// diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs index 07ee18ae504..ee7ba9ea021 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs @@ -84,6 +84,17 @@ +<% if (connection.protocol === 'AMQP 1-0') { %> + +
+

Sessions (<%=(sessions.length)%>)

+
+ <%= format('sessions-list', {'sessions': sessions}) %> +
+
+ +<% } else { %> +

Channels (<%=(channels.length)%>)

@@ -91,6 +102,8 @@
+<% } %> + <% if (connection.ssl) { %>

SSL

diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/sessions-list.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/sessions-list.ejs new file mode 100644 index 00000000000..61cd0afe722 --- /dev/null +++ b/deps/rabbitmq_management/priv/www/js/tmpl/sessions-list.ejs @@ -0,0 +1,112 @@ +<% if (sessions.length > 0) { %> + + + + + + + + + + + + + + + +<% + for (var i = 0; i < sessions.length; i++) { + var session = sessions[i]; +%> + + + + + + + + + + +<% if (session.incoming_links.length > 0) { %> + + + +<% } %> +<% if (session.outgoing_links.length > 0) { %> + + + +<% } %> +<% } %> + +
Channel numberhandle-maxnext-incoming-idincoming-windownext-outgoing-idremote-incoming-windowremote-outgoing-windowOutgoing unsettled deliveries
<%= fmt_string(session.channel_number) %><%= fmt_string(session.handle_max) %><%= fmt_string(session.next_incoming_id) %><%= fmt_string(session.incoming_window) %><%= fmt_string(session.next_outgoing_id) %><%= fmt_string(session.remote_incoming_window) %><%= fmt_string(session.remote_outgoing_window) %><%= fmt_string(session.outgoing_unsettled_deliveries) %>
+

Incoming Links (<%=(session.incoming_links.length)%>)

+ + + + + + + + + + + + + + +<% + for (var j = 0; j < session.incoming_links.length; j++) { + var in_link = session.incoming_links[j]; +%> + + + + + + + + + + +<% } %> + +
Link handleLink nameTarget address snd-settle-mode max-message-size (bytes)delivery-countlink-creditUnconfirmed messages
<%= fmt_string(in_link.handle) %><%= fmt_string(in_link.link_name) %><%= fmt_string(in_link.target_address) %><%= fmt_string(in_link.snd_settle_mode) %><%= fmt_string(in_link.max_message_size) %><%= fmt_string(in_link.delivery_count) %><%= fmt_string(in_link.credit) %><%= fmt_string(in_link.unconfirmed_messages) %>
+
+

Outgoing Links (<%=(session.outgoing_links.length)%>)

+ + + + + + + + + + + + + + +<% + for (var k = 0; k < session.outgoing_links.length; k++) { + var out_link = session.outgoing_links[k]; +%> + + + + + + + + + + +<% } %> + +
Link handleLink nameSource address Source queue Sender settles max-message-size (bytes)delivery-countlink-credit
<%= fmt_string(out_link.handle) %><%= fmt_string(out_link.link_name) %><%= fmt_string(out_link.source_address) %><%= fmt_string(out_link.queue_name) %><%= fmt_boolean(out_link.send_settled) %><%= fmt_string(out_link.max_message_size) %><%= fmt_string(out_link.delivery_count) %><%= fmt_string(out_link.credit) %>
+
+<% } else { %> +

No sessions

+<% } %> diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl b/deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl index 726a4291cf0..2945984ecb4 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_dispatcher.erl @@ -132,6 +132,7 @@ dispatcher() -> {"/connections/:connection", rabbit_mgmt_wm_connection, []}, {"/connections/username/:username", rabbit_mgmt_wm_connection_user_name, []}, {"/connections/:connection/channels", rabbit_mgmt_wm_connection_channels, []}, + {"/connections/:connection/sessions", rabbit_mgmt_wm_connection_sessions, []}, {"/channels", rabbit_mgmt_wm_channels, []}, {"/channels/:channel", rabbit_mgmt_wm_channel, []}, {"/consumers", rabbit_mgmt_wm_consumers, []}, diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_connection_sessions.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_connection_sessions.erl new file mode 100644 index 00000000000..60768b20e13 --- /dev/null +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_connection_sessions.erl @@ -0,0 +1,91 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_mgmt_wm_connection_sessions). + +-export([init/2, to_json/2, content_types_provided/2, is_authorized/2]). +-export([resource_exists/2]). +-export([variances/2]). + +-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). + +%%-------------------------------------------------------------------- + +init(Req, _State) -> + {cowboy_rest, rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), #context{}}. + +variances(Req, Context) -> + {[<<"accept-encoding">>, <<"origin">>], Req, Context}. + +content_types_provided(ReqData, Context) -> + {rabbit_mgmt_util:responder_map(to_json), ReqData, Context}. + +resource_exists(ReqData, Context) -> + case conn(ReqData) of + not_found -> + {false, ReqData, Context}; + _Conn -> + {true, ReqData, Context} + end. + +to_json(ReqData, Context) -> + Conn = conn(ReqData), + case proplists:get_value(protocol, Conn) of + {1, 0} -> + ConnPid = proplists:get_value(pid, Conn), + try rabbit_amqp_reader:info(ConnPid, [session_pids]) of + [{session_pids, Pids}] -> + rabbit_mgmt_util:reply_list(session_infos(Pids), + ["channel_number"], + ReqData, + Context) + catch Type:Reason0 -> + Reason = unicode:characters_to_binary( + lists:flatten( + io_lib:format( + "failed to get sessions for connection ~p: ~s ~tp", + [ConnPid, Type, Reason0]))), + rabbit_mgmt_util:internal_server_error(Reason, ReqData, Context) + end; + _ -> + rabbit_mgmt_util:bad_request(<<"connection does not use AMQP 1.0">>, + ReqData, + Context) + end. + +is_authorized(ReqData, Context) -> + rabbit_mgmt_util:is_authorized_user(ReqData, Context, conn(ReqData)). + +%%-------------------------------------------------------------------- + +conn(Req) -> + case rabbit_connection_tracking:lookup(rabbit_mgmt_util:id(connection, Req)) of + #tracked_connection{name = Name, + pid = Pid, + protocol = Protocol, + username = Username} -> + [{name, Name}, + {pid, Pid}, + {protocol, Protocol}, + {user, Username}]; + not_found -> + not_found + end. + +session_infos(Pids) -> + lists:filtermap( + fun(Pid) -> + case rabbit_amqp_session:info(Pid) of + {ok, Infos} -> + {true, Infos}; + {error, Reason} -> + rabbit_log:warning("failed to get infos for session ~p: ~tp", + [Pid, Reason]), + false + end + end, Pids). diff --git a/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl b/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl index e30d532607c..eb938797549 100644 --- a/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl +++ b/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl @@ -200,8 +200,10 @@ all_tests() -> [ qq_status_test, list_deprecated_features_test, list_used_deprecated_features_test, - connections_test_amqpl, - connections_test_amqp, + connections_amqpl, + connections_amqp, + amqp_sessions, + amqpl_sessions, enable_plugin_amqp ]. @@ -239,7 +241,7 @@ finish_init(Group, Config) -> merge_app_env(Config1). init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(amqp10_client), + {ok, _} = application:ensure_all_started(rabbitmq_amqp_client), Config. end_per_suite(Config) -> @@ -979,7 +981,7 @@ topic_permissions_test(Config) -> http_delete(Config, "/vhosts/myvhost2", {group, '2xx'}), passed. -connections_test_amqpl(Config) -> +connections_amqpl(Config) -> {Conn, _Ch} = open_connection_and_channel(Config), LocalPort = local_port(Conn), Path = binary_to_list( @@ -1012,7 +1014,7 @@ connections_test_amqpl(Config) -> passed. %% Test that AMQP 1.0 connection can be listed and closed via the rabbitmq_management plugin. -connections_test_amqp(Config) -> +connections_amqp(Config) -> Node = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), User = <<"guest">>, @@ -1069,6 +1071,123 @@ connections_test_amqp(Config) -> eventually(?_assertEqual([], http_get(Config, "/connections")), 10, 5), ?assertEqual(0, length(rpc(Config, rabbit_amqp1_0, list_local, []))). +%% Test that AMQP 1.0 sessions and links can be listed via the rabbitmq_management plugin. +amqp_sessions(Config) -> + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + User = <<"guest">>, + OpnConf = #{address => ?config(rmq_hostname, Config), + port => Port, + container_id => <<"my container">>, + sasl => {plain, User, <<"guest">>}}, + {ok, C} = amqp10_client:open_connection(OpnConf), + receive {amqp10_event, {connection, C, opened}} -> ok + after 5000 -> ct:fail(opened_timeout) + end, + + {ok, Session1} = amqp10_client:begin_session_sync(C), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync( + Session1, <<"my link pair">>), + QName = <<"my queue">>, + {ok, #{}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + {ok, Sender} = amqp10_client:attach_sender_link_sync( + Session1, + <<"my sender">>, + rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"my key">>)), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session1, + <<"my receiver">>, + rabbitmq_amqp_address:queue(QName)), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:flow_link_credit(Receiver, 5000, never), + + eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10), + [Connection] = http_get(Config, "/connections"), + ConnectionName = maps:get(name, Connection), + Path = "/connections/" ++ binary_to_list(uri_string:quote(ConnectionName)) ++ "/sessions", + [Session] = http_get(Config, Path), + ?assertMatch( + #{channel_number := 0, + handle_max := HandleMax, + next_incoming_id := NextIncomingId, + incoming_window := IncomingWindow, + next_outgoing_id := NextOutgoingId, + remote_incoming_window := RemoteIncomingWindow, + remote_outgoing_window := RemoteOutgoingWindow, + outgoing_unsettled_deliveries := 0, + incoming_links := [#{handle := 0, + link_name := <<"my link pair">>, + target_address := <<"/management">>, + delivery_count := DeliveryCount1, + credit := Credit1, + snd_settle_mode := <<"settled">>, + max_message_size := IncomingMaxMsgSize, + unconfirmed_messages := 0}, + #{handle := 2, + link_name := <<"my sender">>, + target_address := <<"/exchanges/amq.direct/my%20key">>, + delivery_count := DeliveryCount2, + credit := Credit2, + snd_settle_mode := <<"mixed">>, + max_message_size := IncomingMaxMsgSize, + unconfirmed_messages := 0}], + outgoing_links := [#{handle := 1, + link_name := <<"my link pair">>, + source_address := <<"/management">>, + queue_name := <<>>, + delivery_count := DeliveryCount3, + credit := 0, + max_message_size := <<"unlimited">>, + send_settled := true}, + #{handle := 3, + link_name := <<"my receiver">>, + source_address := <<"/queues/my%20queue">>, + queue_name := <<"my queue">>, + delivery_count := DeliveryCount4, + credit := 5000, + max_message_size := <<"unlimited">>, + send_settled := true}] + } when is_integer(HandleMax) andalso + is_integer(NextIncomingId) andalso + is_integer(IncomingWindow) andalso + is_integer(NextOutgoingId) andalso + is_integer(RemoteIncomingWindow) andalso + is_integer(RemoteOutgoingWindow) andalso + is_integer(Credit1) andalso + is_integer(Credit2) andalso + is_integer(IncomingMaxMsgSize) andalso + is_integer(DeliveryCount1) andalso + is_integer(DeliveryCount2) andalso + is_integer(DeliveryCount3) andalso + is_integer(DeliveryCount4), + Session), + + {ok, _Session2} = amqp10_client:begin_session_sync(C), + Sessions = http_get(Config, Path), + ?assertEqual(2, length(Sessions)), + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:detach_link(Receiver), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:close_connection(C). + +%% Test that GET /connections/:name/sessions returns +%% 400 Bad Request for non-AMQP 1.0 connections. +amqpl_sessions(Config) -> + {Conn, _Ch} = open_connection_and_channel(Config), + LocalPort = local_port(Conn), + Path = binary_to_list( + rabbit_mgmt_format:print( + "/connections/127.0.0.1%3A~w%20-%3E%20127.0.0.1%3A~w/sessions", + [LocalPort, amqp_port(Config)])), + ok = await_condition( + fun() -> + http_get(Config, Path, 400), + true + end). + %% Test that AMQP 1.0 connection can be listed if the rabbitmq_management plugin gets enabled %% after the connection was established. enable_plugin_amqp(Config) -> diff --git a/moduleindex.yaml b/moduleindex.yaml index 298a9a8b141..0ace20d05b6 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -948,6 +948,7 @@ rabbitmq_management: - rabbit_mgmt_wm_cluster_name - rabbit_mgmt_wm_connection - rabbit_mgmt_wm_connection_channels +- rabbit_mgmt_wm_connection_sessions - rabbit_mgmt_wm_connection_user_name - rabbit_mgmt_wm_connections - rabbit_mgmt_wm_connections_vhost diff --git a/release-notes/4.1.0.md b/release-notes/4.1.0.md index d546e868f2c..32ae19d73e1 100644 --- a/release-notes/4.1.0.md +++ b/release-notes/4.1.0.md @@ -11,6 +11,32 @@ This feature: * adds the ability to RabbitMQ to have multiple concurrent clients each consuming only a subset of messages while maintaining message order, and * reduces network traffic between RabbitMQ and clients by only dispatching those messages that the clients are actually interested in. +### Support for Multiple Routing Keys in AMQP 1.0 via `x-cc` Message Annotation +[PR #12559](https://github.com/rabbitmq/rabbitmq-server/pull/12559) enables AMQP 1.0 publishers to set multiple routing keys by using the `x-cc` message annotation. +This annotation allows publishers to specify a [list](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-list) of routing keys ([strings](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string)) for more flexible message distribution, similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1. + +### OAuth 2.0 Token Renewal on AMQP 1.0 Connections +[PR #12599](https://github.com/rabbitmq/rabbitmq-server/pull/12599) introduces support for OAuth 2.0 token renewal on AMQP 1.0 connections. +This feature allows clients to set a new token proactively before the current one [expires](/docs/oauth2#token-expiration), ensuring uninterrupted connectivity. +If a client does not set a new token before the existing one expires, RabbitMQ will automatically close the AMQP 1.0 connection. + +### Metrics for AMQP 1.0 Connections +[PR #12638](https://github.com/rabbitmq/rabbitmq-server/pull/12638) exposes the following AMQP 1.0 connection metrics in the RabbitMQ Management UI and the [/metrics/per-object](https://www.rabbitmq.com/docs/prometheus#per-object-endpoint) Prometheus endpoint: +* Bytes received and sent +* Reductions +* Garbage collections +* Number of channels/sessions + +These metrics have already been emitted for AMQP 0.9.1 connections prior to RabbitMQ 4.1. + +### AMQP 1.0 Sessions and Links in the Management UI +[PR #12670](https://github.com/rabbitmq/rabbitmq-server/pull/12670) displays detailed AMQP 1.0 session and link information on the Connection page of the Management UI including: +* Link names +* Link target and source addresses +* Link flow control state +* Session flow control state +* Number of unconfirmed and unacknowledged messages + ### Prometheus histogram for message sizes [PR #12342](https://github.com/rabbitmq/rabbitmq-server/pull/12342) exposes a Prometheus histogram for message sizes received by RabbitMQ. @@ -38,26 +64,6 @@ The introduction of required feature flags several minor versions ago showed the See the [full GitHub project](https://github.com/orgs/rabbitmq/projects/4/views/1) for the complete list of improvements and fixes. -## New Features - -### Support for Multiple Routing Keys in AMQP 1.0 via `x-cc` Message Annotation -[PR #12559](https://github.com/rabbitmq/rabbitmq-server/pull/12559) enables AMQP 1.0 publishers to set multiple routing keys by using the `x-cc` message annotation. -This annotation allows publishers to specify a [list](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-list) of routing keys ([strings](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string)) for more flexible message distribution, similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1. - -### OAuth 2.0 Token Renewal on AMQP 1.0 Connections -[PR #12599](https://github.com/rabbitmq/rabbitmq-server/pull/12599) introduces support for OAuth 2.0 token renewal on AMQP 1.0 connections. -This feature allows clients to set a new token proactively before the current one [expires](/docs/oauth2#token-expiration), ensuring uninterrupted connectivity. -If a client does not set a new token before the existing one expires, RabbitMQ will automatically close the AMQP 1.0 connection. - -### Metrics for AMQP 1.0 Connections -[PR #12638](https://github.com/rabbitmq/rabbitmq-server/pull/12638) exposes the following AMQP 1.0 connection metrics in the RabbitMQ Management UI and the [/metrics/per-object](https://www.rabbitmq.com/docs/prometheus#per-object-endpoint) Prometheus endpoint: -* Bytes received and sent -* Reductions -* Garbage collections -* Number of channels/sessions - -These metrics have already been emitted for AMQP 0.9.1 connections prior to RabbitMQ 4.1. - ## Potential incompatibilities * The default MQTT [Maximum Packet Size](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901086) changed from 256 MiB to 16 MiB. This default can be overridden by [configuring](https://www.rabbitmq.com/docs/configure#config-file) `mqtt.max_packet_size_authenticated`. Note that this value must not be greater than `max_message_size` (which also defaults to 16 MiB).