Skip to content

Commit

Permalink
Merge pull request #12638 from rabbitmq/amqp-connection-metrics
Browse files Browse the repository at this point in the history
Expose AMQP connection metrics
  • Loading branch information
ansd authored Nov 4, 2024
2 parents fe587ae + 3db4a97 commit 6034f3c
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 77 deletions.
108 changes: 76 additions & 32 deletions deps/rabbit/src/rabbit_amqp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").
-include("rabbit_amqp_reader.hrl").
-include("rabbit_amqp.hrl").

-export([init/1,
Expand Down Expand Up @@ -79,7 +80,8 @@
pending_recv :: boolean(),
buf :: list(),
buf_len :: non_neg_integer(),
tracked_channels :: #{channel_number() => Session :: pid()}
tracked_channels :: #{channel_number() => Session :: pid()},
stats_timer :: rabbit_event:state()
}).

-type state() :: #v1{}.
Expand All @@ -90,7 +92,7 @@

unpack_from_0_9_1(
{Sock, PendingRecv, SupPid, Buf, BufLen, ProxySocket,
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt},
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer},
Parent) ->
logger:update_process_metadata(#{connection => ConnectionName}),
#v1{parent = Parent,
Expand All @@ -106,6 +108,7 @@ unpack_from_0_9_1(
tracked_channels = maps:new(),
writer = none,
connection_state = received_amqp3100,
stats_timer = StatsTimer,
connection = #v1_connection{
name = ConnectionName,
container_id = none,
Expand Down Expand Up @@ -201,6 +204,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
end
end.

handle_other(emit_stats, State) ->
emit_stats(State);
handle_other(ensure_stats_timer, State) ->
ensure_stats_timer(State);
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
ReasonString = rabbit_misc:format("broker forced connection closure with reason '~w'",
[Reason]),
Expand Down Expand Up @@ -247,8 +254,16 @@ handle_other({'$gen_call', From, {info, Items}}, State) ->
end,
gen_server:reply(From, Reply),
State;
handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
State;
handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) ->
case ?IS_RUNNING(State) of
true ->
Infos = infos(?CONNECTION_EVENT_KEYS, State),
rabbit_event:notify(connection_created, Infos, Ref),
rabbit_event:init_stats_timer(State, #v1.stats_timer);
false ->
%% Ignore, we will emit a connection_created event once we start running.
State
end;
handle_other(terminate_connection, _State) ->
stop;
handle_other({set_credential, Cred}, State) ->
Expand Down Expand Up @@ -527,6 +542,7 @@ handle_connection_frame(
proplists:get_value(pid, Infos),
Infos),
ok = rabbit_event:notify(connection_created, Infos),
ok = maybe_emit_stats(State),
ok = rabbit_amqp1_0:register_connection(self()),
Caps = [%% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html#_Toc51331306
<<"LINK_PAIR_V1_0">>,
Expand Down Expand Up @@ -629,25 +645,26 @@ handle_input(handshake,
switch_callback(State, {frame_header, amqp}, 8);
handle_input({frame_header, Mode},
Header = <<Size:32, DOff:8, Type:8, Channel:16>>,
State) when DOff >= 2 ->
State0) when DOff >= 2 ->
case {Mode, Type} of
{amqp, 0} -> ok;
{sasl, 1} -> ok;
_ -> throw({bad_1_0_header_type, Header, Mode})
_ -> throw({bad_1_0_header_type, Header, Mode})
end,
MaxFrameSize = State#v1.connection#v1_connection.incoming_max_frame_size,
if Size =:= 8 ->
%% heartbeat
State;
Size > MaxFrameSize ->
handle_exception(
State, Channel, error_frame(
?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
"frame size (~b bytes) > maximum frame size (~b bytes)",
[Size, MaxFrameSize]));
true ->
switch_callback(State, {frame_body, Mode, DOff, Channel}, Size - 8)
end;
MaxFrameSize = State0#v1.connection#v1_connection.incoming_max_frame_size,
State = if Size =:= 8 ->
%% heartbeat
State0;
Size > MaxFrameSize ->
Err = error_frame(
?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
"frame size (~b bytes) > maximum frame size (~b bytes)",
[Size, MaxFrameSize]),
handle_exception(State0, Channel, Err);
true ->
switch_callback(State0, {frame_body, Mode, DOff, Channel}, Size - 8)
end,
ensure_stats_timer(State);
handle_input({frame_header, _Mode}, Malformed, _State) ->
throw({bad_1_0_header, Malformed});
handle_input({frame_body, Mode, DOff, Channel},
Expand Down Expand Up @@ -1013,13 +1030,18 @@ i(peer_host, #v1{connection = #v1_connection{peer_host = Val}}) ->
Val;
i(peer_port, #v1{connection = #v1_connection{peer_port = Val}}) ->
Val;
i(SockStat, S) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end,
fun ([{_, I}]) -> I end, S);
i(SockStat, #v1{sock = Sock})
when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
case rabbit_net:getstat(Sock, [SockStat]) of
{ok, [{SockStat, Val}]} ->
Val;
{error, _} ->
''
end;
i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock);
i(SSL, #v1{sock = Sock, proxy_socket = ProxySock})
when SSL =:= ssl_protocol;
Expand All @@ -1045,15 +1067,37 @@ i(channels, #v1{tracked_channels = Channels}) ->
maps:size(Channels);
i(channel_max, #v1{connection = #v1_connection{channel_max = Max}}) ->
Max;
i(reductions = Item, _State) ->
{Item, Reductions} = erlang:process_info(self(), Item),
Reductions;
i(garbage_collection, _State) ->
rabbit_misc:get_gc_info(self());
i(Item, #v1{}) ->
throw({bad_argument, Item}).

%% From rabbit_reader
socket_info(Get, Select, #v1{sock = Sock}) ->
case Get(Sock) of
{ok, T} -> Select(T);
{error, _} -> ''
end.
%% Triggers a one-off stats emission for this connection.
%% The actual work is wrapped in a fun and handed to rabbit_event:if_enabled/3
%% together with the position of the stats timer field (#v1.stats_timer), so
%% rabbit_event decides whether the fun is run.
%% The state returned by emit_stats/1 is not propagated to the caller: this
%% function asserts that if_enabled/3 returned ok and returns ok itself.
maybe_emit_stats(State) ->
    EmitFun = fun() -> emit_stats(State) end,
    ok = rabbit_event:if_enabled(State, #v1.stats_timer, EmitFun).

%% Collects the connection metrics and hands them to rabbit_core_metrics.
%%
%% The ?SIMPLE_METRICS infos are destructured positionally (pid, received
%% octets, sent octets, reductions) and passed to
%% rabbit_core_metrics:connection_stats/4; the ?OTHER_METRICS infos are passed
%% as a list to rabbit_core_metrics:connection_stats/2.
%%
%% Returns the state produced by rabbit_event:reset_stats_timer/2.
emit_stats(State) ->
    CoarseInfos = infos(?SIMPLE_METRICS, State),
    [{_, ConnPid},
     {_, BytesIn},
     {_, BytesOut},
     {_, Reds}] = CoarseInfos,
    DetailedInfos = infos(?OTHER_METRICS, State),
    rabbit_core_metrics:connection_stats(ConnPid, DetailedInfos),
    rabbit_core_metrics:connection_stats(ConnPid, BytesIn, BytesOut, Reds),
    %% ensure_stats_timer is deliberately NOT called from here: doing so
    %% becomes expensive if all idle non-hibernating connections emit stats.
    rabbit_event:reset_stats_timer(State, #v1.stats_timer).

%% Makes sure the stats timer of this connection is armed.
%%
%% First clause: taken when the ?IS_RUNNING(State) guard holds (the macro is
%% defined outside this view). It delegates to rabbit_event:ensure_stats_timer/3
%% with the position of the timer field (#v1.stats_timer) and the message
%% emit_stats, which is the message handled by handle_other(emit_stats, State).
%%
%% Second clause: in every other case the state is returned untouched and no
%% timer is started.
ensure_stats_timer(State)
when ?IS_RUNNING(State) ->
rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats);
ensure_stats_timer(State) ->
State.

ignore_maintenance({map, Properties}) ->
lists:member(
Expand Down
17 changes: 17 additions & 0 deletions deps/rabbit/src/rabbit_amqp_reader.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

%% Info keys of the coarse connection metrics.
%% The order of the keys is significant: emit_stats/1 pattern-matches the
%% result of infos(?SIMPLE_METRICS, State) positionally as pid, received
%% octets, sent octets and reductions before calling
%% rabbit_core_metrics:connection_stats/4.
-define(SIMPLE_METRICS, [pid,
recv_oct,
send_oct,
reductions]).

%% Info keys of the remaining connection metrics.
%% emit_stats/1 passes the result of infos(?OTHER_METRICS, State) as a list to
%% rabbit_core_metrics:connection_stats/2.
-define(OTHER_METRICS, [recv_cnt,
send_cnt,
send_pend,
state,
channels,
garbage_collection]).
21 changes: 14 additions & 7 deletions deps/rabbit/src/rabbit_amqp_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
pending :: iolist(),
%% This field is just an optimisation to minimize the cost of erlang:iolist_size/1
pending_size :: non_neg_integer(),
monitored_sessions :: #{pid() => true}
monitored_sessions :: #{pid() => true},
stats_timer :: rabbit_event:state()
}).

-define(HIBERNATE_AFTER, 6_000).
Expand Down Expand Up @@ -100,7 +101,8 @@ init({Sock, ReaderPid}) ->
reader = ReaderPid,
pending = [],
pending_size = 0,
monitored_sessions = #{}},
monitored_sessions = #{},
stats_timer = rabbit_event:init_stats_timer()},
process_flag(message_queue_data, off_heap),
{ok, State}.

Expand All @@ -123,6 +125,10 @@ handle_call({send_command, ChannelNum, Performative}, _From, State0) ->
State = flush(State1),
{reply, ok, State}.

handle_info(emit_stats, State0 = #state{reader = ReaderPid}) ->
ReaderPid ! ensure_stats_timer,
State = rabbit_event:reset_stats_timer(State0, #state.stats_timer),
no_reply(State);
handle_info(timeout, State0) ->
State = flush(State0),
{noreply, State};
Expand Down Expand Up @@ -223,18 +229,19 @@ tcp_send(Sock, Data) ->

maybe_flush(State = #state{pending_size = PendingSize}) ->
case PendingSize > ?FLUSH_THRESHOLD of
true -> flush(State);
true -> flush(State);
false -> State
end.

flush(State = #state{pending = []}) ->
State;
flush(State = #state{sock = Sock,
pending = Pending}) ->
flush(State0 = #state{sock = Sock,
pending = Pending}) ->
case rabbit_net:send(Sock, lists:reverse(Pending)) of
ok ->
State#state{pending = [],
pending_size = 0};
State = State0#state{pending = [],
pending_size = 0},
rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats);
{error, Reason} ->
exit({writer, send_failed, Reason})
end.
18 changes: 8 additions & 10 deletions deps/rabbit/src/rabbit_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include("rabbit_amqp_reader.hrl").

-export([start_link/2, info/2, force_event_refresh/2,
shutdown/2]).
Expand Down Expand Up @@ -116,10 +117,6 @@
connection_blocked_message_sent
}).

-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels,
garbage_collection]).

-define(CREATION_EVENT_KEYS,
[pid, name, port, peer_port, host,
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
Expand Down Expand Up @@ -1582,8 +1579,8 @@ i(state, #v1{connection_state = ConnectionState,
end;
i(garbage_collection, _State) ->
rabbit_misc:get_gc_info(self());
i(reductions, _State) ->
{reductions, Reductions} = erlang:process_info(self(), reductions),
i(reductions = Item, _State) ->
{Item, Reductions} = erlang:process_info(self(), Item),
Reductions;
i(Item, #v1{connection = Conn}) -> ic(Item, Conn).

Expand Down Expand Up @@ -1623,12 +1620,12 @@ maybe_emit_stats(State) ->

emit_stats(State) ->
[{_, Pid},
{_, Recv_oct},
{_, Send_oct},
{_, RecvOct},
{_, SendOct},
{_, Reductions}] = infos(?SIMPLE_METRICS, State),
Infos = infos(?OTHER_METRICS, State),
rabbit_core_metrics:connection_stats(Pid, Infos),
rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions),
rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions),
State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer),
ensure_stats_timer(State1).

Expand All @@ -1643,6 +1640,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
pending_recv = PendingRecv,
helper_sup = {_HelperSup091, HelperSup10},
proxy_socket = ProxySocket,
stats_timer = StatsTimer,
connection = #connection{
name = Name,
host = Host,
Expand All @@ -1651,7 +1649,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
peer_port = PeerPort,
connected_at = ConnectedAt}}) ->
{Sock, PendingRecv, HelperSup10, Buf, BufLen, ProxySocket,
Name, Host, PeerHost, Port, PeerPort, ConnectedAt}.
Name, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer}.

respond_and_close(State, Channel, Protocol, Reason, LogErr) ->
log_hard_error(State, Channel, LogErr),
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit_common/src/rabbit_core_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ connection_stats(Pid, Infos) ->
ets:insert(connection_metrics, {Pid, Infos}),
ok.

connection_stats(Pid, Recv_oct, Send_oct, Reductions) ->
connection_stats(Pid, RecvOct, SendOct, Reductions) ->
%% Includes delete marker
ets:insert(connection_coarse_metrics, {Pid, Recv_oct, Send_oct, Reductions, 0}),
ets:insert(connection_coarse_metrics, {Pid, RecvOct, SendOct, Reductions, 0}),
ok.

channel_created(Pid, Infos) ->
Expand Down
29 changes: 20 additions & 9 deletions deps/rabbit_common/src/rabbit_event.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
-include("rabbit.hrl").

-export([start_link/0]).
-export([init_stats_timer/2, init_disabled_stats_timer/2,
-export([init_stats_timer/0, init_stats_timer/2, init_disabled_stats_timer/2,
ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]).
-export([stats_level/2, if_enabled/3]).
-export([notify/2, notify/3, notify_if/3]).
Expand Down Expand Up @@ -89,23 +89,34 @@ start_link() ->
%% Nowadays, instead of sending a message to rabbit_event via notify(stats),
%% some stat-emitting objects update ETS tables directly via module rabbit_core_metrics.

init_stats_timer(C, P) ->
-spec init_stats_timer() -> state().
init_stats_timer() ->
%% If the rabbit app is not loaded - use default none:5000
StatsLevel = application:get_env(rabbit, collect_statistics, none),
Interval = application:get_env(rabbit, collect_statistics_interval, 5000),
setelement(P, C, #state{level = StatsLevel, interval = Interval,
timer = undefined}).
Interval = application:get_env(rabbit, collect_statistics_interval, 5000),
#state{level = StatsLevel,
interval = Interval,
timer = undefined}.

%% Returns the tuple C with the element at position P replaced by a freshly
%% initialised stats timer state, as built by init_stats_timer/0.
init_stats_timer(C, P) ->
    setelement(P, C, init_stats_timer()).

init_disabled_stats_timer(C, P) ->
setelement(P, C, #state{level = none, interval = 0, timer = undefined}).
State = #state{level = none,
interval = 0,
timer = undefined},
setelement(P, C, State).

ensure_stats_timer(C, P, Msg) ->
case element(P, C) of
#state{level = Level, interval = Interval, timer = undefined} = State
#state{level = Level,
interval = Interval,
timer = undefined} = State
when Level =/= none ->
TRef = erlang:send_after(Interval, self(), Msg),
setelement(P, C, State#state{timer = TRef});
#state{} ->
_State ->
C
end.

Expand Down Expand Up @@ -156,5 +167,5 @@ event_cons(Type, Props, Ref) ->
#event{type = Type,
props = Props,
reference = Ref,
timestamp = os:system_time(milli_seconds)}.
timestamp = os:system_time(millisecond)}.

Loading

0 comments on commit 6034f3c

Please sign in to comment.