Skip to content

Commit

Permalink
RATIS-1390. Bootstrapping Peer should always try to install a snapshot the first time.
Browse files Browse the repository at this point in the history
  • Loading branch information
hanishakoneru committed Aug 9, 2021
1 parent a731551 commit d67c91d
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ public void onNext(InstallSnapshotReplyProto reply) {
switch (reply.getResult()) {
case SUCCESS:
LOG.info("{}: Completed InstallSnapshot. Reply: {}", this, reply);
getFollower().ackInstallSnapshotAttempt();
removePending(reply);
break;
case IN_PROGRESS:
Expand All @@ -421,6 +422,7 @@ public void onNext(InstallSnapshotReplyProto reply) {
final long followerSnapshotIndex = reply.getSnapshotIndex();
LOG.info("{}: Already Installed Snapshot Index {}.", this, followerSnapshotIndex);
getFollower().setSnapshotIndex(followerSnapshotIndex);
getFollower().ackInstallSnapshotAttempt();
getLeaderState().onFollowerCommitIndex(getFollower(), followerSnapshotIndex);
increaseNextIndex(followerSnapshotIndex);
removePending(reply);
Expand All @@ -433,6 +435,10 @@ public void onNext(InstallSnapshotReplyProto reply) {
this, RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
getServer().getId(), installSnapshotEnabled, getFollowerId(), !installSnapshotEnabled);
break;
case NULL_SNAPSHOT:
LOG.info("{}: StateMachine could not install snapshot as it is not available.", this);
getFollower().ackInstallSnapshotAttempt();
break;
case UNRECOGNIZED:
LOG.error("Unrecongnized the reply result {}: Leader is {}, follower is {}",
reply.getResult(), getServer().getId(), getFollowerId());
Expand Down Expand Up @@ -562,9 +568,21 @@ private void installSnapshot(TermIndex firstAvailableLogTermIndex) {
* @return the first available log's start term index
*/
private TermIndex shouldNotifyToInstallSnapshot() {
final long followerNextIndex = getFollower().getNextIndex();
final FollowerInfo follower = getFollower();
final long leaderNextIndex = getRaftLog().getNextIndex();
final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower);

if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
// If the follower is bootstrapping and has not yet installed any snapshot from leader, then the follower should
// be notified to install a snapshot. Every follower should try to install at least one snapshot during
// bootstrapping, if available.
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Notify follower to install snapshot as it is bootstrapping.", this);
}
return getRaftLog().getLastEntryTermIndex();
}

final long followerNextIndex = follower.getNextIndex();
if (followerNextIndex >= leaderNextIndex) {
return null;
}
Expand Down
1 change: 1 addition & 0 deletions ratis-proto/src/main/proto/Raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ enum InstallSnapshotResult {
IN_PROGRESS = 2;
ALREADY_INSTALLED = 3;
CONF_MISMATCH = 4;
NULL_SNAPSHOT = 5;
}

message RequestVoteRequestProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ public interface FollowerInfo {
/** Set follower's snapshotIndex. */
void setSnapshotIndex(long newSnapshotIndex);

/**
 * Acknowledge that the Follower attempted to install a snapshot.
 * An acknowledgement does not guarantee that the installation was successful;
 * it only helps to determine whether the Follower can come out of the bootstrap process.
 */
void ackInstallSnapshotAttempt();

/**
 * Used to verify whether the Follower tried to install a snapshot during the bootstrap process.
 *
 * @return true if install snapshot has been attempted by the Follower at least once.
 */
boolean hasAttemptedToInstallSnapshot();

/** @return the nextIndex for this follower. */
long getNextIndex();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,

/** Handle the event that the follower has replied a success append entries. */
void onFollowerSuccessAppendEntries(FollowerInfo follower);

/**
 * Check if a follower is bootstrapping.
 *
 * @param follower the follower to check.
 * @return true if the given follower is bootstrapping.
 */
boolean isFollowerBootstrapping(FollowerInfo follower);

}
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,24 @@ default SnapshotInfo shouldInstallSnapshot() {
// we should install snapshot if the follower needs to catch up and:
// 1. there is no local log entry but there is snapshot
// 2. or the follower's next index is smaller than the log start index
// 3. or the follower is bootstrapping and has not installed any snapshot yet
final FollowerInfo follower = getFollower();
final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower);
final SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot();

if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
if (snapshot == null) {
// Leader cannot send null snapshot to follower. Hence, acknowledge InstallSnapshot attempt (even though it
// was not attempted) so that follower can come out of staging state after appending log entries.
follower.ackInstallSnapshotAttempt();
} else {
return snapshot;
}
}

final long followerNextIndex = getFollower().getNextIndex();
if (followerNextIndex < getRaftLog().getNextIndex()) {
final long logStartIndex = getRaftLog().getStartIndex();
final SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot();
if (followerNextIndex < logStartIndex || (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
return snapshot;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class FollowerInfoImpl implements FollowerInfo {
private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex", 0L);
private volatile boolean attendVote;
private volatile boolean ackInstallSnapshotAttempt = false;

FollowerInfoImpl(RaftGroupMemberId id, RaftPeer peer, Timestamp lastRpcTime, long nextIndex, boolean attendVote) {
this.name = id + "->" + peer.getId();
Expand Down Expand Up @@ -110,6 +111,17 @@ public void setSnapshotIndex(long newSnapshotIndex) {
nextIndex.setUnconditionally(newSnapshotIndex + 1, infoIndexChange);
}

/**
 * Records that an install-snapshot attempt has been acknowledged for this follower.
 * Sets the flag returned by {@link #hasAttemptedToInstallSnapshot()}; this method never clears it.
 */
@Override
public void ackInstallSnapshotAttempt() {
  // Parameterized logging that identifies the follower, replacing the ad-hoc
  // "-----" debugging prefix and the eager string concatenation.
  LOG.info("{}: acknowledged install snapshot attempt, current snapshotIndex: {}", name, snapshotIndex.get());
  ackInstallSnapshotAttempt = true;
}

/** @return the current value of the flag set by {@link #ackInstallSnapshotAttempt()}. */
@Override
public boolean hasAttemptedToInstallSnapshot() {
  // Single read of the volatile flag.
  final boolean attempted = this.ackInstallSnapshotAttempt;
  return attempted;
}

@Override
public String getName() {
return name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,8 @@ private BootStrapProgress checkProgress(FollowerInfo follower, long committed) {
LOG.debug("{} detects a follower {} timeout ({}) for bootstrapping", this, follower, timeoutTime);
return BootStrapProgress.NOPROGRESS;
} else if (follower.getMatchIndex() + stagingCatchupGap > committed
&& follower.getLastRpcResponseTime().compareTo(progressTime) > 0) {
&& follower.getLastRpcResponseTime().compareTo(progressTime) > 0
&& follower.hasAttemptedToInstallSnapshot()) {
return BootStrapProgress.CAUGHTUP;
} else {
return BootStrapProgress.PROGRESSING;
Expand All @@ -643,6 +644,11 @@ public void onFollowerSuccessAppendEntries(FollowerInfo follower) {
}
}

/**
 * Reports whether the given follower is bootstrapping, by passing the id of
 * the follower's peer to {@code isBootStrappingPeer}.
 */
@Override
public boolean isFollowerBootstrapping(FollowerInfo follower) {
  return this.isBootStrappingPeer(
      follower.getPeer().getId());
}

private void checkStaging() {
if (!inStagingState()) {
// it is possible that the bootstrapping is done. Then, fallback to UPDATE_COMMIT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public long[] getFollowerNextIndices() {
private final RaftServerMetricsImpl raftServerMetrics;

private final AtomicReference<TermIndex> inProgressInstallSnapshotRequest;
private final AtomicBoolean isSnapshotNull;

// To avoid append entry before complete start() method
// For example, if thread1 start(), but before thread1 startAsFollower(), thread2 receive append entry
Expand Down Expand Up @@ -194,6 +195,7 @@ public long[] getFollowerNextIndices() {
this.state = new ServerState(id, group, properties, this, stateMachine);
this.retryCache = new RetryCacheImpl(properties);
this.inProgressInstallSnapshotRequest = new AtomicReference<>(null);
this.isSnapshotNull = new AtomicBoolean(false);
this.dataStreamMap = new DataStreamMapImpl(id);

this.jmxAdapter = new RaftServerJmxAdapter();
Expand Down Expand Up @@ -1530,7 +1532,6 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
request.getNotification().getFirstAvailableTermIndex());
final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();

synchronized (this) {
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
Expand All @@ -1544,13 +1545,11 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
state.setLeader(leaderId, "installSnapshot");

updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);

if (inProgressInstallSnapshotRequest.compareAndSet(null, firstAvailableLogTermIndex)) {

// Check if snapshot index is already at par or ahead of the first
// available log index of the Leader.
long snapshotIndex = state.getSnapshotIndex();
if (snapshotIndex + 1 >= firstAvailableLogIndex) {
if (snapshotIndex + 1 >= firstAvailableLogIndex && firstAvailableLogIndex > 0) {
// State Machine has already installed the snapshot. Return the
// latest snapshot index to the Leader.

Expand Down Expand Up @@ -1596,6 +1595,11 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
stateMachine.pause();
state.updateInstalledSnapshotIndex(reply);
state.reloadStateMachine(reply.getIndex());
} else {
isSnapshotNull.set(true);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: StateMachine could not install snapshot as it is not available", this);
}
}
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
});
Expand All @@ -1611,6 +1615,11 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
LOG.info("{}: Snapshot Installation by StateMachine is in progress.", getMemberId());
}

if (isSnapshotNull.compareAndSet(true, false)) {
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.NULL_SNAPSHOT, -1);
}

return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.IN_PROGRESS, -1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private void assertProtos(List<LogEntryProto> protos, long nextIndex, TermIndex

@Override
public InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() > 0);
Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() >= 0);
synchronized (server) {
return LeaderProtoUtils.toInstallSnapshotRequestProto(server, getFollowerId(), firstAvailableLogTermIndex);
}
Expand Down

0 comments on commit d67c91d

Please sign in to comment.