Support for snapshot catchup receiver to throttle incoming transports
Summary:
Allows a peer to communicate that it is at its limit for incoming shard transfers.
The sender will not contact such a peer again until a backoff period (30s) expires.

Differential Revision: D54828872

fbshipit-source-id: 9ef30657086ce92fcacbf9e1cef3f4c741c73280
Madan Jampani authored and facebook-github-bot committed Mar 18, 2024
1 parent aed431c commit 8563bf6
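The mechanism is small: the sender keeps a per-peer map of monotonic-clock timestamps, skips any peer whose timestamp is still in the future, and records a fresh timestamp whenever a peer reports overload. A minimal, self-contained sketch of that bookkeeping (illustration only; the module and function names here are made up, while the map shape and clock usage mirror the diff below):

-module(backoff_windows_sketch).
-export([should_skip/2, record_overload/3]).

%% True while Peer's retry-after timestamp is still in the future on the
%% monotonic clock, i.e. the peer is inside its backoff window.
-spec should_skip(node(), #{node() => integer()}) -> boolean().
should_skip(Peer, BackoffWindows) ->
    NowMillis = erlang:monotonic_time(millisecond),
    case BackoffWindows of
        #{Peer := RetryAfterTs} when RetryAfterTs > NowMillis -> true;
        _ -> false
    end.

%% Called when Peer reports being overloaded: remember when it may next
%% be contacted.
-spec record_overload(node(), pos_integer(), #{node() => integer()}) -> #{node() => integer()}.
record_overload(Peer, BackoffMillis, BackoffWindows) ->
    BackoffWindows#{Peer => erlang:monotonic_time(millisecond) + BackoffMillis}.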
Showing 4 changed files with 98 additions and 49 deletions.
include/wa_raft.hrl (3 additions, 0 deletions)
@@ -237,6 +237,9 @@
 %% Maximum bytes per heartbeat for catchup by bulk log transfer
 -define(RAFT_CATCHUP_MAX_BYTES_PER_BATCH, raft_catchup_log_batch_bytes).
 -define(RAFT_CATCHUP_MAX_BYTES_PER_BATCH(App), ?RAFT_APP_CONFIG(App, ?RAFT_CATCHUP_MAX_BYTES_PER_BATCH, 4 * 1024 * 1024)).
+% Time to wait before retrying snapshot transport to an overloaded peer.
+-define(RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS, snapshot_catchup_overloaded_backoff_ms).
+-define(RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS(App), ?RAFT_APP_CONFIG(App, ?RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS, 1000)).

 %% Time in seconds to retain transport destination directories after use
 -define(RAFT_TRANSPORT_RETAIN_INTERVAL, transport_retain_min_secs).
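The new backoff duration is read through ?RAFT_APP_CONFIG with a default of 1000 ms. Assuming ?RAFT_APP_CONFIG(App, Key, Default) falls back to the application environment (an assumption about the macro, which is not expanded in this diff), the 30s backoff mentioned in the summary could be set like this for a hypothetical application named my_app:

%% sys.config sketch (hypothetical application name; the key and its
%% millisecond units are from the commit, the env-based lookup is assumed):
[
    {my_app, [
        {snapshot_catchup_overloaded_backoff_ms, 30000}  %% 30 seconds
    ]}
].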
src/wa_raft_server.erl (2 additions, 2 deletions)
@@ -2437,7 +2437,7 @@ select_follower_replication_mode(FollowerLastIndex, #raft_state{application = Ap
 %% transports have been started then no transport is created. This function
 %% always performs this request asynchronously.
 -spec request_snapshot_for_follower(node(), #raft_state{}) -> term().
-request_snapshot_for_follower(FollowerId, #raft_state{name = Name, table = Table, partition = Partition, data_dir = DataDir, log_view = View} = State) ->
+request_snapshot_for_follower(FollowerId, #raft_state{application = App, name = Name, table = Table, partition = Partition, data_dir = DataDir, log_view = View} = State) ->
     case lists:member({Name, FollowerId}, config_witnesses(config(State))) of
         true ->
             % If node is a witness, we can bypass the transport process since we don't have to
@@ -2447,7 +2447,7 @@ request_snapshot_for_follower(FollowerId, #raft_state{name = Name, table = Table
             LastLogPos = #raft_log_pos{index = LastLogIndex, term = LastLogTerm},
             wa_raft_server:snapshot_available({Name, FollowerId}, DataDir, LastLogPos);
         false ->
-            wa_raft_snapshot_catchup:request_snapshot_transport(FollowerId, Table, Partition)
+            wa_raft_snapshot_catchup:request_snapshot_transport(App, FollowerId, Table, Partition)
     end.

 -spec request_bulk_logs_for_follower(#raft_identity{}, wa_raft_log:log_index(), #raft_state{}) -> ok.
src/wa_raft_snapshot_catchup.erl (29 additions, 16 deletions)
@@ -22,7 +22,7 @@
 %% Internal API
 -export([
     current_snapshot_transports/0,
-    request_snapshot_transport/3
+    request_snapshot_transport/4
 ]).

 %% Snapshot catchup server implementation
@@ -47,7 +47,9 @@
     % currently active transports
     transports = #{} :: #{key() => #transport{}},
     % counts of active transports that are using a particular snapshot
-    snapshots = #{} :: #{snapshot_key() => pos_integer()}
+    snapshots = #{} :: #{snapshot_key() => pos_integer()},
+    % backoff windows for nodes that previously reported being overloaded
+    backoff_windows = #{} :: #{node() => pos_integer()}
 }).

 -spec child_spec() -> supervisor:child_spec().
@@ -68,9 +70,9 @@ start_link() ->
 current_snapshot_transports() ->
     gen_server:call(?MODULE, current_snapshot_transports).

--spec request_snapshot_transport(Peer :: node(), Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> ok.
-request_snapshot_transport(Peer, Table, Partition) ->
-    gen_server:cast(?MODULE, {request_snapshot_transport, Peer, Table, Partition}).
+-spec request_snapshot_transport(App :: atom(), Peer :: node(), Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> ok.
+request_snapshot_transport(App, Peer, Table, Partition) ->
+    gen_server:cast(?MODULE, {request_snapshot_transport, App, Peer, Table, Partition}).

 -spec init(Args :: term()) -> {ok, #state{}}.
 init([]) ->
@@ -85,24 +87,35 @@ handle_call(Request, From, #state{} = State) ->
     ?LOG_NOTICE("received unrecognized call ~P from ~0p", [Request, 25, From], #{domain => [whatsapp, wa_raft]}),
     {noreply, State}.

--spec handle_cast({request_snapshot_transport, node(), wa_raft:table(), wa_raft:partition()}, State :: #state{}) -> {noreply, #state{}}.
-handle_cast({request_snapshot_transport, Peer, Table, Partition}, #state{transports = Transports, snapshots = Snapshots} = State) ->
-    case Transports of
-        #{{Peer, Table, Partition} := _} ->
+-spec handle_cast({request_snapshot_transport, atom(), node(), wa_raft:table(), wa_raft:partition()}, State :: #state{}) -> {noreply, #state{}}.
+handle_cast({request_snapshot_transport, App, Peer, Table, Partition}, #state{transports = Transports, snapshots = Snapshots, backoff_windows = BackoffWindows} = State) ->
+    NowMillis = erlang:monotonic_time(millisecond),
+    case {Transports, BackoffWindows} of
+        {#{{Peer, Table, Partition} := _}, _} ->
             {noreply, State};
-        _ ->
+        {_, #{Peer := RetryAfterTs}} when RetryAfterTs > NowMillis ->
+            {noreply, State};
+        {_, _} ->
             case maps:size(Transports) < ?RAFT_MAX_CONCURRENT_SNAPSHOT_CATCHUP() of
                 true ->
                     try
                         StorageRef = wa_raft_storage:registered_name(Table, Partition),
                         {ok, #raft_log_pos{index = Index, term = Term} = LogPos} = wa_raft_storage:create_snapshot(StorageRef),
                         Path = ?RAFT_SNAPSHOT_PATH(Table, Partition, Index, Term),
-                        {ok, ID} = wa_raft_transport:start_snapshot_transfer(Peer, Table, Partition, LogPos, Path, infinity),
-                        ?LOG_NOTICE("started sending snapshot for ~0p:~0p at ~0p:~0p over transport ~0p",
-                            [Table, Partition, Index, Term, ID], #{domain => [whatsapp, wa_raft]}),
-                        NewTransports = Transports#{{Peer, Table, Partition} => #transport{id = ID, snapshot = LogPos}},
-                        NewSnapshots = maps:update_with({Table, Partition, LogPos}, fun(V) -> V + 1 end, 1, Snapshots),
-                        {noreply, State#state{transports = NewTransports, snapshots = NewSnapshots}}
+                        case wa_raft_transport:start_snapshot_transfer(Peer, Table, Partition, LogPos, Path, infinity) of
+                            {error, receiver_overloaded} ->
+                                ?LOG_NOTICE("Peer ~0p reported being overloaded. Not sending snapshot for ~0p:~0p. Will try again later",
+                                    [Peer, Table, Partition], #{domain => [whatsapp, wa_raft]}),
+                                NewRetryAfterTs = NowMillis + ?RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS(App),
+                                {noreply, State#state{backoff_windows = BackoffWindows#{Peer => NewRetryAfterTs}}};
+                            {ok, ID} ->
+                                ?LOG_NOTICE("started sending snapshot for ~0p:~0p at ~0p:~0p over transport ~0p",
+                                    [Table, Partition, Index, Term, ID], #{domain => [whatsapp, wa_raft]}),
+                                NewTransports = Transports#{{Peer, Table, Partition} => #transport{id = ID, snapshot = LogPos}},
+                                NewSnapshots = maps:update_with({Table, Partition, LogPos}, fun(V) -> V + 1 end, 1, Snapshots),
+                                NewBackoffWindows = maps:remove(Peer, BackoffWindows),
+                                {noreply, State#state{transports = NewTransports, snapshots = NewSnapshots, backoff_windows = NewBackoffWindows}}
+                        end
                     catch
                         _T:_E:S ->
                             ?LOG_ERROR("failed to start accepted snapshot transport of ~0p:~0p to ~0p at ~p",
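For reference, requesting a transport through the new public API is an asynchronous cast, so the caller always gets ok back immediately; the concurrency limit, backoff window, and overload handling are all applied later inside the server. A hypothetical call site (every identifier except the function itself is made up):

%% ok is returned even if the peer is currently inside its backoff window.
ok = wa_raft_snapshot_catchup:request_snapshot_transport(my_app, 'raft@peer1', my_table, 1).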