Skip to content

Commit

Permalink
Merge pull request #5610 from cloudamqp/tracking_refac
Browse files Browse the repository at this point in the history
connection/channel tracking refactorings
  • Loading branch information
michaelklishin authored Aug 23, 2022
2 parents 4086b39 + af25ffc commit 050258a
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 34 deletions.
17 changes: 13 additions & 4 deletions deps/rabbit/src/rabbit_channel_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
ensure_tracked_tables_for_this_node/0,
delete_tracked_channel_user_entry/1]).

%% All nodes (that support the `tracking_records_in_ets' feature) must
%% export this function with the same spec, as they are called via
%% RPC from other nodes. (Their implementation can differ.)
-export([count_local_tracked_items_of_user/1]).

-export([migrate_tracking_records/0]).

-include_lib("rabbit_common/include/rabbit.hrl").
Expand Down Expand Up @@ -223,9 +228,13 @@ count_tracked_items_in(Type) ->
end.

count_tracked_items_in_ets({user, Username}) ->
rabbit_tracking:count_tracked_items_ets(
?TRACKED_CHANNEL_TABLE_PER_USER, Username,
"channels of user").
rabbit_tracking:count_on_all_nodes(
?MODULE, count_local_tracked_items_of_user, [Username],
["channels of user ", Username]).

-spec count_local_tracked_items_of_user(rabbit_types:username()) -> non_neg_integer().
count_local_tracked_items_of_user(Username) ->
rabbit_tracking:read_ets_counter(?TRACKED_CHANNEL_TABLE_PER_USER, Username).

count_tracked_items_in_mnesia({user, Username}) ->
rabbit_tracking:count_tracked_items_mnesia(
Expand Down Expand Up @@ -409,7 +418,7 @@ get_tracked_channels_by_connection_pid(ConnPid) ->
end.

get_tracked_channels_by_connection_pid_ets(ConnPid) ->
rabbit_tracking:match_tracked_items_ets(
rabbit_tracking:match_tracked_items_local(
?TRACKED_CHANNEL_TABLE,
#tracked_channel{connection = ConnPid, _ = '_'}).

Expand Down
30 changes: 21 additions & 9 deletions deps/rabbit/src/rabbit_connection_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@
tracked_connection_from_connection_state/1,
lookup/1, count/0]).

%% All nodes (that support the `tracking_records_in_ets' feature) must
%% export these functions with the same spec, as they are called via
%% RPC from other nodes. (Their implementation can differ.)
-export([count_local_tracked_items_in_vhost/1,
count_local_tracked_items_of_user/1]).

-export([migrate_tracking_records/0]).

-include_lib("rabbit_common/include/rabbit.hrl").
Expand Down Expand Up @@ -226,18 +232,24 @@ count_tracked_items_in(Type) ->
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
true -> count_tracked_items_in_ets(Type);
false -> count_tracked_items_in_mnesia(Type)
end.
end.

count_tracked_items_in_ets({vhost, VirtualHost}) ->
rabbit_tracking:count_tracked_items_ets(
?TRACKED_CONNECTION_TABLE_PER_VHOST,
VirtualHost,
"connections in vhost");
rabbit_tracking:count_on_all_nodes(
?MODULE, count_local_tracked_items_in_vhost, [VirtualHost],
["connections in vhost ", VirtualHost]);
count_tracked_items_in_ets({user, Username}) ->
rabbit_tracking:count_tracked_items_ets(
?TRACKED_CONNECTION_TABLE_PER_USER,
Username,
"connections for user").
rabbit_tracking:count_on_all_nodes(
?MODULE, count_local_tracked_items_of_user, [Username],
["connections for user ", Username]).

-spec count_local_tracked_items_in_vhost(rabbit_types:vhost()) -> non_neg_integer().
count_local_tracked_items_in_vhost(VirtualHost) ->
rabbit_tracking:read_ets_counter(?TRACKED_CONNECTION_TABLE_PER_VHOST, VirtualHost).

-spec count_local_tracked_items_of_user(rabbit_types:username()) -> non_neg_integer().
count_local_tracked_items_of_user(Username) ->
rabbit_tracking:read_ets_counter(?TRACKED_CONNECTION_TABLE_PER_USER, Username).

count_tracked_items_in_mnesia({vhost, VirtualHost}) ->
rabbit_tracking:count_tracked_items_mnesia(
Expand Down
45 changes: 24 additions & 21 deletions deps/rabbit/src/rabbit_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@

-export([id/2, delete_tracked_entry/4, delete_tracked_entry_internal/4,
clear_tracking_table/1, delete_tracking_table/3]).
-export([count_tracked_items_ets/3, match_tracked_items_ets/2]).
-export([count_tracked_items_local/2, match_tracked_items_local/2]).
-export([count_on_all_nodes/4, match_tracked_items_ets/2]).
-export([read_ets_counter/2, match_tracked_items_local/2]).
-export([count_tracked_items_mnesia/4, match_tracked_items_mnesia/2]).

%%----------------------------------------------------------------------------
Expand All @@ -39,26 +39,29 @@

id(Node, Name) -> {Node, Name}.

-spec count_tracked_items_ets(atom(), term(), string()) ->
-spec count_on_all_nodes(module(), atom(), [term()], iodata()) ->
non_neg_integer().
count_tracked_items_ets(Tab, Key, ContextMsg) ->
lists:foldl(fun (Node, Acc) when Node == node() ->
N = count_tracked_items_local(Tab, Key),
Acc + N;
(Node, Acc) ->
N = case rabbit_misc:rpc_call(Node, ?MODULE, count_tracked_items_local,
[Tab, Key]) of
Int when is_integer(Int) -> Int;
{badrpc, Err} ->
rabbit_log:error(
"Failed to fetch number of ~p ~p on node ~p:~n~p",
[ContextMsg, Key, Node, Err]),
0
end,
Acc + N
end, 0, rabbit_nodes:all_running()).

count_tracked_items_local(Tab, Key) ->
count_on_all_nodes(Mod, Fun, Args, ContextMsg) ->
Nodes = rabbit_nodes:all_running(),
ResL = erpc:multicall(Nodes, Mod, Fun, Args),
sum_rpc_multicall_result(ResL, Nodes, ContextMsg, 0).

sum_rpc_multicall_result([{ok, Int}|ResL], [_N|Nodes], ContextMsg, Acc) when is_integer(Int) ->
sum_rpc_multicall_result(ResL, Nodes, ContextMsg, Acc + Int);
sum_rpc_multicall_result([{ok, BadValue}|ResL], [BadNode|Nodes], ContextMsg, Acc) ->
rabbit_log:error(
"Failed to fetch number of ~s on node ~p:~n not an integer ~p",
[ContextMsg, BadNode, BadValue]),
sum_rpc_multicall_result(ResL, Nodes, ContextMsg, Acc);
sum_rpc_multicall_result([{Class, Reason}|ResL], [BadNode|Nodes], ContextMsg, Acc) ->
rabbit_log:error(
"Failed to fetch number of ~s on node ~p:~n~p:~p",
[ContextMsg, BadNode, Class, Reason]),
sum_rpc_multicall_result(ResL, Nodes, ContextMsg, Acc);
sum_rpc_multicall_result([], [], _, Acc) ->
Acc.

read_ets_counter(Tab, Key) ->
case ets:lookup(Tab, Key) of
[] -> 0;
[{_, Val}] -> Val
Expand Down

0 comments on commit 050258a

Please sign in to comment.