Skip to content

Commit

Permalink
Set leader and current term info together
Browse files Browse the repository at this point in the history
Summary:
Adjust `wa_raft_info` to store the leader and current term together in a
single ETS record so that leader and current term info are always
updated and read atomically to avoid reading a leader for the wrong term
due to data races between updating of the leader and term.

Differential Revision: D54859928

fbshipit-source-id: e6f1371547a6086ec690b0cb02f9d0297f1421d8
  • Loading branch information
hsun324 authored and facebook-github-bot committed Mar 13, 2024
1 parent 2141bf3 commit 4386588
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 24 deletions.
34 changes: 19 additions & 15 deletions src/wa_raft_info.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
-export([
get_current_term/2,
get_leader/2,
get_current_term_and_leader/2,
get_membership/2,
get_stale/2,
get_state/2
Expand All @@ -23,19 +24,16 @@
-export([
init_tables/0,
delete_state/2,
set_current_term/3,
set_leader/3,
set_current_term_and_leader/4,
set_membership/3,
set_stale/3,
set_state/3
]).

%% Local RAFT server's current FSM state
-define(RAFT_SERVER_STATE_KEY(Table, Partition), {state, Table, Partition}).
%% Local RAFT server's most recently known term
-define(RAFT_CURRENT_TERM_KEY(Table, Partition), {term, Table, Partition}).
%% Local RAFT server's most recently known leader node
-define(RAFT_CURRENT_LEADER_KEY(Table, Partition), {leader, Table, Partition}).
%% Local RAFT server's most recently known term and leader
-define(RAFT_CURRENT_TERM_AND_LEADER_KEY(Table, Partition), {term, Table, Partition}).
%% Local RAFT server's current stale flag - indicates if the server thinks its data is stale
-define(RAFT_STALE_KEY(Table, Partition), {stale, Table, Partition}).
%% Local RAFT server's most recently known membership
Expand All @@ -55,11 +53,21 @@ get(Key, Default) ->

-spec get_leader(wa_raft:table(), wa_raft:partition()) -> node() | undefined.
get_leader(Table, Partition) ->
get(?RAFT_CURRENT_LEADER_KEY(Table, Partition), undefined).
{_, Leader} = get(?RAFT_CURRENT_TERM_AND_LEADER_KEY(Table, Partition), {undefined, undefined}),
Leader.

-spec get_current_term(wa_raft:table(), wa_raft:partition()) -> wa_raft_log:log_term() | undefined.
get_current_term(Table, Partition) ->
get(?RAFT_CURRENT_TERM_KEY(Table, Partition), undefined).
{Term, _} = get(?RAFT_CURRENT_TERM_AND_LEADER_KEY(Table, Partition), {undefined, undefined}),
Term.

%% The RAFT server always sets both the known term and leader together, so that
%% the atomic read performed by this method will not return a known leader for
%% a different term.
-spec get_current_term_and_leader(wa_raft:table(), wa_raft:partition()) ->
{wa_raft_log:log_term() | undefined, node() | undefined}.
get_current_term_and_leader(Table, Partition) ->
get(?RAFT_CURRENT_TERM_AND_LEADER_KEY(Table, Partition), {undefined, undefined}).

-spec get_state(wa_raft:table(), wa_raft:partition()) -> wa_raft_server:state() | undefined.
get_state(Table, Partition) ->
Expand Down Expand Up @@ -90,13 +98,9 @@ set(Key, Value) ->
delete(Key) ->
ets:delete(?MODULE, Key).

-spec set_leader(wa_raft:table(), wa_raft:partition(), node()) -> true.
set_leader(Table, Partition, Value) ->
set(?RAFT_CURRENT_LEADER_KEY(Table, Partition), Value).

-spec set_current_term(wa_raft:table(), wa_raft:partition(), wa_raft_log:log_term()) -> true.
set_current_term(Table, Partition, Term) ->
set(?RAFT_CURRENT_TERM_KEY(Table, Partition), Term).
-spec set_current_term_and_leader(wa_raft:table(), wa_raft:partition(), wa_raft_log:log_term(), node()) -> true.
set_current_term_and_leader(Table, Partition, Term, Leader) ->
set(?RAFT_CURRENT_TERM_AND_LEADER_KEY(Table, Partition), {Term, Leader}).

-spec set_state(wa_raft:table(), wa_raft:partition(), wa_raft_server:state()) -> true.
set_state(Table, Partition, State) ->
Expand Down
15 changes: 6 additions & 9 deletions src/wa_raft_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ init(#raft_options{application = Application, table = Table, partition = Partiti
{ok, NewState} -> NewState;
_ -> State1
end,
true = wa_raft_info:set_current_term(Table, Partition, State2#raft_state.current_term),
true = wa_raft_info:set_current_term_and_leader(Table, Partition, State2#raft_state.current_term, undefined),
% 1. Begin as disabled if a disable reason is set
% 2. Begin as witness if configured
% 3. Begin as stalled if there is no data
Expand Down Expand Up @@ -2040,7 +2040,7 @@ set_leader(_StateName, ?IDENTITY_REQUIRES_MIGRATION(_, Node), #raft_state{leader
set_leader(StateName, ?IDENTITY_REQUIRES_MIGRATION(_, Node), #raft_state{name = Name, table = Table, partition = Partition, current_term = CurrentTerm} = State) ->
?LOG_NOTICE("Server[~0p, term ~0p, ~0p] changes leader to ~0p.",
[Name, CurrentTerm, StateName, Node], #{domain => [whatsapp, wa_raft]}),
wa_raft_info:set_leader(Table, Partition, Node),
wa_raft_info:set_current_term_and_leader(Table, Partition, CurrentTerm, Node),
State#raft_state{leader_id = Node}.

-spec clear_leader(state(), #raft_state{}) -> #raft_state{}.
Expand All @@ -2049,7 +2049,7 @@ clear_leader(_StateName, #raft_state{leader_id = undefined} = State) ->
clear_leader(StateName, #raft_state{name = Name, table = Table, partition = Partition, current_term = CurrentTerm} = State) ->
?LOG_NOTICE("Server[~0p, term ~0p, ~0p] clears leader record.",
[Name, CurrentTerm, StateName], #{domain => [whatsapp, wa_raft]}),
wa_raft_info:set_leader(Table, Partition, undefined),
wa_raft_info:set_current_term_and_leader(Table, Partition, CurrentTerm, undefined),
State#raft_state{leader_id = undefined}.

%% Setup the RAFT state upon entry into a new RAFT server state.
Expand Down Expand Up @@ -2095,11 +2095,8 @@ check_stale_upon_entry(_StateName, _Now, #raft_state{table = Table, partition =

%% Set a new current term and voted-for peer and clear any state that is associated with the previous term.
-spec advance_term(StateName :: state(), NewerTerm :: wa_raft_log:log_term(), VotedFor :: undefined | node(), State :: #raft_state{}) -> #raft_state{}.
advance_term(StateName, NewerTerm, VotedFor,
#raft_state{table = Table, partition = Partition,
current_term = CurrentTerm} = State0) when NewerTerm > CurrentTerm ->
State1 = clear_leader(StateName, State0),
State2 = State1#raft_state{
advance_term(StateName, NewerTerm, VotedFor, #raft_state{current_term = CurrentTerm} = State0) when NewerTerm > CurrentTerm ->
State1 = State0#raft_state{
current_term = NewerTerm,
voted_for = VotedFor,
votes = #{},
Expand All @@ -2109,8 +2106,8 @@ advance_term(StateName, NewerTerm, VotedFor,
heartbeat_response_ts = #{},
handover = undefined
},
State2 = clear_leader(StateName, State1),
ok = wa_raft_durable_state:store(State2),
true = wa_raft_info:set_current_term(Table, Partition, NewerTerm),
State2.

%%-------------------------------------------------------------------
Expand Down

0 comments on commit 4386588

Please sign in to comment.