
Commit e83b688

Fix snapshot live indexes replication bug
1 parent 13e1bf8 commit e83b688

6 files changed: 116 additions, 21 deletions

src/ra_log.erl

Lines changed: 12 additions & 1 deletion

@@ -706,10 +706,21 @@ last_written(#?MODULE{last_written_index_term = LWTI}) ->
     {ok, state()} | {not_found, state()}.
 set_last_index(Idx, #?MODULE{cfg = Cfg,
                              range = Range,
+                             snapshot_state = SnapState,
                              last_written_index_term = {LWIdx0, _}} = State0) ->
+    Cur = ra_snapshot:current(SnapState),
     case fetch_term(Idx, State0) of
-        {undefined, State} ->
+        {undefined, State} when element(1, Cur) =/= Idx ->
+            %% not found and Idx isn't equal to the latest snapshot index
             {not_found, State};
+        {_, State} when element(1, Cur) =:= Idx ->
+            {_, SnapTerm} = Cur,
+            %% Idx is equal to the current snapshot index
+            put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
+            put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, Idx),
+            {ok, State#?MODULE{range = ra_range:limit(Idx + 1, Range),
+                               last_term = SnapTerm,
+                               last_written_index_term = Cur}};
         {Term, State1} ->
             LWIdx = min(Idx, LWIdx0),
             {LWTerm, State2} = fetch_term(LWIdx, State1),
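
The new middle clause means set_last_index/2 now succeeds when asked to truncate back to exactly the snapshot index, adopting the snapshot's term instead of returning not_found; the pre-chunk reset in src/ra_server.erl below relies on this. A condensed sketch of the three outcomes, assuming (as the element(1, Cur) guard implies) that ra_snapshot:current/1 returns an {Index, Term} tuple; the function name and the TermLookup fun are hypothetical stand-ins for fetch_term/2:

%% Hypothetical sketch of the decision logic, not the actual ra_log code.
classify_set_last_index(Idx, Cur, TermLookup) ->
    SnapIdx = element(1, Cur),
    case TermLookup(Idx) of
        undefined when SnapIdx =/= Idx ->
            %% Idx is neither in the log nor the snapshot index
            not_found;
        _ when SnapIdx =:= Idx ->
            %% truncating to the snapshot itself: the snapshot's
            %% {Index, Term} becomes the last written entry
            {ok, {truncated_to_snapshot, Cur}};
        Term when is_integer(Term) ->
            %% normal truncation to an index still held in the log
            {ok, {truncated_to, {Idx, Term}}}
    end.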

src/ra_log_segment.erl

Lines changed: 1 addition & 0 deletions

@@ -456,6 +456,7 @@ segref(Filename) ->
     SegRef.

 -type infos() :: #{size => non_neg_integer(),
+                   index_size => non_neg_integer(),
                    max_count => non_neg_integer(),
                    file_type => regular | symlink,
                    ctime => integer(),

src/ra_mt.erl

Lines changed: 4 additions & 1 deletion

@@ -120,7 +120,10 @@ insert({Idx, _, _} = _Entry,
     end.

 -spec insert_sparse(log_entry(), undefined | ra:index(), state()) ->
-    {ok, state()} | {error, overwriting | gap_detected | limit_reached}.
+    {ok, state()} | {error,
+                     overwriting |
+                     gap_detected |
+                     limit_reached}.
 insert_sparse({Idx, _, _} = Entry, LastIdx,
               #?MODULE{tid = Tid,
                        indexes = Seq} = State) ->

src/ra_server.erl

Lines changed: 19 additions & 7 deletions

@@ -1427,7 +1427,7 @@ handle_follower(#install_snapshot_rpc{term = Term,
                                       meta = #{index := SnapIdx,
                                                machine_version := SnapMacVer} = Meta,
                                       leader_id = LeaderId,
-                                      chunk_state = {Num, _ChunkFlag}} = Rpc,
+                                      chunk_state = {Num, ChunkFlag}} = Rpc,
                 #{cfg := #cfg{log_id = LogId,
                               machine_version = MacVer}, log := Log0,
                   last_applied := LastApplied,
@@ -1443,7 +1443,17 @@ handle_follower(#install_snapshot_rpc{term = Term,
                    [LogId, SnapIdx, Term]),
             SnapState0 = ra_log:snapshot_state(Log0),
             {ok, SS} = ra_snapshot:begin_accept(Meta, SnapState0),
-            Log = ra_log:set_snapshot_state(SS, Log0),
+            Log1 = ra_log:set_snapshot_state(SS, Log0),
+
+            %% if the snapshot includes pre entries (live entries) then we need
+            %% to reset the log to the last applied index to avoid issues
+            Log = case ChunkFlag of
+                      pre ->
+                          {ok, L} = ra_log:set_last_index(LastApplied, Log1),
+                          L;
+                      _ ->
+                          Log1
+                  end,
             {receive_snapshot, update_term(Term, State0#{log => Log,
                                                          leader_id => LeaderId}),
              [{next_event, Rpc}, {record_leader_msg, LeaderId}]};
@@ -1538,17 +1548,18 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
                                     last_index = SnapIndex},
             case ChunkFlag of
                 pre when is_list(ChunkOrEntries) ->
-                    %% TODO: we may need to reset the log here to
-                    %% the last applied index as we
-                    %% dont know for sure indexes after last applied
+                    %% reset last index to last applied
+                    %% as we don't know for sure indexes after last applied
                     %% are of the right term
                     {LastIndex, _} = ra_log:last_index_term(Log00),
-                    {Log0, _} = lists:foldl(
+                    {Log, _} = lists:foldl(
                                   fun ({I, _, _} = E, {L0, LstIdx}) ->
                                           {ok, L} = ra_log:write_sparse(E, LstIdx, L0),
                                           {L, I}
                                   end, {Log00, LastIndex}, ChunkOrEntries),
-                    State = update_term(Term, State0#{log => Log0}),
+                    ?DEBUG("~ts: receiving snapshot log last index ~p",
+                           [LogId, ra_log:last_index_term(Log)]),
+                    State = update_term(Term, State0#{log => Log}),
                     {receive_snapshot, State, [{reply, Reply}]};
                 next ->
                     SnapState0 = ra_log:snapshot_state(Log00),
@@ -1606,6 +1617,7 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
                                membership =>
                                    get_membership(ClusterIds, State0),
                                machine_state => MacState}),
+            put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, SnapIndex),
             %% it was the last snapshot chunk so we can revert back to
             %% follower status
             {follower, persist_last_applied(State), [{reply, Reply} |
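
Read together, the hunks above form the follower side of live-entry replication: when a chunk carries the pre flag the log is first reset to last_applied (valid even when that equals the snapshot index, thanks to the ra_log change above), and the live entries are then appended sparsely while threading the previously written index. A condensed sketch of that flow under the same assumptions as the diff; the helper names are hypothetical:

%% Sketch of the follower-side flow added above (helper names hypothetical).
prepare_log_for_live_entries(pre, LastApplied, Log0) ->
    %% drop any log suffix we cannot trust beyond the applied index
    {ok, Log} = ra_log:set_last_index(LastApplied, Log0),
    Log;
prepare_log_for_live_entries(_Flag, _LastApplied, Log0) ->
    Log0.

append_live_entries(Log0, Entries) ->
    {PrevIdx0, _} = ra_log:last_index_term(Log0),
    %% write_sparse/3 needs the index of the previously written entry,
    %% so the fold threads it alongside the log state
    lists:foldl(fun ({I, _, _} = E, {L0, PrevIdx}) ->
                        {ok, L} = ra_log:write_sparse(E, PrevIdx, L0),
                        {L, I}
                end, {Log0, PrevIdx0}, Entries).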

src/ra_server_proc.erl

Lines changed: 23 additions & 11 deletions

@@ -1535,7 +1535,9 @@ handle_effect(leader, {send_snapshot, {_, ToNode} = To, {SnapState, _Id, Term}},
               #state{server_state = SS0,
                      monitors = Monitors,
                      conf = #conf{snapshot_chunk_size = ChunkSize,
-                                  install_snap_rpc_timeout = InstallSnapTimeout} = Conf} = State0,
+                                  log_id = LogId,
+                                  install_snap_rpc_timeout = InstallSnapTimeout} = Conf}
+                 = State0,
               Actions) ->
     case lists:member(ToNode, [node() | nodes()]) of
         true ->
@@ -1546,7 +1548,7 @@ handle_effect(leader, {send_snapshot, {_, ToNode} = To, {SnapState, _Id, Term}},
             Pid = spawn(fun () ->
                                 try send_snapshots(Id, Term, To,
                                                    ChunkSize, InstallSnapTimeout,
-                                                   SnapState, Machine) of
+                                                   SnapState, Machine, LogId) of
                                     _ -> ok
                                 catch
                                     C:timeout:S ->
@@ -1906,29 +1908,39 @@ read_entries0(From, Idxs, #state{server_state = #{log := Log}} = State) ->
     {keep_state, State, [{reply, From, {ok, ReadState}}]}.

 send_snapshots(Id, Term, {_, ToNode} = To, ChunkSize,
-               InstallTimeout, SnapState, Machine) ->
+               InstallTimeout, SnapState, Machine, LogId) ->
     Context = ra_snapshot:context(SnapState, ToNode),
     {ok, #{machine_version := SnapMacVer} = Meta, ReadState} =
         ra_snapshot:begin_read(SnapState, Context),

-    %% only send the snapshot if the target server can accept it
-    %% TODO: grab the last_applied index also and use this to floor
-    %% the live indexes
+    %% TODO: consolidate getting the context, machine version and last
+    %% applied index in one rpc, and handle errors
     TheirMacVer = erpc:call(ToNode, ra_machine, version, [Machine]),

-    %% rpc the check what their
+    %% only send the snapshot if the target server can accept it
     case SnapMacVer > TheirMacVer of
         true ->
+            ?DEBUG("~ts: not sending snapshot to ~w as their machine version ~b "
+                   "is lower than snapshot machine version ~b",
+                   [LogId, To, TheirMacVer, SnapMacVer]),
             ok;
         false ->
+            #{last_applied := LastApplied} = erpc:call(ToNode,
+                                                       ra_counters,
+                                                       counters,
+                                                       [To, [last_applied]]),
             RPC = #install_snapshot_rpc{term = Term,
                                         leader_id = Id,
                                         meta = Meta},
-            case ra_snapshot:indexes(ra_snapshot:current_snapshot_dir(SnapState)) of
-                {ok, [_|_] = Indexes} ->
+            case ra_snapshot:indexes(
+                   ra_snapshot:current_snapshot_dir(SnapState)) of
+                {ok, [_|_] = Indexes0} ->
+                    %% remove all indexes lower than the target's last applied
+                    Indexes = ra_seq:floor(LastApplied + 1, Indexes0),
+                    ?DEBUG("~ts: sending live indexes ~w to ~w ",
+                           [LogId, ra_seq:range(Indexes), To]),
                     %% there are live indexes to send before the snapshot
-                    %% %% TODO: only send live indexes higher than the follower's
-                    %% last_applied index
+                    %% TODO: write ra_seq:list_chunk function to avoid expansion
                    Idxs = lists:reverse(ra_seq:expand(Indexes)),
                    Flru = lists:foldl(
                               fun (Is, F0) ->
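
On the leader side the change amounts to a small pipeline: read the follower's last_applied counter over erpc, then floor the snapshot's live indexes at last_applied + 1 so entries the follower has already applied are not resent. A minimal sketch of that flooring step, using only calls that appear in the diff; the wrapper name is hypothetical:

%% Hypothetical wrapper over the flooring step added above.
live_indexes_to_send(SnapState, FollowerLastApplied) ->
    case ra_snapshot:indexes(
           ra_snapshot:current_snapshot_dir(SnapState)) of
        {ok, [_|_] = Indexes0} ->
            %% keep only live indexes the follower has not yet applied
            ra_seq:floor(FollowerLastApplied + 1, Indexes0);
        _ ->
            []
    end.

An empty result would simply mean the follower already holds every live entry and only the snapshot chunks themselves need to be sent.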

test/ra_kv_SUITE.erl

Lines changed: 57 additions & 1 deletion

@@ -20,7 +20,8 @@ all() ->

 all_tests() ->
     [
-     basics
+     basics,
+     snapshot_replication
     ].

 groups() ->
@@ -56,6 +57,60 @@ end_per_testcase(_TestCase, _Config) ->
 %%%===================================================================


+snapshot_replication(_Config) ->
+    Members = [{kv1, node()}, {kv2, node()}],
+    KvId = hd(Members),
+
+    {ok, _, _} = ra_kv:start_cluster(?SYS, ?FUNCTION_NAME,
+                                     #{members => Members}),
+    ra:transfer_leadership(KvId, KvId),
+    {ok, #{}} = ra_kv:put(KvId, <<"k1">>, <<"k1-value01">>, 5000),
+    %% write 5k entries, each with a distinct key
+    [{ok, #{}} = ra_kv:put(KvId, integer_to_binary(I), I, 5000)
+     || I <- lists:seq(1, 5000)],
+
+    ct:pal("kv get ~p", [ra_kv:get(KvId, <<"k1">>, 5000)]),
+    ct:pal("leaderboard ~p", [ets:tab2list(ra_leaderboard)]),
+
+    ?assertMatch({ok, #{machine := #{num_keys := _}}, KvId},
+                 ra:member_overview(KvId)),
+    ra_log_wal:force_roll_over(ra_log_wal),
+    %% wait for rollover processing
+    ra_log_wal:last_writer_seq(ra_log_wal, <<>>),
+    %% wait for segment writer to process
+    ra_log_segment_writer:await(ra_log_segment_writer),
+    %% prompt ra_kv to take a snapshot
+    ok = ra:aux_command(KvId, take_snapshot),
+    ok = ra_lib:retry(
+           fun () ->
+                   {ok, #{log := #{snapshot_index := SnapIdx,
+                                   last_index := LastIdx}}, _} =
+                       ra:member_overview(KvId),
+                   SnapIdx == LastIdx
+           end, 100, 100),
+
+    KvId3 = {kv3, node()},
+    ok = ra_kv:add_member(?SYS, KvId3, KvId),
+    KvId3Pid = whereis(kv3),
+    ?assert(is_pid(KvId3Pid)),
+    {ok, #{}} = ra_kv:put(KvId, <<"k3">>, <<"k3-value01">>, 5000),
+    {ok, #{}} = ra_kv:put(KvId, <<"k4">>, <<"k3-value01">>, 5000),
+    ok = ra:aux_command(KvId, take_snapshot),
+    % timer:sleep(1000),
+    {ok, #{log := #{last_index := Kv1LastIndex}}, _} =
+        ra:member_overview(KvId),
+    ok = ra_lib:retry(
+           fun () ->
+                   {ok, #{log := #{last_index := LastIdx}}, _} =
+                       ra:member_overview(KvId3),
+                   Kv1LastIndex == LastIdx
+           end, 100, 100),
+    ct:pal("counters ~p", [ra_counters:counters(KvId3, [last_applied])]),
+    %% ensure kv3 did not crash during snapshot replication
+    ?assertEqual(KvId3Pid, whereis(kv3)),
+    ra:delete_cluster([KvId, {kv2, node()}, KvId3]),
+    ok.
+
 basics(_Config) ->
     Members = [{kv1, node()}],
     KvId = hd(Members),
@@ -136,4 +191,5 @@ basics(_Config) ->
                                undefined, 1000),
     ?assertEqual(Reads4, Reads5),
     ct:pal("counters ~p", [ra_counters:overview(KvId)]),
+    ra:delete_cluster([KvId, KvId2]),
     ok.
