Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-1390. Bootstrapping Peer should always try to install a snapshot the first time. #489

Merged
merged 4 commits into from
Aug 22, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -407,20 +407,22 @@ public void onNext(InstallSnapshotReplyProto reply) {
if (!firstResponseReceived) {
firstResponseReceived = true;
}

final long followerSnapshotIndex;
switch (reply.getResult()) {
case SUCCESS:
LOG.info("{}: Completed InstallSnapshot. Reply: {}", this, reply);
getFollower().setAttemptedToInstallSnapshot();
removePending(reply);
break;
case IN_PROGRESS:
LOG.info("{}: InstallSnapshot in progress.", this);
removePending(reply);
break;
case ALREADY_INSTALLED:
final long followerSnapshotIndex = reply.getSnapshotIndex();
LOG.info("{}: Already Installed Snapshot Index {}.", this, followerSnapshotIndex);
followerSnapshotIndex = reply.getSnapshotIndex();
LOG.info("{}: Follower snapshot is already at index {}.", this, followerSnapshotIndex);
getFollower().setSnapshotIndex(followerSnapshotIndex);
getFollower().setAttemptedToInstallSnapshot();
getLeaderState().onFollowerCommitIndex(getFollower(), followerSnapshotIndex);
increaseNextIndex(followerSnapshotIndex);
removePending(reply);
Expand All @@ -433,6 +435,20 @@ public void onNext(InstallSnapshotReplyProto reply) {
this, RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
getServer().getId(), installSnapshotEnabled, getFollowerId(), !installSnapshotEnabled);
break;
case SNAPSHOT_INSTALLED:
followerSnapshotIndex = reply.getSnapshotIndex();
LOG.info("{}: Follower installed snapshot at index {}", this, followerSnapshotIndex);
getFollower().setSnapshotIndex(followerSnapshotIndex);
getFollower().setAttemptedToInstallSnapshot();
getLeaderState().onFollowerCommitIndex(getFollower(), followerSnapshotIndex);
increaseNextIndex(followerSnapshotIndex);
removePending(reply);
break;
case SNAPSHOT_UNAVAILABLE:
LOG.info("{}: Follower could not install snapshot as it is not available.", this);
getFollower().setAttemptedToInstallSnapshot();
removePending(reply);
break;
case UNRECOGNIZED:
LOG.error("Unrecongnized the reply result {}: Leader is {}, follower is {}",
reply.getResult(), getServer().getId(), getFollowerId());
Expand Down Expand Up @@ -562,9 +578,21 @@ private void installSnapshot(TermIndex firstAvailableLogTermIndex) {
* @return the first available log's start term index
*/
private TermIndex shouldNotifyToInstallSnapshot() {
final long followerNextIndex = getFollower().getNextIndex();
final FollowerInfo follower = getFollower();
final long leaderNextIndex = getRaftLog().getNextIndex();
final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower);

if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
// If the follower is bootstrapping and has not yet attempted to install a snapshot, then the follower should
// be notified to install one. Every follower should try to install at least one snapshot during
// bootstrapping, if available.
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Notify follower to install snapshot as it is bootstrapping.", this);
}
return getRaftLog().getLastEntryTermIndex();
}

final long followerNextIndex = follower.getNextIndex();
if (followerNextIndex >= leaderNextIndex) {
return null;
}
Expand Down
2 changes: 2 additions & 0 deletions ratis-proto/src/main/proto/Raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ enum InstallSnapshotResult {
IN_PROGRESS = 2;
ALREADY_INSTALLED = 3;
CONF_MISMATCH = 4;
SNAPSHOT_INSTALLED = 5;
SNAPSHOT_UNAVAILABLE = 6;
}

message RequestVoteRequestProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ public interface FollowerInfo {
/** Set follower's snapshotIndex. */
void setSnapshotIndex(long newSnapshotIndex);

/**
 * Acknowledge that this follower attempted to install a snapshot.
 * The attempt is not guaranteed to have been successful; the acknowledgement only helps to
 * determine whether the follower can come out of the bootstrap process.
 */
void setAttemptedToInstallSnapshot();

/**
 * Used to verify whether this follower tried to install a snapshot during the bootstrap process.
 *
 * @return true if install snapshot has been attempted by this follower at least once.
 */
boolean hasAttemptedToInstallSnapshot();

/** @return the nextIndex for this follower. */
long getNextIndex();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,

/** Handle the event that the follower has replied a success append entries. */
void onFollowerSuccessAppendEntries(FollowerInfo follower);

/**
 * Check if a follower is bootstrapping.
 *
 * @param follower the follower to check.
 * @return true if the given follower is bootstrapping.
 */
boolean isFollowerBootstrapping(FollowerInfo follower);

}
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,24 @@ default SnapshotInfo shouldInstallSnapshot() {
// we should install snapshot if the follower needs to catch up and:
// 1. there is no local log entry but there is snapshot
// 2. or the follower's next index is smaller than the log start index
// 3. or the follower is bootstrapping and has not installed any snapshot yet
final FollowerInfo follower = getFollower();
final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower);
final SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot();

if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
if (snapshot == null) {
// Leader cannot send null snapshot to follower. Hence, acknowledge InstallSnapshot attempt (even though it
// was not attempted) so that follower can come out of staging state after appending log entries.
follower.setAttemptedToInstallSnapshot();
} else {
return snapshot;
}
}

final long followerNextIndex = getFollower().getNextIndex();
if (followerNextIndex < getRaftLog().getNextIndex()) {
final long logStartIndex = getRaftLog().getStartIndex();
final SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot();
if (followerNextIndex < logStartIndex || (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
return snapshot;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class FollowerInfoImpl implements FollowerInfo {
private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex", 0L);
private volatile boolean attendVote;
private volatile boolean ackInstallSnapshotAttempt = false;

FollowerInfoImpl(RaftGroupMemberId id, RaftPeer peer, Timestamp lastRpcTime, long nextIndex, boolean attendVote) {
this.name = id + "->" + peer.getId();
Expand Down Expand Up @@ -110,6 +111,17 @@ public void setSnapshotIndex(long newSnapshotIndex) {
nextIndex.setUnconditionally(newSnapshotIndex + 1, infoIndexChange);
}

/** Log the acknowledgement for this follower and set the install-snapshot-attempted flag to true. */
@Override
public void setAttemptedToInstallSnapshot() {
  LOG.info("Follower {} acknowledged installing snapshot", this.name);
  this.ackInstallSnapshotAttempt = true;
}

/** @return the current value of the install-snapshot-attempted flag of this follower. */
@Override
public boolean hasAttemptedToInstallSnapshot() {
  return this.ackInstallSnapshotAttempt;
}

@Override
public String getName() {
return name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,8 @@ private BootStrapProgress checkProgress(FollowerInfo follower, long committed) {
LOG.debug("{} detects a follower {} timeout ({}) for bootstrapping", this, follower, timeoutTime);
return BootStrapProgress.NOPROGRESS;
} else if (follower.getMatchIndex() + stagingCatchupGap > committed
&& follower.getLastRpcResponseTime().compareTo(progressTime) > 0) {
&& follower.getLastRpcResponseTime().compareTo(progressTime) > 0
&& follower.hasAttemptedToInstallSnapshot()) {
return BootStrapProgress.CAUGHTUP;
} else {
return BootStrapProgress.PROGRESSING;
Expand All @@ -643,6 +644,11 @@ public void onFollowerSuccessAppendEntries(FollowerInfo follower) {
}
}

/**
 * Check if the given follower is bootstrapping by passing the id of the follower's peer
 * to isBootStrappingPeer.
 *
 * @param follower the follower whose peer id is checked.
 * @return the result of isBootStrappingPeer for that peer id.
 */
@Override
public boolean isFollowerBootstrapping(FollowerInfo follower) {
return isBootStrappingPeer(follower.getPeer().getId());
}

private void checkStaging() {
if (!inStagingState()) {
// it is possible that the bootstrapping is done. Then, fallback to UPDATE_COMMIT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.impl;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
Expand Down Expand Up @@ -166,6 +167,8 @@ public long[] getFollowerNextIndices() {
private final RaftServerMetricsImpl raftServerMetrics;

private final AtomicReference<TermIndex> inProgressInstallSnapshotRequest;
private final AtomicLong installedSnapshotIndex;
private final AtomicBoolean isSnapshotNull;

// To avoid append entry before complete start() method
// For example, if thread1 start(), but before thread1 startAsFollower(), thread2 receive append entry
Expand Down Expand Up @@ -194,6 +197,8 @@ public long[] getFollowerNextIndices() {
this.state = new ServerState(id, group, properties, this, stateMachine);
this.retryCache = new RetryCacheImpl(properties);
this.inProgressInstallSnapshotRequest = new AtomicReference<>(null);
this.installedSnapshotIndex = new AtomicLong();
this.isSnapshotNull = new AtomicBoolean(false);
this.dataStreamMap = new DataStreamMapImpl(id);

this.jmxAdapter = new RaftServerJmxAdapter();
Expand Down Expand Up @@ -1530,7 +1535,6 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
request.getNotification().getFirstAvailableTermIndex());
final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();

synchronized (this) {
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
Expand All @@ -1542,23 +1546,22 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
}
changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
state.setLeader(leaderId, "installSnapshot");
long snapshotIndex = state.getSnapshotIndex();

updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);

if (inProgressInstallSnapshotRequest.compareAndSet(null, firstAvailableLogTermIndex)) {

LOG.info("{}: Received notification to install snapshot at index {}", getMemberId(), firstAvailableLogIndex);
// Check if snapshot index is already at par or ahead of the first
// available log index of the Leader.
long snapshotIndex = state.getSnapshotIndex();
if (snapshotIndex + 1 >= firstAvailableLogIndex) {
if (snapshotIndex + 1 >= firstAvailableLogIndex && firstAvailableLogIndex > 0) {
// State Machine has already installed the snapshot. Return the
// latest snapshot index to the Leader.

inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(
leaderId, getMemberId(), currentTerm, InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
LOG.info("{}: StateMachine snapshotIndex is {}", getMemberId(), snapshotIndex);
return reply;
LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(),
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
}

Optional<RaftPeerProto> leaderPeerInfo = null;
Expand Down Expand Up @@ -1596,21 +1599,53 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
stateMachine.pause();
state.updateInstalledSnapshotIndex(reply);
state.reloadStateMachine(reply.getIndex());
installedSnapshotIndex.set(reply.getIndex());
} else {
isSnapshotNull.set(true);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: StateMachine could not install snapshot as it is not available", this);
}
}
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
});
} catch (Throwable t) {
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
throw t;
}

if (LOG.isDebugEnabled()) {
LOG.debug("{}: Snapshot Installation Request received and is in progress", getMemberId());
LOG.debug("{}: StateMachine is processing Snapshot Installation Request.", getMemberId());
}
} else {
LOG.info("{}: Snapshot Installation by StateMachine is in progress.", getMemberId());
if (LOG.isDebugEnabled()) {
LOG.debug("{}: StateMachine is already installing a snapshot.", getMemberId());
}
}

// If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE.
if (isSnapshotNull.compareAndSet(true, false)) {
LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(),
InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
}

// If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset
// installedSnapshotIndex to 0.
long latestInstalledSnapshotIndex = this.installedSnapshotIndex.getAndSet(0);
if (latestInstalledSnapshotIndex > 0) {
LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", getMemberId(),
InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotIndex);
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotIndex);
}

// Otherwise, Snapshot installation is in progress.
if (LOG.isDebugEnabled()) {
LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
InstallSnapshotResult.IN_PROGRESS);
}
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.IN_PROGRESS, -1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private void assertProtos(List<LogEntryProto> protos, long nextIndex, TermIndex

@Override
public InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() > 0);
Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() >= 0);
synchronized (server) {
return LeaderProtoUtils.toInstallSnapshotRequestProto(server, getFollowerId(), firstAvailableLogTermIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
import org.apache.ratis.rpc.CallId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.raftlog.RaftLogIOException;
Expand Down Expand Up @@ -123,9 +122,21 @@ public void run() throws InterruptedException, IOException {
this, getFollower().getNextIndex(), getRaftLog().getStartIndex(), snapshot);

final InstallSnapshotReplyProto r = installSnapshot(snapshot);
if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
onFollowerTerm(r.getTerm());
} // otherwise if r is null, retry the snapshot installation
if (r != null) {
switch (r.getResult()) {
case NOT_LEADER:
onFollowerTerm(r.getTerm());
break;
case SUCCESS:
case SNAPSHOT_UNAVAILABLE:
case ALREADY_INSTALLED:
getFollower().setAttemptedToInstallSnapshot();
break;
default:
break;
}
}
// otherwise if r is null, retry the snapshot installation
} else {
final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
if (r != null) {
Expand Down
Loading