Skip to content

Commit 3256507

Browse files
committed
Emit cluster-wide MQTT connection infos
When listing MQTT connections with the CLI, whether feature flag delete_ra_cluster_mqtt_node is enabled or disabled, in both cases return cluster wide MQTT connections. If connection tracking is done in Ra, the CLI target node returns all connection infos because Ra is aware of all MQTT connections. If connection tracking is done in (local-only) pg, all nodes return their local MQTT connection infos.
1 parent 7905d2c commit 3256507

File tree

4 files changed

+50
-19
lines changed

4 files changed

+50
-19
lines changed

deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ scopes() -> [ctl, diagnostics].
3030
switches() -> [{verbose, boolean}].
3131
aliases() -> [{'V', verbose}].
3232

33-
description() -> <<"Lists MQTT connections on the target node">>.
33+
description() -> <<"Lists all MQTT connections">>.
3434

3535
help_section() ->
3636
{plugin, mqtt}.
@@ -68,8 +68,7 @@ run(Args, #{node := NodeName,
6868
false -> 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args)
6969
end,
7070

71-
%% List only connections on the target node.
72-
Nodes = [NodeName],
71+
Nodes = 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName),
7372

7473
'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(
7574
NodeName,

deps/rabbitmq_mqtt/src/rabbit_mqtt.erl

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313

1414
-export([start/2, stop/1]).
1515
-export([emit_connection_info_all/4,
16-
close_local_client_connections/1]).
16+
emit_connection_info_local/3,
17+
close_local_client_connections/1,
18+
%% exported for tests
19+
local_connection_pids/0]).
1720

1821
start(normal, []) ->
1922
rabbit_global_counters:init([{protocol, mqtt}]),
@@ -36,29 +39,47 @@ start(normal, []) ->
3639
stop(_) ->
3740
rabbit_mqtt_sup:stop_listeners().
3841

39-
emit_connection_info_all(_Nodes, Items, Ref, AggregatorPid) ->
42+
emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
43+
case rabbit_mqtt_ff:track_client_id_in_ra() of
44+
true ->
45+
%% Ra tracks connections cluster-wide.
46+
AllPids = rabbit_mqtt_collector:list_pids(),
47+
emit_connection_info(Items, Ref, AggregatorPid, AllPids),
48+
%% Our node already emitted infos for all connections. Therefore, for the
49+
%% remaining nodes, we send back 'finished' so that the CLI does not time out.
50+
[AggregatorPid ! {Ref, finished} || _ <- lists:seq(1, length(Nodes) - 1)];
51+
false ->
52+
Pids = [spawn_link(Node, rabbit_mqtt, emit_connection_info_local,
53+
[Items, Ref, AggregatorPid])
54+
|| Node <- Nodes],
55+
rabbit_control_misc:await_emitters_termination(Pids)
56+
end.
57+
58+
emit_connection_info_local(Items, Ref, AggregatorPid) ->
59+
LocalPids = local_connection_pids(),
60+
emit_connection_info(Items, Ref, AggregatorPid, LocalPids).
61+
62+
emit_connection_info(Items, Ref, AggregatorPid, Pids) ->
4063
rabbit_control_misc:emitting_map_with_exit_handler(
41-
AggregatorPid,
42-
Ref,
64+
AggregatorPid, Ref,
4365
fun(Pid) ->
4466
rabbit_mqtt_reader:info(Pid, Items)
45-
end,
46-
rabbit_mqtt_collector:list_pids()
47-
).
67+
end, Pids).
4868

4969
-spec close_local_client_connections(string() | binary()) -> {'ok', non_neg_integer()}.
5070
close_local_client_connections(Reason) ->
51-
LocalPids = local_connection_pids(),
52-
[rabbit_mqtt_reader:close_connection(Pid, Reason) || Pid <- LocalPids],
53-
{ok, length(LocalPids)}.
71+
Pids = local_connection_pids(),
72+
lists:foreach(fun(Pid) ->
73+
rabbit_mqtt_reader:close_connection(Pid, Reason)
74+
end, Pids),
75+
{ok, length(Pids)}.
5476

5577
-spec local_connection_pids() -> [pid()].
5678
local_connection_pids() ->
5779
case rabbit_mqtt_ff:track_client_id_in_ra() of
5880
true ->
5981
AllPids = rabbit_mqtt_collector:list_pids(),
60-
LocalPids = lists:filter(fun(Pid) -> node(Pid) =:= node() end, AllPids),
61-
LocalPids;
82+
lists:filter(fun(Pid) -> node(Pid) =:= node() end, AllPids);
6283
false ->
6384
PgScope = persistent_term:get(?PG_SCOPE),
6485
lists:flatmap(fun(Group) ->

deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,13 @@ register_clientid(Vhost, ClientId)
155155

156156
-spec remove_duplicate_clientid_connections({rabbit_types:vhost(), binary()}, pid()) -> ok.
157157
remove_duplicate_clientid_connections(PgGroup, PidToKeep) ->
158-
Pids = pg:get_local_members(persistent_term:get(?PG_SCOPE), PgGroup),
159-
lists:foreach(fun(Pid) ->
160-
gen_server:cast(Pid, duplicate_id)
161-
end, Pids -- [PidToKeep]).
158+
try persistent_term:get(?PG_SCOPE) of
159+
PgScope ->
160+
Pids = pg:get_local_members(PgScope, PgGroup),
161+
lists:foreach(fun(Pid) ->
162+
gen_server:cast(Pid, duplicate_id)
163+
end, Pids -- [PidToKeep])
164+
catch _:badarg ->
165+
%% MQTT supervision tree on this node not fully started
166+
ok
167+
end.

deps/rabbitmq_mqtt/test/command_SUITE.erl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ groups() ->
2727
]}
2828
].
2929

30+
suite() ->
31+
[
32+
{timetrap, {minutes, 3}}
33+
].
34+
3035
init_per_suite(Config) ->
3136
rabbit_ct_helpers:log_environment(),
3237
Config1 = rabbit_ct_helpers:set_config(Config, [

0 commit comments

Comments
 (0)