Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMQP 1.0: support JWT (OAuth 2) token renewal #12599

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_access_control.erl
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ check_user_id0(ClaimedUserName, #user{username = ActualUserName,
end.

-spec update_state(User :: rabbit_types:user(), NewState :: term()) ->
{'ok', rabbit_types:auth_user()} |
{'ok', rabbit_types:user()} |
{'refused', string()} |
{'error', any()}.

Expand Down
14 changes: 13 additions & 1 deletion deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,19 @@ handle_http_req(<<"GET">>,
Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, DstName),
Bindings = [B || B = #binding{key = K} <- Bindings0, K =:= Key],
RespPayload = encode_bindings(Bindings),
{<<"200">>, RespPayload, PermCaches}.
{<<"200">>, RespPayload, PermCaches};

handle_http_req(<<"PUT">>,
[<<"auth">>, <<"tokens">>],
_Query,
ReqPayload,
_Vhost,
_User,
ConnPid,
PermCaches) ->
{binary, Token} = ReqPayload,
ok = rabbit_amqp_reader:set_credential(ConnPid, Token),
{<<"204">>, null, PermCaches}.

decode_queue({map, KVList}) ->
M = lists:foldl(
Expand Down
84 changes: 57 additions & 27 deletions deps/rabbit/src/rabbit_amqp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

-export([init/1,
info/2,
mainloop/2]).
mainloop/2,
set_credential/2]).

-export([system_continue/3,
system_terminate/4,
Expand Down Expand Up @@ -53,6 +54,7 @@
channel_max :: non_neg_integer(),
auth_mechanism :: sasl_init_unprocessed | {binary(), module()},
auth_state :: term(),
credential_timer :: undefined | reference(),
properties :: undefined | {map, list(tuple())}
}).

Expand Down Expand Up @@ -139,6 +141,11 @@ server_properties() ->
Props = [{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}} | Props1],
{map, Props}.

-spec set_credential(pid(), binary()) -> ok.
set_credential(Pid, Credential) ->
Pid ! {set_credential, Credential},
ok.

%%--------------------------------------------------------------------------

inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
Expand Down Expand Up @@ -243,6 +250,8 @@ handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
State;
handle_other(terminate_connection, _State) ->
stop;
handle_other({set_credential, Cred}, State) ->
set_credential0(Cred, State);
handle_other(credential_expired, State) ->
Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, "credential expired", []),
handle_exception(State, 0, Error);
Expand Down Expand Up @@ -416,15 +425,17 @@ handle_connection_frame(
},
helper_sup = HelperSupPid,
sock = Sock} = State0) ->
logger:update_process_metadata(#{amqp_container => ContainerId}),
Vhost = vhost(Hostname),
logger:update_process_metadata(#{amqp_container => ContainerId,
vhost => Vhost,
user => Username}),
ok = check_user_loopback(State0),
ok = check_vhost_exists(Vhost, State0),
ok = check_vhost_alive(Vhost),
ok = rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}),
ok = check_vhost_connection_limit(Vhost, Username),
ok = check_user_connection_limit(Username),
ok = ensure_credential_expiry_timer(User),
Timer = maybe_start_credential_expiry_timer(User),
rabbit_core_metrics:auth_attempt_succeeded(<<>>, Username, amqp10),
notify_auth(user_authentication_success, Username, State0),
rabbit_log_connection:info(
Expand Down Expand Up @@ -499,7 +510,8 @@ handle_connection_frame(
outgoing_max_frame_size = OutgoingMaxFrameSize,
channel_max = EffectiveChannelMax,
properties = Properties,
timeout = ReceiveTimeoutMillis},
timeout = ReceiveTimeoutMillis,
credential_timer = Timer},
heartbeater = Heartbeater},
State = start_writer(State1),
HostnameVal = case Hostname of
Expand Down Expand Up @@ -871,39 +883,57 @@ check_user_connection_limit(Username) ->
end.


%% TODO Provide a means for the client to refresh the credential.
%% This could be either via:
%% 1. SASL (if multiple authentications are allowed on the same AMQP 1.0 connection), see
%% https://datatracker.ietf.org/doc/html/rfc4422#section-3.8 , or
%% 2. Claims Based Security (CBS) extension, see https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html
%% and https://github.com/rabbitmq/rabbitmq-server/issues/9259
%% 3. Simpler variation of 2. where a token is put to a special /token node.
%%
%% If the user does not refresh their credential on time (the only implementation currently),
%% close the entire connection as we must assume that vhost access could have been revoked.
%%
%% If the user refreshes their credential on time (to be implemented), the AMQP reader should
%% 1. rabbit_access_control:check_vhost_access/4
%% 2. send a message to all its sessions which should then erase the permission caches and
%% re-check all link permissions (i.e. whether reading / writing to exchanges / queues is still allowed).
%% 3. cancel the current timer, and set a new timer
%% similary as done for Stream connections, see https://github.com/rabbitmq/rabbitmq-server/issues/10292
ensure_credential_expiry_timer(User) ->
set_credential0(Cred,
State = #v1{connection = #v1_connection{
user = User0,
vhost = Vhost,
credential_timer = OldTimer} = Conn,
tracked_channels = Chans,
sock = Sock}) ->
rabbit_log:info("updating credential", []),
case rabbit_access_control:update_state(User0, Cred) of
{ok, User} ->
try rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}) of
ok ->
maps:foreach(fun(_ChanNum, Pid) ->
rabbit_amqp_session:reset_authz(Pid, User)
end, Chans),
case OldTimer of
undefined -> ok;
Ref -> ok = erlang:cancel_timer(Ref, [{info, false}])
end,
NewTimer = maybe_start_credential_expiry_timer(User),
State#v1{connection = Conn#v1_connection{
user = User,
credential_timer = NewTimer}}
catch _:Reason ->
Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
"access to vhost ~s failed for new credential: ~p",
[Vhost, Reason]),
handle_exception(State, 0, Error)
end;
Err ->
Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
"credential update failed: ~p",
[Err]),
handle_exception(State, 0, Error)
end.

maybe_start_credential_expiry_timer(User) ->
case rabbit_access_control:expiry_timestamp(User) of
never ->
ok;
undefined;
Ts when is_integer(Ts) ->
Time = (Ts - os:system_time(second)) * 1000,
rabbit_log:debug(
"Credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)",
"credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)",
[Time, Ts]),
case Time > 0 of
true ->
_TimerRef = erlang:send_after(Time, self(), credential_expired),
ok;
erlang:send_after(Time, self(), credential_expired);
false ->
protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
"Credential expired ~b ms ago", [abs(Time)])
"credential expired ~b ms ago", [abs(Time)])
end
end.

Expand Down
47 changes: 45 additions & 2 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@
list_local/0,
conserve_resources/3,
check_resource_access/4,
check_read_permitted_on_topic/4
check_read_permitted_on_topic/4,
reset_authz/2
]).

-export([init/1,
Expand Down Expand Up @@ -393,6 +394,10 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
handle_max = ClientHandleMax}}) ->
process_flag(trap_exit, true),
rabbit_process_flag:adjust_for_message_handling_proc(),
logger:update_process_metadata(#{channel_number => ChannelNum,
connection => ConnName,
vhost => Vhost,
user => User#user.username}),

ok = pg:join(pg_scope(), self(), self()),
Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
Expand Down Expand Up @@ -480,6 +485,10 @@ list_local() ->
conserve_resources(Pid, Source, {_, Conserve, _}) ->
gen_server:cast(Pid, {conserve_resources, Source, Conserve}).

-spec reset_authz(pid(), rabbit_types:user()) -> ok.
reset_authz(Pid, User) ->
gen_server:cast(Pid, {reset_authz, User}).

handle_call(Msg, _From, State) ->
Reply = {error, {not_understood, Msg}},
reply(Reply, State).
Expand Down Expand Up @@ -574,7 +583,18 @@ handle_cast({conserve_resources, Alarm, Conserve},
noreply(State);
handle_cast(refresh_config, #state{cfg = #cfg{vhost = Vhost} = Cfg} = State0) ->
State = State0#state{cfg = Cfg#cfg{trace_state = rabbit_trace:init(Vhost)}},
noreply(State).
noreply(State);
handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) ->
State1 = State0#state{
permission_cache = [],
topic_permission_cache = [],
cfg = Cfg#cfg{user = User}},
try recheck_authz(State1) of
State ->
noreply(State)
catch exit:#'v1_0.error'{} = Error ->
log_error_and_close_session(Error, State1)
end.

log_error_and_close_session(
Error, State = #state{cfg = #cfg{reader_pid = ReaderPid,
Expand Down Expand Up @@ -3522,6 +3542,29 @@ check_topic_authorisation(#exchange{type = topic,
check_topic_authorisation(_, _, _, _, Cache) ->
Cache.

recheck_authz(#state{incoming_links = IncomingLinks,
outgoing_links = OutgoingLinks,
permission_cache = Cache0,
cfg = #cfg{user = User}
} = State) ->
rabbit_log:debug("rechecking link authorizations", []),
Cache1 = maps:fold(
fun(_Handle, #incoming_link{exchange = X}, Cache) ->
case X of
#exchange{name = XName} ->
check_resource_access(XName, write, User, Cache);
#resource{} = XName ->
check_resource_access(XName, write, User, Cache);
to ->
Cache
end
end, Cache0, IncomingLinks),
Cache2 = maps:fold(
fun(_Handle, #outgoing_link{queue_name = QName}, Cache) ->
check_resource_access(QName, read, User, Cache)
end, Cache1, OutgoingLinks),
State#state{permission_cache = Cache2}.

check_user_id(Mc, User) ->
case rabbit_access_control:check_user_id(Mc, User) of
ok ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ force_event_refresh(Ref) ->
list_queue_states(Pid) ->
gen_server2:call(Pid, list_queue_states).

-spec update_user_state(pid(), rabbit_types:auth_user()) -> 'ok' | {error, channel_terminated}.
-spec update_user_state(pid(), rabbit_types:user()) -> 'ok' | {error, channel_terminated}.

update_user_state(Pid, UserState) when is_pid(Pid) ->
case erlang:is_process_alive(Pid) of
Expand Down
21 changes: 20 additions & 1 deletion deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
declare_exchange/3,
bind_exchange/5,
unbind_exchange/5,
delete_exchange/2
delete_exchange/2,

set_token/2
].

-define(TIMEOUT, 20_000).
Expand Down Expand Up @@ -381,6 +383,23 @@ delete_exchange(LinkPair, ExchangeName) ->
Err
end.

%% Renew OAuth 2.0 token.
-spec set_token(link_pair(), binary()) ->
ok | {error, term()}.
set_token(LinkPair, Token) ->
Props = #{subject => <<"PUT">>,
to => <<"/auth/tokens">>},
Body = {binary, Token},
case request(LinkPair, Props, Body) of
{ok, Resp} ->
case is_success(Resp) of
true -> ok;
false -> {error, Resp}
end;
Err ->
Err
end.

-spec request(link_pair(), amqp10_msg:amqp10_properties(), amqp10_prim()) ->
{ok, Response :: amqp10_msg:amqp10_msg()} | {error, term()}.
request(#link_pair{session = Session,
Expand Down
Loading
Loading