Skip to content

Commit 85e38dc

Browse files
Merge pull request #5895 from rabbitmq/mqtt
Native MQTT
2 parents 7391d85 + 50e2577 commit 85e38dc

File tree

113 files changed

+7352
-4555
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

113 files changed

+7352
-4555
lines changed

deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_conf.erl

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,13 @@ set_default_config() ->
122122
{schedule_ms_limit, 0},
123123
{heap_word_limit, 0},
124124
{busy_port, false},
125-
{busy_dist_port, true}]}
126-
| OsirisConfig
125+
{busy_dist_port, true}]},
126+
{mnesia,
127+
[
128+
{dump_log_write_threshold, 5000},
129+
{dump_log_time_threshold, 90000}
130+
]}
131+
| OsirisConfig
127132
],
128133
apply_erlang_term_based_config(Config).
129134

deps/rabbit/docs/rabbitmq.conf.example

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,7 @@
755755
# stomp.tcp_listen_options.nodelay = true
756756
#
757757
# stomp.tcp_listen_options.exit_on_close = true
758-
# stomp.tcp_listen_options.send_timeout = 120
758+
# stomp.tcp_listen_options.send_timeout = 120000
759759

760760
## Proxy protocol support
761761
##
@@ -838,7 +838,7 @@
838838
# mqtt.tcp_listen_options.nodelay = true
839839
#
840840
# mqtt.tcp_listen_options.exit_on_close = true
841-
# mqtt.tcp_listen_options.send_timeout = 120
841+
# mqtt.tcp_listen_options.send_timeout = 120000
842842

843843
## TLS listener settings
844844
## ## See https://rabbitmq.com/mqtt.html and https://rabbitmq.com/ssl.html for details.

deps/rabbit/src/rabbit.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -727,7 +727,8 @@ status() ->
727727
true ->
728728
[{virtual_host_count, rabbit_vhost:count()},
729729
{connection_count,
730-
length(rabbit_networking:connections_local())},
730+
length(rabbit_networking:connections_local()) +
731+
length(rabbit_networking:local_non_amqp_connections())},
731732
{queue_count, total_queue_count()}];
732733
false ->
733734
[]
@@ -1163,12 +1164,12 @@ config_locations() ->
11631164
% This event is necessary for the stats timer to be initialized with
11641165
% the correct values once the management agent has started
11651166
force_event_refresh(Ref) ->
1166-
% direct connections, e.g. MQTT, STOMP
1167+
% direct connections, e.g. STOMP
11671168
ok = rabbit_direct:force_event_refresh(Ref),
11681169
% AMQP connections
11691170
ok = rabbit_networking:force_connection_event_refresh(Ref),
1170-
% "external" connections, which are not handled by the "AMQP core",
1171-
% e.g. connections to the stream plugin
1171+
% non-AMQP connections, which are not handled by the "AMQP core",
1172+
% e.g. connections to the stream and MQTT plugins
11721173
ok = rabbit_networking:force_non_amqp_connection_event_refresh(Ref),
11731174
ok = rabbit_channel:force_event_refresh(Ref),
11741175
ok = rabbit_amqqueue:force_event_refresh(Ref).

deps/rabbit/src/rabbit_access_control.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
-export_type([permission_atom/0]).
2020

21-
-type permission_atom() :: 'configure' | 'read' | 'write'.
21+
-type permission_atom() :: 'configure' | 'write' | 'read'.
2222

2323
%%----------------------------------------------------------------------------
2424

@@ -194,8 +194,8 @@ check_resource_access(User = #user{username = Username,
194194
check_access(
195195
fun() -> Module:check_resource_access(
196196
auth_user(User, Impl), Resource, Permission, Context) end,
197-
Module, "access to ~ts refused for user '~ts'",
198-
[rabbit_misc:rs(Resource), Username]);
197+
Module, "~s access to ~s refused for user '~s'",
198+
[Permission, rabbit_misc:rs(Resource), Username]);
199199
(_, Else) -> Else
200200
end, ok, Modules).
201201

@@ -207,8 +207,8 @@ check_topic_access(User = #user{username = Username,
207207
check_access(
208208
fun() -> Module:check_topic_access(
209209
auth_user(User, Impl), Resource, Permission, Context) end,
210-
Module, "access to topic '~ts' in exchange ~ts refused for user '~ts'",
211-
[maps:get(routing_key, Context), rabbit_misc:rs(Resource), Username]);
210+
Module, "~s access to topic '~s' in exchange ~s refused for user '~s'",
211+
[Permission, maps:get(routing_key, Context), rabbit_misc:rs(Resource), Username]);
212212
(_, Else) -> Else
213213
end, ok, Modules).
214214

deps/rabbit/src/rabbit_alarm.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929

3030
-export([remote_conserve_resources/3]). %% Internal use only
3131

32+
-export_type([resource_alarm_source/0,
33+
resource_alert/0]).
34+
3235
-define(SERVER, ?MODULE).
3336

3437
-define(FILE_DESCRIPTOR_RESOURCE, <<"file descriptors">>).
@@ -46,6 +49,9 @@
4649
-type resource_alarm_source() :: 'disk' | 'memory'.
4750
-type resource_alarm() :: {resource_limit, resource_alarm_source(), node()}.
4851
-type alarm() :: local_alarm() | resource_alarm().
52+
-type resource_alert() :: {WasAlarmSetForNode :: boolean(),
53+
IsThereAnyAlarmsWithSameSourceInTheCluster :: boolean(),
54+
NodeForWhichAlarmWasSetOrCleared :: node()}.
4955

5056
%%----------------------------------------------------------------------------
5157

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1763,7 +1763,7 @@ basic_get(Q, NoAck, LimiterPid, CTag, QStates) ->
17631763
non_neg_integer(), rabbit_types:ctag(), boolean(),
17641764
rabbit_framing:amqp_table(), any(), rabbit_types:username(),
17651765
rabbit_queue_type:state()) ->
1766-
{ok, rabbit_queue_type:state(), rabbit_queue_type:actions()} |
1766+
{ok, rabbit_queue_type:state()} |
17671767
{error, term()} |
17681768
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
17691769
basic_consume(Q, NoAck, ChPid, LimiterPid,

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@
121121

122122
-define(CREATION_EVENT_KEYS,
123123
[name,
124+
type,
124125
durable,
125126
auto_delete,
126127
arguments,
@@ -129,7 +130,7 @@
129130
user_who_performed_action
130131
]).
131132

132-
-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]).
133+
-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name, type]]).
133134

134135
%%----------------------------------------------------------------------------
135136

deps/rabbit/src/rabbit_binding.erl

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111

1212
-export([recover/0, recover/2, exists/1, add/2, add/3, remove/1, remove/2, remove/3, remove/4]).
1313
-export([list/1, list_for_source/1, list_for_destination/1,
14-
list_for_source_and_destination/2, list_explicit/0,
15-
list_between/2, has_any_between/2]).
14+
list_for_source_and_destination/2, list_for_source_and_destination/3,
15+
list_explicit/0, list_between/2, has_any_between/2]).
1616
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
1717
process_deletions/2, binding_action/3]).
1818
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4]).
@@ -253,26 +253,18 @@ list(VHostPath) ->
253253
list_for_source(?DEFAULT_EXCHANGE(VHostPath)) ->
254254
implicit_bindings(VHostPath);
255255
list_for_source(SrcName) ->
256-
mnesia:async_dirty(
257-
fun() ->
258-
Route = #route{binding = #binding{source = SrcName, _ = '_'}},
259-
[B || #route{binding = B}
260-
<- mnesia:match_object(rabbit_route, Route, read)]
261-
end).
256+
Route = #route{binding = #binding{source = SrcName, _ = '_'}},
257+
Fun = list_for_route(Route, false),
258+
mnesia:async_dirty(Fun).
262259

263260
-spec list_for_destination
264261
(rabbit_types:binding_destination()) -> bindings().
265262

266263
list_for_destination(DstName = #resource{}) ->
267-
ExplicitBindings = mnesia:async_dirty(
268-
fun() ->
269-
Route = #route{binding = #binding{destination = DstName,
270-
_ = '_'}},
271-
[reverse_binding(B) ||
272-
#reverse_route{reverse_binding = B} <-
273-
mnesia:match_object(rabbit_reverse_route,
274-
reverse_route(Route), read)]
275-
end),
264+
Route = #route{binding = #binding{destination = DstName,
265+
_ = '_'}},
266+
Fun = list_for_route(Route, true),
267+
ExplicitBindings = mnesia:async_dirty(Fun),
276268
implicit_for_destination(DstName) ++ ExplicitBindings.
277269

278270
-spec list_between(
@@ -316,27 +308,40 @@ implicit_for_destination(DstQueue = #resource{kind = queue,
316308
implicit_for_destination(_) ->
317309
[].
318310

319-
-spec list_for_source_and_destination
320-
(rabbit_types:binding_source(), rabbit_types:binding_destination()) ->
321-
bindings().
311+
-spec list_for_source_and_destination(rabbit_types:binding_source(), rabbit_types:binding_destination()) ->
312+
bindings().
313+
list_for_source_and_destination(SrcName, DstName) ->
314+
list_for_source_and_destination(SrcName, DstName, false).
322315

316+
-spec list_for_source_and_destination(rabbit_types:binding_source(), rabbit_types:binding_destination(), boolean()) ->
317+
bindings().
323318
list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath),
324319
#resource{kind = queue,
325320
virtual_host = VHostPath,
326-
name = QName} = DstQueue) ->
321+
name = QName} = DstQueue,
322+
_Reverse) ->
327323
[#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
328324
destination = DstQueue,
329325
key = QName,
330326
args = []}];
331-
list_for_source_and_destination(SrcName, DstName) ->
332-
mnesia:async_dirty(
333-
fun() ->
334-
Route = #route{binding = #binding{source = SrcName,
335-
destination = DstName,
336-
_ = '_'}},
337-
[B || #route{binding = B} <- mnesia:match_object(rabbit_route,
338-
Route, read)]
339-
end).
327+
list_for_source_and_destination(SrcName, DstName, Reverse) ->
328+
Route = #route{binding = #binding{source = SrcName,
329+
destination = DstName,
330+
_ = '_'}},
331+
Fun = list_for_route(Route, Reverse),
332+
mnesia:async_dirty(Fun).
333+
334+
list_for_route(Route, false) ->
335+
fun() ->
336+
[B || #route{binding = B} <- mnesia:match_object(rabbit_route, Route, read)]
337+
end;
338+
list_for_route(Route, true) ->
339+
fun() ->
340+
[reverse_binding(B) ||
341+
#reverse_route{reverse_binding = B} <-
342+
mnesia:match_object(rabbit_reverse_route,
343+
reverse_route(Route), read)]
344+
end.
340345

341346
-spec info_keys() -> rabbit_types:info_keys().
342347

0 commit comments

Comments
 (0)