Skip to content

Commit

Permalink
Merge pull request #152 from uwiger/uw-leader-reinitiate-sync
Browse files Browse the repository at this point in the history
also reinitiate sync when elected/re-elected
  • Loading branch information
uwiger authored Dec 12, 2017
2 parents c60e6e3 + 006e37d commit 5c6bc10
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 16 deletions.
31 changes: 17 additions & 14 deletions src/gproc_dist.erl
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ reset_counter(_) ->
%%
sync() ->
%% Increase timeout since gen_leader can take some time ...
gen_server:call(?MODULE, sync, 5000).
gen_server:call(?MODULE, sync, 10000).

%% @spec get_leader() -> node()
%% @doc Returns the node of the current gproc leader.
Expand Down Expand Up @@ -307,19 +307,20 @@ handle_info(Msg, S, _E) ->
elected(S, _E) ->
{ok, {globals,globs()}, S#state{is_leader = true}}.

elected(S, _E, undefined) ->
elected(S, E, undefined) ->
%% I have become leader; full synch
{ok, {globals, globs()}, S#state{is_leader = true}};
elected(S, _E, _Node) ->
{ok, {globals, globs()},
maybe_reinitiate_sync(S#state{is_leader = true}, E)};
elected(S, E, _Node) ->
Synch = {globals, globs()},
if not S#state.always_broadcast ->
%% Another node recognized us as the leader.
%% Don't broadcast all data to everyone else
{reply, Synch, S};
{reply, Synch, maybe_reinitiate_sync(S, E)};
true ->
%% Main reason for doing this is if we are using a gen_leader
%% that doesn't support the 'reply' return value
{ok, Synch, S}
{ok, Synch, maybe_reinitiate_sync(S, E)}
end.

globs() ->
Expand All @@ -328,15 +329,15 @@ globs() ->
_ = [gproc_lib:ensure_monitor(Pid, g) || {_, Pid, _} <- Gs],
Gs ++ As.

surrendered(#state{is_leader = true} = S, {globals, Globs}, _E) ->
surrendered(#state{is_leader = true} = S, {globals, Globs}, E) ->
%% Leader conflict!
surrendered_1(Globs),
{ok, maybe_reinitiate_sync(S#state{is_leader = false})};
surrendered(S, {globals, Globs}, _E) ->
{ok, maybe_reinitiate_sync(S#state{is_leader = false}, E)};
surrendered(S, {globals, Globs}, E) ->
%% globals from this node should be more correct in our table than
%% in the leader's
surrendered_1(Globs),
{ok, maybe_reinitiate_sync(S#state{is_leader = false})}.
{ok, maybe_reinitiate_sync(S#state{is_leader = false}, E)}.


handle_DOWN(Node, S, E) ->
Expand Down Expand Up @@ -1164,11 +1165,13 @@ initiate_sync(From, S, _E) ->
leader_cast({initiate_sync, From}),
S.

maybe_reinitiate_sync(#state{sync_clients = []} = S) ->
maybe_reinitiate_sync(#state{sync_clients = []} = S, _E) ->
S;
maybe_reinitiate_sync(#state{sync_clients = Cs} = S) ->
_ = [leader_cast({initiate_sync, From}) || From <- Cs],
S.
maybe_reinitiate_sync(#state{sync_clients = Cs} = S, E) ->
lists:foldl(
fun(From, Sx) ->
initiate_sync(From, Sx, E)
end, S, Cs).

send_sync_complete({From, _} = Ref, S, _E) when node(From) == node() ->
reply_to_sync_client(Ref, S);
Expand Down
4 changes: 2 additions & 2 deletions test/gproc_dist_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ dist_test_() ->
tests(Ns, [?f(t_fail_node(Ns))])
end,
fun(Ns) ->
tests(Ns, [{timeout, 10, ?f(t_master_dies(Ns))}])
tests(Ns, [{timeout, 15, ?f(t_master_dies(Ns))}])
end
]}
]}.
Expand Down Expand Up @@ -568,7 +568,7 @@ t_sync_cand_dies([A,B,C]) ->
?assertMatch(ok, rpc:call(Other, sys, suspend, [gproc_dist])),
P = rpc:call(Other, erlang, whereis, [gproc_dist]),
Key = rpc:async_call(Leader, gproc_dist, sync, []),
%% The overall timeout for gproc_dist:sync() is 5 seconds. Here, we should
%% The overall timeout for gproc_dist:sync() is 10 seconds. Here, we should
%% still be waiting.
?assertMatch(timeout, rpc:nb_yield(Key, 1000)),
exit(P, kill),
Expand Down

0 comments on commit 5c6bc10

Please sign in to comment.