Skip to content

Commit

Permalink
Merge pull request rabbitmq#7095 from rabbitmq/mqtt-peer-addr
Browse files Browse the repository at this point in the history
Remove MQTT processor field peer_addr
  • Loading branch information
michaelklishin authored Jan 30, 2023
2 parents 31f2dcb + cbb389b commit 12ee906
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 36 deletions.
49 changes: 23 additions & 26 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
%% This module contains code that is common to MQTT and Web MQTT connections.
-module(rabbit_mqtt_processor).

-export([info/2, initial_state/2, initial_state/4,
-export([info/2, initial_state/2, initial_state/3,
process_packet/2, serialise/2,
terminate/4, handle_pre_hibernate/0,
handle_ra_event/2, handle_down/2, handle_queue_event/2,
Expand All @@ -21,7 +21,8 @@
-export_type([state/0]).

-import(rabbit_mqtt_util, [mqtt_to_amqp/1,
amqp_to_mqtt/1]).
amqp_to_mqtt/1,
ip_address_to_binary/1]).

-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
Expand Down Expand Up @@ -57,7 +58,6 @@
prefetch :: non_neg_integer(),
client_id :: option(binary()),
conn_name :: option(binary()),
peer_addr :: inet:ip_address(),
host :: inet:ip_address(),
port :: inet:port_number(),
peer_host :: inet:ip_address(),
Expand Down Expand Up @@ -87,21 +87,19 @@

-opaque state() :: #state{}.

-spec initial_state(Socket :: any(), ConnectionName :: binary()) ->
-spec initial_state(Socket :: rabbit_net:socket(),
ConnectionName :: binary()) ->
state().
initial_state(Socket, ConnectionName) ->
{ok, {PeerAddr, _PeerPort}} = rabbit_net:peername(Socket),
initial_state(Socket,
ConnectionName,
fun serialise_and_send_to_client/2,
PeerAddr).
fun serialise_and_send_to_client/2).

-spec initial_state(Socket :: any(),
-spec initial_state(Socket :: rabbit_net:socket(),
ConnectionName :: binary(),
SendFun :: fun((mqtt_packet(), state()) -> any()),
PeerAddr :: inet:ip_address()) ->
SendFun :: fun((mqtt_packet(), state()) -> any())) ->
state().
initial_state(Socket, ConnectionName, SendFun, PeerAddr) ->
initial_state(Socket, ConnectionName, SendFun) ->
Flow = case rabbit_misc:get_env(rabbit, mirroring_flow_control, true) of
true -> flow;
false -> noflow
Expand All @@ -114,7 +112,6 @@ initial_state(Socket, ConnectionName, SendFun, PeerAddr) ->
prefetch = rabbit_mqtt_util:env(prefetch),
delivery_flow = Flow,
connected_at = os:system_time(milli_seconds),
peer_addr = PeerAddr,
peer_host = PeerHost,
peer_port = PeerPort,
host = Host,
Expand Down Expand Up @@ -393,8 +390,8 @@ check_client_id(_) ->
check_credentials(Packet = #mqtt_packet_connect{username = Username,
password = Password},
State = #state{cfg = #cfg{ssl_login_name = SslLoginName,
peer_addr = PeerAddr}}) ->
Ip = list_to_binary(inet:ntoa(PeerAddr)),
peer_host = PeerHost}}) ->
Ip = ip_address_to_binary(PeerHost),
case creds(Username, Password, SslLoginName) of
nocreds ->
rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt),
Expand Down Expand Up @@ -699,13 +696,15 @@ make_will_msg(#mqtt_packet_connect{will_retain = Retain,
payload = Msg}.

process_login(_UserBin, _PassBin, ClientId,
#state{cfg = #cfg{peer_addr = Addr},
#state{cfg = #cfg{peer_host = PeerHost},
auth_state = #auth_state{username = Username,
user = User,
vhost = VHost
}} = State)
when Username =/= undefined, User =/= undefined, VHost =/= underfined ->
rabbit_core_metrics:auth_attempt_failed(list_to_binary(inet:ntoa(Addr)), Username, mqtt),
rabbit_core_metrics:auth_attempt_failed(ip_address_to_binary(PeerHost),
Username,
mqtt),
?LOG_ERROR(
"MQTT detected duplicate connect attempt for client ID '~ts', user '~ts', vhost '~ts'",
[ClientId, Username, VHost]),
Expand All @@ -714,13 +713,13 @@ process_login(UserBin, PassBin, ClientId,
#state{auth_state = undefined,
cfg = #cfg{socket = Sock,
ssl_login_name = SslLoginName,
peer_addr = Addr
peer_host = PeerHost
}} = State0) ->
{ok, {_PeerHost, _PeerPort, _Host, Port}} = rabbit_net:socket_ends(Sock, inbound),
{VHostPickedUsing, {VHost, UsernameBin}} = get_vhost(UserBin, SslLoginName, Port),
?LOG_DEBUG("MQTT vhost picked using ~s",
[human_readable_vhost_lookup_strategy(VHostPickedUsing)]),
RemoteIpAddressBin = list_to_binary(inet:ntoa(Addr)),
Ip = ip_address_to_binary(PeerHost),
Input = #{vhost => VHost,
username_bin => UsernameBin,
pass_bin => PassBin,
Expand All @@ -736,10 +735,10 @@ process_login(UserBin, PassBin, ClientId,
],
Input, State0) of
{ok, _Output, State} ->
rabbit_core_metrics:auth_attempt_succeeded(RemoteIpAddressBin, UsernameBin, mqtt),
rabbit_core_metrics:auth_attempt_succeeded(Ip, UsernameBin, mqtt),
{ok, State};
{error, _ConnectionRefusedReturnCode, _State} = Err ->
rabbit_core_metrics:auth_attempt_failed(RemoteIpAddressBin, UsernameBin, mqtt),
rabbit_core_metrics:auth_attempt_failed(Ip, UsernameBin, mqtt),
Err
end.

Expand Down Expand Up @@ -837,12 +836,12 @@ check_vhost_access(#{vhost := VHost,
client_id := ClientId,
user := User = #user{username = Username}
} = In,
#state{cfg = #cfg{peer_addr = PeerAddr}} = State) ->
#state{cfg = #cfg{peer_host = PeerHost}} = State) ->
AuthzCtx = #{<<"client_id">> => ClientId},
try rabbit_access_control:check_vhost_access(
User,
VHost,
{ip, PeerAddr},
{ip, PeerHost},
AuthzCtx) of
ok ->
{ok, maps:put(authz_ctx, AuthzCtx, In), State}
Expand All @@ -859,8 +858,8 @@ check_user_loopback(#{vhost := VHost,
user := User,
authz_ctx := AuthzCtx
},
#state{cfg = #cfg{peer_addr = PeerAddr}} = State) ->
case rabbit_access_control:check_user_loopback(UsernameBin, PeerAddr) of
#state{cfg = #cfg{peer_host = PeerHost}} = State) ->
case rabbit_access_control:check_user_loopback(UsernameBin, PeerHost) of
ok ->
AuthState = #auth_state{user = User,
username = UsernameBin,
Expand Down Expand Up @@ -1964,7 +1963,6 @@ format_status(
prefetch = Prefetch,
client_id = ClientID,
conn_name = ConnName,
peer_addr = PeerAddr,
host = Host,
port = Port,
peer_host = PeerHost,
Expand All @@ -1986,7 +1984,6 @@ format_status(
prefetch => Prefetch,
client_id => ClientID,
conn_name => ConnName,
peer_addr => PeerAddr,
host => Host,
port => Port,
peer_host => PeerHost,
Expand Down
7 changes: 6 additions & 1 deletion deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
init_sparkplug/0,
mqtt_to_amqp/1,
amqp_to_mqtt/1,
truncate_binary/2
truncate_binary/2,
ip_address_to_binary/1
]).

-define(MAX_TOPIC_TRANSLATION_CACHE_SIZE, 12).
Expand Down Expand Up @@ -209,3 +210,7 @@ truncate_binary(Bin, Size)
truncate_binary(Bin, Size)
when is_binary(Bin) ->
binary:part(Bin, 0, Size).

-spec ip_address_to_binary(inet:ip_address()) -> binary().
ip_address_to_binary(IpAddress) ->
list_to_binary(inet:ntoa(IpAddress)).
15 changes: 6 additions & 9 deletions deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,22 @@ upgrade(Req, Env, Handler, HandlerState) ->
upgrade(Req, Env, Handler, HandlerState, Opts) ->
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts).

takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, {HandlerState, PeerAddr}}) ->
takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) ->
Sock = case HandlerState#state.socket of
undefined ->
Socket;
ProxyInfo ->
{rabbit_proxy_socket, Socket, ProxyInfo}
end,
cowboy_websocket:takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
{Handler, {HandlerState#state{socket = Sock}, PeerAddr}}).
{Handler, HandlerState#state{socket = Sock}}).

%% cowboy_websocket
init(Req, Opts) ->
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
undefined ->
no_supported_sub_protocol(undefined, Req);
Protocol ->
{PeerAddr, _PeerPort} = maps:get(peer, Req),
WsOpts0 = proplists:get_value(ws_opts, Opts, #{}),
WsOpts = maps:merge(#{compress => true}, WsOpts0),
case lists:member(<<"mqtt">>, Protocol) of
Expand All @@ -82,16 +81,15 @@ init(Req, Opts) ->
true ->
{?MODULE,
cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req),
{#state{socket = maps:get(proxy_header, Req, undefined)},
PeerAddr},
#state{socket = maps:get(proxy_header, Req, undefined)},
WsOpts}
end
end.

-spec websocket_init({state(), PeerAddr :: binary()}) ->
-spec websocket_init(state()) ->
{cowboy_websocket:commands(), state()} |
{cowboy_websocket:commands(), state(), hibernate}.
websocket_init({State0 = #state{socket = Sock}, PeerAddr}) ->
websocket_init(State0 = #state{socket = Sock}) ->
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN ++ [web_mqtt]}),
ok = file_handle_cache:obtain(),
case rabbit_net:connection_string(Sock, inbound) of
Expand All @@ -102,8 +100,7 @@ websocket_init({State0 = #state{socket = Sock}, PeerAddr}) ->
PState = rabbit_mqtt_processor:initial_state(
rabbit_net:unwrap_socket(Sock),
ConnName,
fun send_reply/2,
PeerAddr),
fun send_reply/2),
State1 = State0#state{conn_name = ConnName,
proc_state = PState},
State = rabbit_event:init_stats_timer(State1, #state.stats_timer),
Expand Down

0 comments on commit 12ee906

Please sign in to comment.