Skip to content

Commit

Permalink
Add limit on number of inactive transports to retain info for
Browse files Browse the repository at this point in the history
Summary:
Split the transport tables into a table for transport info and a table
for file info. Add the ability to limit the number of inactive
transports for which info is retained, to prevent the transport info
tables from growing without bound.

Differential Revision: D60845349

fbshipit-source-id: dc7086493146d8d161e2f005aa66c6fa3f3fee4a
  • Loading branch information
hsun324 authored and facebook-github-bot committed Aug 6, 2024
1 parent d51cdf6 commit f769c78
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 77 deletions.
3 changes: 3 additions & 0 deletions include/wa_raft.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@
%% Time in seconds after which a transport that has not made progress should be considered failed
-define(RAFT_TRANSPORT_IDLE_TIMEOUT(), ?RAFT_CONFIG(transport_idle_timeout_secs, 30)).

%% Maximum number of previous inactive transports to retain info for.
-define(RAFT_TRANSPORT_INACTIVE_INFO_LIMIT(), ?RAFT_CONFIG(raft_transport_inactive_info_limit, 30)).

%% Size in bytes of individual chunks (messages containing file data) to be sent during transports
%% using the dist transport provider
-define(RAFT_DIST_TRANSPORT_CHUNK_SIZE(), ?RAFT_CONFIG(dist_transport_chunk_size, 1 * 1024 * 1024)).
Expand Down
239 changes: 162 additions & 77 deletions src/wa_raft_transport.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@
file_info/0
]).

%% Name of the ETS table to keep records for transports
-define(TRANSPORT_TABLE, wa_raft_transport_transports).
%% Name of the ETS table to keep records for files
-define(FILE_TABLE, wa_raft_transport_files).

-define(RAFT_TRANSPORT_PARTITION_SUBDIRECTORY, "transport").

-define(RAFT_TRANSPORT_SCAN_INTERVAL_SECS, 30).
Expand All @@ -82,9 +87,6 @@
%% Counter - inflight receives
-define(RAFT_TRANSPORT_COUNTER_ACTIVE_RECEIVES, 1).

-define(INFO_KEY(ID), {ID, info}).
-define(FILE_KEY(ID, FileID), {ID, {file, FileID}}).

-type transport_id() :: pos_integer().
-type transport_info() :: #{
type := sender | receiver,
Expand Down Expand Up @@ -247,18 +249,19 @@ complete(ID, FileID, Status, Pid) ->

-spec setup_tables() -> ok.
setup_tables() ->
?MODULE = ets:new(?MODULE, [named_table, set, public]),
?TRANSPORT_TABLE = ets:new(?TRANSPORT_TABLE, [named_table, set, public]),
?FILE_TABLE = ets:new(?FILE_TABLE, [named_table, set, public]),
ok.

-spec transports() -> [transport_id()].
transports() ->
ets:select(?MODULE, [{{?INFO_KEY('$1'), '_'}, [], ['$1']}]).
ets:select(?TRANSPORT_TABLE, [{{'$1', '_'}, [], ['$1']}]).

-spec transport_info(ID :: transport_id()) -> {ok, Info :: transport_info()} | not_found.
transport_info(ID) ->
case ets:lookup(?MODULE, ?INFO_KEY(ID)) of
[{_, Info}] -> {ok, Info};
[] -> not_found
case ets:lookup_element(?TRANSPORT_TABLE, ID, 2, not_found) of
not_found -> not_found;
Info -> {ok, Info}
end.

-spec transport_info(ID :: transport_id(), Item :: atom()) -> Info :: term() | undefined.
Expand All @@ -272,33 +275,49 @@ transport_info(ID, Item) ->
% provide any atomicity guarantees.
-spec set_transport_info(ID :: transport_id(), Info :: transport_info(), Counters :: counters:counters_ref()) -> term().
set_transport_info(ID, #{atomics := TransportAtomics} = Info, Counters) ->
true = ets:insert(?MODULE, {?INFO_KEY(ID), Info}),
true = ets:insert(?TRANSPORT_TABLE, {ID, Info}),
maybe_update_active_inbound_transport_counts(undefined, Info, Counters),
ok = atomics:put(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, erlang:system_time(millisecond)).

% This function should only be called from the "factory" process since it does not
% provide any atomicity guarantees.
-spec update_transport_info(ID :: transport_id(), Fun :: fun((Info :: transport_info()) -> NewInfo :: transport_info()), Counters :: counters:counters_ref()) -> ok | not_found.
update_transport_info(ID, Fun, Counters) ->
-spec update_and_get_transport_info(
ID :: transport_id(),
Fun :: fun((Info :: transport_info()) -> NewInfo :: transport_info()),
Counters :: counters:counters_ref()
) -> {ok, NewOrExistingInfo :: transport_info()} | not_found.
update_and_get_transport_info(ID, Fun, Counters) ->
case transport_info(ID) of
{ok, #{atomics := TransportAtomics} = Info} ->
case Fun(Info) of
Info ->
ok;
{ok, Info};
NewInfo ->
true = ets:insert(?MODULE, {?INFO_KEY(ID), NewInfo}),
true = ets:insert(?TRANSPORT_TABLE, {ID, NewInfo}),
ok = atomics:put(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, erlang:system_time(millisecond)),
maybe_update_active_inbound_transport_counts(Info, NewInfo, Counters)
ok = maybe_update_active_inbound_transport_counts(Info, NewInfo, Counters),
{ok, NewInfo}
end;
not_found ->
not_found
end.

%% Removes the record of transport `ID` from the transport table along with
%% the file records belonging to it.
%%
%% Returns `not_found` when no record exists for `ID`, otherwise `ok`.
-spec delete_transport_info(ID :: transport_id()) -> ok | not_found.
delete_transport_info(ID) ->
    case transport_info(ID) of
        not_found ->
            not_found;
        {ok, #{total_files := TotalFiles}} ->
            %% File records are removed for file IDs 1..TotalFiles before the
            %% transport record itself is removed, so the transport record is
            %% still present while its file records are being cleaned up.
            _ = [delete_file_info(ID, FileID) || FileID <- lists:seq(1, TotalFiles)],
            true = ets:delete(?TRANSPORT_TABLE, ID),
            ok
    end.

-spec file_info(ID :: transport_id(), FileID :: file_id()) -> {ok, Info :: file_info()} | not_found.
file_info(ID, FileID) ->
case ets:lookup(?MODULE, ?FILE_KEY(ID, FileID)) of
[{_, Info}] -> {ok, Info};
[] -> not_found
case ets:lookup_element(?FILE_TABLE, {ID, FileID}, 2, not_found) of
not_found -> not_found;
Info -> {ok, Info}
end.

-spec maybe_update_active_inbound_transport_counts(OldInfo :: transport_info() | undefined, NewInfo :: transport_info(), Counters :: counters:counters_ref()) -> ok.
Expand All @@ -315,7 +334,7 @@ maybe_update_active_inbound_transport_counts(_, _, _) ->
% transport of the specified file since it does not provide any atomicity guarantees.
-spec set_file_info(ID :: transport_id(), FileID :: file_id(), Info :: file_info()) -> term().
set_file_info(ID, FileID, #{atomics := {TransportAtomics, FileAtomics}} = Info) ->
true = ets:insert(?MODULE, {?FILE_KEY(ID, FileID), Info}),
true = ets:insert(?FILE_TABLE, {{ID, FileID}, Info}),
NowMillis = erlang:system_time(millisecond),
ok = atomics:put(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, NowMillis),
ok = atomics:put(FileAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, NowMillis).
Expand All @@ -330,7 +349,7 @@ update_file_info(ID, FileID, Fun) ->
Info ->
ok;
NewInfo ->
true = ets:insert(?MODULE, {?FILE_KEY(ID, FileID), NewInfo}),
true = ets:insert(?FILE_TABLE, {{ID, FileID}, NewInfo}),
NowMillis = erlang:system_time(millisecond),
ok = atomics:put(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, NowMillis),
ok = atomics:put(FileAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, NowMillis),
Expand All @@ -340,6 +359,11 @@ update_file_info(ID, FileID, Fun) ->
not_found
end.

%% Removes the record of file `FileID` of transport `ID` from the file table.
%% Always returns `ok`; deleting a key that is not in the table is not an error.
-spec delete_file_info(ID :: transport_id(), FileID :: file_id()) -> ok.
delete_file_info(ID, FileID) ->
    Key = {ID, FileID},
    true = ets:delete(?FILE_TABLE, Key),
    ok.

%%-------------------------------------------------------------------
%% Internal API
%%-------------------------------------------------------------------
Expand Down Expand Up @@ -443,14 +467,18 @@ handle_call({transport, ID, Peer, Module, Meta, Files}, From, #state{counters =
],

TotalFiles =:= 0 andalso
update_transport_info(ID, fun (Info0) ->
Info1 = Info0#{status => completed, end_ts => NowMillis},
Info2 = case maybe_notify_complete(ID, Info1, State) of
ok -> Info1;
{error, Reason} -> Info1#{status => failed, error => {notify_failed, Reason}}
update_and_get_transport_info(
ID,
fun (Info0) ->
Info1 = Info0#{status => completed, end_ts => NowMillis},
Info2 = case maybe_notify_complete(ID, Info1, State) of
ok -> Info1;
{error, Reason} -> Info1#{status => failed, error => {notify_failed, Reason}}
end,
maybe_notify(ID, Info2)
end,
maybe_notify(ID, Info2)
end, Counters),
Counters
),

{reply, ok, State}
end
Expand All @@ -459,26 +487,39 @@ handle_call({transport, ID, Peer, Module, Meta, Files}, From, #state{counters =
?RAFT_COUNT('raft.transport.receive.error'),
?LOG_WARNING("wa_raft_transport failed to accept transport ~p due to ~p ~p: ~n~p",
[ID, T, E, S], #{domain => [whatsapp, wa_raft]}),
update_transport_info(ID, fun (Info) -> Info#{status => failed, end_ts => erlang:system_time(millisecond), error => {receive_failed, {T, E, S}}} end, Counters),
update_and_get_transport_info(
ID,
fun (Info) ->
Info#{
status => failed,
end_ts => erlang:system_time(millisecond),
error => {receive_failed, {T, E, S}}
}
end,
Counters
),
{reply, {error, failed}, State}
end;
handle_call({cancel, ID, Reason}, _From, #state{counters = Counters} = State) ->
?LOG_NOTICE("wa_raft_transport got cancellation request for ~p for reason ~p",
[ID, Reason], #{domain => [whatsapp, wa_raft]}),
Result =
update_transport_info(ID,
fun
(#{status := running} = Info) ->
NowMillis = erlang:system_time(millisecond),
Info#{status => cancelled, end_ts => NowMillis, error => {cancelled, Reason}};
(Info) ->
Info
end,
Counters),
Reply = case Result of
ok -> ok;
not_found -> {error, not_found}
end,
Reply =
case
update_and_get_transport_info(
ID,
fun
(#{status := running} = Info) ->
NowMillis = erlang:system_time(millisecond),
Info#{status => cancelled, end_ts => NowMillis, error => {cancelled, Reason}};
(Info) ->
Info
end,
Counters
)
of
{ok, _Info} -> ok;
not_found -> {error, not_found}
end,
{reply, Reply, State};
handle_call(Request, _From, #state{} = State) ->
?LOG_WARNING("wa_raft_transport received unrecognized factory call ~p",
Expand All @@ -500,31 +541,34 @@ handle_cast({complete, ID, FileID, Status, Pid}, #state{counters = Counters} = S
Result0 =:= not_found andalso
?LOG_WARNING("wa_raft_transport got complete report for unknown file ~p:~p",
[ID, FileID], #{domain => [whatsapp, wa_raft]}),
Result1 = update_transport_info(ID,
fun
(#{status := running, type := Type, completed_files := CompletedFiles, total_files := TotalFiles} = Info0) ->
Info1 = Info0#{completed_files => CompletedFiles + 1},
Info2 = case CompletedFiles + 1 of
TotalFiles -> Info1#{status => completed, end_ts => NowMillis};
_ -> Info1
end,
Info3 = case Status of
ok -> Info2;
_ -> Info2#{status => failed, end_ts => NowMillis, error => {file, FileID, Status}}
end,
Info4 = case maybe_notify_complete(ID, Info3, State) of
ok -> Info3;
{error, Reason} -> Info3#{status => failed, error => {notify_failed, Reason}}
end,
Info5 = case Type of
sender -> maybe_submit_one(ID, Info4, Pid);
_ -> Info4
end,
maybe_notify(ID, Info5);
(Info) ->
Info
end,
Counters),
Result1 =
update_and_get_transport_info(
ID,
fun
(#{status := running, type := Type, completed_files := CompletedFiles, total_files := TotalFiles} = Info0) ->
Info1 = Info0#{completed_files => CompletedFiles + 1},
Info2 = case CompletedFiles + 1 of
TotalFiles -> Info1#{status => completed, end_ts => NowMillis};
_ -> Info1
end,
Info3 = case Status of
ok -> Info2;
_ -> Info2#{status => failed, end_ts => NowMillis, error => {file, FileID, Status}}
end,
Info4 = case maybe_notify_complete(ID, Info3, State) of
ok -> Info3;
{error, Reason} -> Info3#{status => failed, error => {notify_failed, Reason}}
end,
Info5 = case Type of
sender -> maybe_submit_one(ID, Info4, Pid);
_ -> Info4
end,
maybe_notify(ID, Info5);
(Info) ->
Info
end,
Counters
),
Result1 =:= not_found andalso
?LOG_WARNING("wa_raft_transport got complete report for unknown transfer ~p",
[ID], #{domain => [whatsapp, wa_raft]}),
Expand All @@ -535,10 +579,19 @@ handle_cast(Request, State) ->

-spec handle_info(Info :: term(), State :: #state{}) -> {noreply, NewState :: #state{}}.
handle_info(scan, #state{counters = Counters} = State) ->
lists:foreach(
fun (ID) ->
update_transport_info(ID, fun (Info) -> scan_transport(ID, Info) end, Counters)
end, transports()),
InactiveTransports =
lists:filter(
fun (ID) ->
case update_and_get_transport_info(ID, fun (Info) -> scan_transport(ID, Info) end, Counters) of
{ok, #{status := Status}} -> Status =/= requested andalso Status =/= running;
not_found -> false
end
end, transports()),
ExcessTransports = length(InactiveTransports) - ?RAFT_TRANSPORT_INACTIVE_INFO_LIMIT(),
ExcessTransports > 0 andalso begin
ExcessTransportIDs = lists:sublist(lists:sort(InactiveTransports), ExcessTransports),
lists:foreach(fun delete_transport_info/1, ExcessTransportIDs)
end,
schedule_scan(),
{noreply, State};
handle_info(Info, State) ->
Expand Down Expand Up @@ -605,7 +658,8 @@ handle_transport_start(From, Peer, Meta, Root, Counters) ->
FileData = [{FileID, Filename, Size} || {FileID, Filename, _, _, Size} <- Files],
case gen_server:call({?MODULE, Peer}, {transport, ID, node(), Module, Meta, FileData}) of
ok ->
update_transport_info(ID,
update_and_get_transport_info(
ID,
fun (Info0) ->
Info1 = case From of
undefined -> Info0;
Expand All @@ -621,28 +675,59 @@ handle_transport_start(From, Peer, Meta, Root, Counters) ->
Workers = [Pid || {_Id, Pid, _Type, _Modules} <- supervisor:which_children(Sup), is_pid(Pid)],
lists:foldl(fun (Pid, InfoN) -> maybe_submit_one(ID, InfoN, Pid) end, Info2, Workers)
end
end,
Counters),
end,
Counters
),
{ok, ID};
{error, receiver_overloaded} ->
?RAFT_COUNT('raft.transport.rejected.receiver_overloaded'),
?LOG_WARNING("wa_raft_transport peer ~p rejected transport ~p because of overload",
[Peer, ID], #{domain => [whatsapp, wa_raft]}),
update_transport_info(ID, fun (Info) -> Info#{status => failed, end_ts => NowMillis, error => {rejected, receiver_overloaded}} end, Counters),
update_and_get_transport_info(
ID,
fun (Info) ->
Info#{
status => failed,
end_ts => NowMillis,
error => {rejected, receiver_overloaded}
}
end,
Counters
),
{error, receiver_overloaded};
Error ->
?RAFT_COUNT('raft.transport.rejected'),
?LOG_WARNING("wa_raft_transport peer ~p rejected transport ~p with error ~p",
[Peer, ID, Error], #{domain => [whatsapp, wa_raft]}),
update_transport_info(ID, fun (Info) -> Info#{status => failed, end_ts => NowMillis, error => {rejected, Error}} end, Counters),
update_and_get_transport_info(
ID,
fun (Info) ->
Info#{
status => failed,
end_ts => NowMillis,
error => {rejected, Error}
}
end,
Counters
),
{error, Error}
end
catch
T:E:S ->
?RAFT_COUNT('raft.transport.start.error'),
?LOG_WARNING("wa_raft_transport failed to start transport ~p due to ~p ~p: ~n~p",
[ID, T, E, S], #{domain => [whatsapp, wa_raft]}),
update_transport_info(ID, fun (Info) -> Info#{status => failed, end_ts => erlang:system_time(millisecond), error => {start, {T, E, S}}} end, Counters),
update_and_get_transport_info(
ID,
fun (Info) ->
Info#{
status => failed,
end_ts => erlang:system_time(millisecond),
error => {start, {T, E, S}}
}
end,
Counters
),
{error, failed}
end.

Expand Down Expand Up @@ -746,7 +831,7 @@ maybe_notify(ID, #{status := Status, notify := Notify} = Info) when Status =/= r
maybe_notify(_ID, Info) ->
Info.

-spec scan_transport(transport_id(), transport_info()) -> transport_info().
-spec scan_transport(ID :: transport_id(), Info :: transport_info()) -> NewInfo :: transport_info().
scan_transport(ID, #{status := running, atomics := TransportAtomics} = Info) ->
LastUpdateTs = atomics:get(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS),
NowMillis = erlang:system_time(millisecond),
Expand Down

0 comments on commit f769c78

Please sign in to comment.