diff --git a/include/wa_raft.hrl b/include/wa_raft.hrl index 589fd8e..a034007 100644 --- a/include/wa_raft.hrl +++ b/include/wa_raft.hrl @@ -240,6 +240,12 @@ % Time to wait before retrying snapshot transport to a overloaded peer. -define(RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS, snapshot_catchup_overloaded_backoff_ms). -define(RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS(App), ?RAFT_APP_CONFIG(App, ?RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS, 1000)). +% Time to wait before allowing a rerun of a completed snapshot transport. +-define(RAFT_SNAPSHOT_CATCHUP_COMPLETED_BACKOFF_MS, raft_snapshot_catchup_completed_backoff_ms). +-define(RAFT_SNAPSHOT_CATCHUP_COMPLETED_BACKOFF_MS(App), ?RAFT_APP_CONFIG(App, ?RAFT_SNAPSHOT_CATCHUP_COMPLETED_BACKOFF_MS, 20 * 1000)). +% Time to wait before allowing a rerun of a failed snapshot transport. +-define(RAFT_SNAPSHOT_CATCHUP_FAILED_BACKOFF_MS, raft_snapshot_catchup_failed_backoff_ms). +-define(RAFT_SNAPSHOT_CATCHUP_FAILED_BACKOFF_MS(App), ?RAFT_APP_CONFIG(App, ?RAFT_SNAPSHOT_CATCHUP_FAILED_BACKOFF_MS, 10 * 1000)). %% Time in seconds to retain transport destination directories after use -define(RAFT_TRANSPORT_RETAIN_INTERVAL, transport_retain_min_secs). diff --git a/src/wa_raft_snapshot_catchup.erl b/src/wa_raft_snapshot_catchup.erl index b9c3ff1..d41afcf 100644 --- a/src/wa_raft_snapshot_catchup.erl +++ b/src/wa_raft_snapshot_catchup.erl @@ -40,6 +40,7 @@ -type snapshot_key() :: {wa_raft:table(), wa_raft:partition(), wa_raft_log:log_pos()}. -record(transport, { + app :: atom(), id :: wa_raft_transport:transport_id(), snapshot :: wa_raft_log:log_pos() }). @@ -48,8 +49,10 @@ transports = #{} :: #{key() => #transport{}}, % counts of active transports that are using a particular snapshot snapshots = #{} :: #{snapshot_key() => pos_integer()}, - % backoff windows for nodes that previously reported being overloaded - backoff_windows = #{} :: #{node() => pos_integer()} + % timestamps (ms) after which transports to previously overloaded nodes can be retried + overload_backoffs = #{} :: #{node() => integer()}, + % timestamps (ms) after which repeat transports can be retried + retry_backoffs = #{} :: #{key() => integer()} }). -spec child_spec() -> supervisor:child_spec(). @@ -88,14 +91,16 @@ handle_call(Request, From, #state{} = State) -> {noreply, State}. -spec handle_cast({request_snapshot_transport, atom(), node(), wa_raft:table(), wa_raft:partition()}, State :: #state{}) -> {noreply, #state{}}. -handle_cast({request_snapshot_transport, App, Peer, Table, Partition}, #state{transports = Transports, snapshots = Snapshots, backoff_windows = BackoffWindows} = State) -> - NowMillis = erlang:monotonic_time(millisecond), - case {Transports, BackoffWindows} of - {#{{Peer, Table, Partition} := _}, _} -> - {noreply, State}; - {_, #{Peer := RetryAfterTs}} when RetryAfterTs > NowMillis -> +handle_cast({request_snapshot_transport, App, Peer, Table, Partition}, #state{transports = Transports, snapshots = Snapshots, overload_backoffs = OverloadBackoffs, retry_backoffs = RetryBackoffs} = State) -> + Now = erlang:monotonic_time(millisecond), + Key = {Peer, Table, Partition}, + Exists = maps:is_key(Key, Transports), + Overloaded = maps:get(Peer, OverloadBackoffs, Now) > Now, + Blocked = maps:get(Key, RetryBackoffs, Now) > Now, + case Exists orelse Overloaded orelse Blocked of + true -> {noreply, State}; - {_, _} -> + false -> case maps:size(Transports) < ?RAFT_MAX_CONCURRENT_SNAPSHOT_CATCHUP() of true -> try @@ -106,15 +111,28 @@ handle_cast({request_snapshot_transport, App, Peer, Table, Partition}, #state{tr {error, receiver_overloaded} -> ?LOG_NOTICE("Peer ~0p reported being overloaded. Not sending snapshot for ~0p:~0p. Will try again later", [Peer, Table, Partition], #{domain => [whatsapp, wa_raft]}), - NewRetryAfterTs = NowMillis + ?RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS(App), - {noreply, State#state{backoff_windows = BackoffWindows#{Peer => NewRetryAfterTs}}}; + NewOverloadBackoff = Now + ?RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS(App), + NewOverloadBackoffs = OverloadBackoffs#{Peer => NewOverloadBackoff}, + NewRetryBackoffs = maps:remove(Key, RetryBackoffs), + NewState = State#state{ + overload_backoffs = NewOverloadBackoffs, + retry_backoffs = NewRetryBackoffs + }, + {noreply, NewState}; {ok, ID} -> ?LOG_NOTICE("started sending snapshot for ~0p:~0p at ~0p:~0p over transport ~0p", [Table, Partition, Index, Term, ID], #{domain => [whatsapp, wa_raft]}), - NewTransports = Transports#{{Peer, Table, Partition} => #transport{id = ID, snapshot = LogPos}}, + NewTransports = Transports#{{Peer, Table, Partition} => #transport{app = App, id = ID, snapshot = LogPos}}, NewSnapshots = maps:update_with({Table, Partition, LogPos}, fun(V) -> V + 1 end, 1, Snapshots), - NewBackoffWindows = maps:remove(Peer, BackoffWindows), - {noreply, State#state{transports = NewTransports, snapshots = NewSnapshots, backoff_windows = NewBackoffWindows}} + NewOverloadBackoffs = maps:remove(Peer, OverloadBackoffs), + NewRetryBackoffs = maps:remove(Key, RetryBackoffs), + NewState = State#state{ + transports = NewTransports, + snapshots = NewSnapshots, + overload_backoffs = NewOverloadBackoffs, + retry_backoffs = NewRetryBackoffs + }, + {noreply, NewState} end catch _T:_E:S -> @@ -151,32 +169,41 @@ terminate(_Reason, #state{transports = Transports, snapshots = Snapshots}) -> end, Snapshots). -spec scan_transport(Key :: key(), Transport :: #transport{}, #state{}) -> #state{}. -scan_transport({_Peer, Table, Partition} = Key, #transport{id = ID, snapshot = LogPos}, - #state{transports = Transports, snapshots = Snapshots} = State) -> +scan_transport(Key, #transport{app = App, id = ID} = Transport, State) -> Status = case wa_raft_transport:transport_info(ID) of {ok, #{status := S}} -> S; _ -> undefined end, - case Status =:= requested orelse Status =:= running of - true -> + case Status of + requested -> State; - false -> - SnapshotKey = {Table, Partition, LogPos}, - NewSnapshots = case Snapshots of - #{SnapshotKey := 1} -> - % try to delete a snapshot if it is the last transport using it - delete_snapshot(Table, Partition, LogPos), - maps:remove(SnapshotKey, Snapshots); - #{SnapshotKey := Count} -> - % otherwise decrement the reference count for the snapshot - Snapshots#{SnapshotKey => Count - 1}; - #{} -> - % unexpected that the snapshot is missing, but just ignore - Snapshots - end, - State#state{transports = maps:remove(Key, Transports), snapshots = NewSnapshots} + running -> + State; + completed -> + finish_transport(Key, Transport, ?RAFT_SNAPSHOT_CATCHUP_COMPLETED_BACKOFF_MS(App), State); + _Other -> + finish_transport(Key, Transport, ?RAFT_SNAPSHOT_CATCHUP_FAILED_BACKOFF_MS(App), State) end. +-spec finish_transport(key(), #transport{}, pos_integer(), #state{}) -> #state{}. +finish_transport({_Peer, Table, Partition} = Key, #transport{snapshot = LogPos}, Backoff, #state{transports = Transports, snapshots = Snapshots, retry_backoffs = RetryBackoffs} = State) -> + Now = erlang:monotonic_time(millisecond), + SnapshotKey = {Table, Partition, LogPos}, + NewSnapshots = case Snapshots of + #{SnapshotKey := 1} -> + % try to delete a snapshot if it is the last transport using it + delete_snapshot(Table, Partition, LogPos), + maps:remove(SnapshotKey, Snapshots); + #{SnapshotKey := Count} -> + % otherwise decrement the reference count for the snapshot + Snapshots#{SnapshotKey => Count - 1}; + #{} -> + % unexpected that the snapshot is missing, but just ignore + Snapshots + end, + NewRetryBackoffs = RetryBackoffs#{Key => Now + Backoff}, + State#state{transports = maps:remove(Key, Transports), snapshots = NewSnapshots, retry_backoffs = NewRetryBackoffs}. + -spec delete_snapshot(Table :: wa_raft:table(), Partition :: wa_raft:partition(), Position :: wa_raft_log:log_pos()) -> ok. delete_snapshot(Table, Partition, #raft_log_pos{index = Index, term = Term}) ->