Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-1291. Send heartbeat when there is no reply #398

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ public void run() throws IOException {
}

appendLog(installSnapshotRequired || haveTooManyPendingRequests());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: undo this change.

}
getLeaderState().checkHealth(getFollower());
}
Expand Down Expand Up @@ -235,7 +234,10 @@ private void sendRequest(AppendEntriesRequest request, AppendEntriesRequestProto
scheduler.onTimeout(requestTimeoutDuration,
() -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()),
LOG, () -> "Timeout check failed for append entry request: " + request);
getFollower().updateLastRpcSendTime();
request.recordSendTime();
if (request.isHeartbeat()) {
getFollower().updateLastHeartBeatSendTime();
}
}

private void timeoutAppendRequest(long cid, boolean heartbeat) {
Expand Down Expand Up @@ -285,16 +287,17 @@ public void onNext(AppendEntriesReplyProto reply) {
}

try {
onNextImpl(reply);
Timestamp sendTime = request != null ? request.getSendTime() : Timestamp.currentTime();
onNextImpl(reply, sendTime);
} catch(Exception t) {
LOG.error("Failed onNext request=" + request
+ ", reply=" + ServerStringUtils.toAppendEntriesReplyString(reply), t);
}
}

private void onNextImpl(AppendEntriesReplyProto reply) {
private void onNextImpl(AppendEntriesReplyProto reply, Timestamp sendTime) {
// update the last rpc time
getFollower().updateLastRpcResponseTime();
getFollower().updateLastRpcResponseTime(sendTime);

if (!firstResponseReceived) {
firstResponseReceived = true;
Expand Down Expand Up @@ -358,7 +361,7 @@ private synchronized void updateNextIndex(long replyNextIndex) {

private class InstallSnapshotResponseHandler implements StreamObserver<InstallSnapshotReplyProto> {
private final String name = getFollower().getName() + "-" + JavaUtils.getClassSimpleName(getClass());
private final Queue<Integer> pending;
private final Map<Integer, InstallSnapshotRequest> pending = new ConcurrentHashMap<>();
private final AtomicBoolean done = new AtomicBoolean(false);
private final boolean isNotificationOnly;

Expand All @@ -367,18 +370,19 @@ private class InstallSnapshotResponseHandler implements StreamObserver<InstallSn
}

InstallSnapshotResponseHandler(boolean notifyOnly) {
pending = new LinkedList<>();
this.isNotificationOnly = notifyOnly;
}

synchronized void addPending(InstallSnapshotRequestProto request) {
pending.offer(request.getSnapshotChunk().getRequestIndex());
InstallSnapshotRequest installSnapshotRequest = new InstallSnapshotRequest(request);
installSnapshotRequest.recordSendTime();
pending.put(request.getSnapshotChunk().getRequestIndex(), installSnapshotRequest);
}

synchronized void removePending(InstallSnapshotReplyProto reply) {
final Integer index = pending.poll();
Objects.requireNonNull(index, "index == null");
Preconditions.assertTrue(index == reply.getRequestIndex());
InstallSnapshotRequest request = pending.remove(reply.getRequestIndex());
Objects.requireNonNull(request, "index == null");
Preconditions.assertTrue(request.getProto().getSnapshotChunk().getRequestIndex() == reply.getRequestIndex());
}

boolean isDone() {
Expand All @@ -402,7 +406,10 @@ public void onNext(InstallSnapshotReplyProto reply) {
}

// update the last rpc time
getFollower().updateLastRpcResponseTime();
InstallSnapshotRequest request = pending.get(reply.getRequestIndex());

Timestamp sendTime = request != null ? request.getSendTime() : Timestamp.currentTime();
getFollower().updateLastRpcResponseTime(sendTime);

if (!firstResponseReceived) {
firstResponseReceived = true;
Expand Down Expand Up @@ -484,7 +491,6 @@ private void installSnapshot(SnapshotInfo snapshot) {
for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
if (isRunning()) {
snapshotRequestObserver.onNext(request);
getFollower().updateLastRpcSendTime();
responseHandler.addPending(request);
} else {
break;
Expand Down Expand Up @@ -534,7 +540,6 @@ private void installSnapshot(TermIndex firstAvailableLogTermIndex) {
try {
snapshotRequestObserver = getClient().installSnapshot(responseHandler);
snapshotRequestObserver.onNext(request);
getFollower().updateLastRpcSendTime();
responseHandler.addPending(request);
snapshotRequestObserver.onCompleted();
} catch (Exception e) {
Expand Down Expand Up @@ -577,6 +582,27 @@ private TermIndex shouldNotifyToInstallSnapshot() {
return null;
}

static class InstallSnapshotRequest {
private final InstallSnapshotRequestProto proto;
private Timestamp sendTime;

InstallSnapshotRequest(InstallSnapshotRequestProto proto) {
this.proto = proto;
}

InstallSnapshotRequestProto getProto() {
return proto;
}

void recordSendTime() {
this.sendTime = Timestamp.currentTime();
}

Timestamp getSendTime() {
return sendTime;
}
}

static class AppendEntriesRequest {
private final Timer timer;
private volatile Timer.Context timerContext;
Expand All @@ -587,6 +613,8 @@ static class AppendEntriesRequest {

private final TermIndex lastEntry;

private Timestamp sendTime;

AppendEntriesRequest(AppendEntriesRequestProto proto, RaftPeerId followerId, GrpcServerMetrics grpcServerMetrics) {
this.callId = proto.getServerRequest().getCallId();
this.previousLog = proto.hasPreviousLog()? TermIndex.valueOf(proto.getPreviousLog()): null;
Expand All @@ -601,6 +629,14 @@ long getCallId() {
return callId;
}

Timestamp getSendTime() {
return sendTime;
}

void recordSendTime() {
this.sendTime = Timestamp.currentTime();
}

TermIndex getPreviousLog() {
return previousLog;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,14 @@ public interface FollowerInfo {
Timestamp getLastRpcResponseTime();

/** Update lastRpcResponseTime to the current time. */
void updateLastRpcResponseTime();
void updateLastRpcResponseTime(Timestamp sendTime);

/** Update lastRpcSendTime to the current time. */
void updateLastRpcSendTime();
/** Update lastHeartBeatSendTime to the current time. */
void updateLastHeartBeatSendTime();

/** @return the latest of the lastRpcSendTime and the lastRpcResponseTime . */
Timestamp getLastRpcTime();
/** @return the lastRpcSendTimeWithResponse . */
Timestamp getLastRpcSendTimeWithResponse();

/** @return the lastHeartBeatSendTime . */
Timestamp getLastHeartBeatSendTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ default boolean shouldHeartbeat() {
* @return the time in milliseconds that the leader should send a heartbeat.
*/
default long getHeartbeatRemainingTimeMs() {
return getServer().properties().minRpcTimeoutMs()/2 - getFollower().getLastRpcTime().elapsedTimeMs();
return getServer().properties().minRpcTimeoutMs()/2 -
Math.min(getFollower().getLastRpcSendTimeWithResponse().elapsedTimeMs(),
getFollower().getLastHeartBeatSendTime().elapsedTimeMs());
}

/** Handle the event that the follower has replied a term. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class FollowerInfoImpl implements FollowerInfo {

private final RaftPeer peer;
private final AtomicReference<Timestamp> lastRpcResponseTime;
private final AtomicReference<Timestamp> lastRpcSendTime;
private final AtomicReference<Timestamp> lastRpcSendTimeWithResponse;
private final AtomicReference<Timestamp> lastHeartBeatSendTime;
private final RaftLogIndex nextIndex;
private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", 0L);
private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
Expand All @@ -48,7 +49,8 @@ class FollowerInfoImpl implements FollowerInfo {

this.peer = peer;
this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime);
this.lastRpcSendTime = new AtomicReference<>(lastRpcTime);
this.lastRpcSendTimeWithResponse = new AtomicReference<>(lastRpcTime);
this.lastHeartBeatSendTime = new AtomicReference<>(lastRpcTime);
this.nextIndex = new RaftLogIndex("nextIndex", nextIndex);
this.attendVote = attendVote;
}
Expand Down Expand Up @@ -119,7 +121,8 @@ public String getName() {
public String toString() {
return name + "(c" + getCommitIndex() + ",m" + getMatchIndex() + ",n" + getNextIndex()
+ ", attendVote=" + attendVote +
", lastRpcSendTime=" + lastRpcSendTime.get().elapsedTimeMs() +
", lastHeartBeatSendTime=" + lastHeartBeatSendTime.get().elapsedTimeMs() +
", lastRpcSendTimeWithResponse=" + lastRpcSendTimeWithResponse.get().elapsedTimeMs() +
", lastRpcResponseTime=" + lastRpcResponseTime.get().elapsedTimeMs() + ")";
}

Expand All @@ -137,8 +140,11 @@ public RaftPeer getPeer() {
}

@Override
public void updateLastRpcResponseTime() {
public void updateLastRpcResponseTime(Timestamp sendTime) {
lastRpcResponseTime.set(Timestamp.currentTime());
if (sendTime.compareTo(lastRpcSendTimeWithResponse.get()) > 0) {
lastRpcSendTimeWithResponse.set(sendTime);
}
}

@Override
Expand All @@ -147,12 +153,17 @@ public Timestamp getLastRpcResponseTime() {
}

@Override
public void updateLastRpcSendTime() {
lastRpcSendTime.set(Timestamp.currentTime());
public Timestamp getLastRpcSendTimeWithResponse() {
return lastRpcSendTimeWithResponse.get();
}

@Override
public Timestamp getLastRpcTime() {
return Timestamp.latest(lastRpcResponseTime.get(), lastRpcSendTime.get());
public void updateLastHeartBeatSendTime() {
lastHeartBeatSendTime.set(Timestamp.currentTime());
}

@Override
public Timestamp getLastHeartBeatSendTime() {
return lastHeartBeatSendTime.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,8 @@ Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex
LogAppender logAppender = server.newLogAppender(this, f);
peerIdFollowerInfoMap.put(peer.getId(), f);
raftServerMetrics.addFollower(peer.getId());
logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime);
logAppenderMetrics.addFollowerGauges(
peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcResponseTime);
return logAppender;
}).collect(Collectors.toList());
senders.addAll(newAppenders);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.util.Timestamp;

import java.io.IOException;
import java.io.InterruptedIOException;
Expand Down Expand Up @@ -60,9 +61,12 @@ private AppendEntriesReplyProto sendAppendEntriesWithRetries()
return null;
}

getFollower().updateLastRpcSendTime();
if (request.getEntriesCount() == 0) {
getFollower().updateLastHeartBeatSendTime();
}
Timestamp sendTime = Timestamp.currentTime();
final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
getFollower().updateLastRpcResponseTime();
getFollower().updateLastRpcResponseTime(sendTime);

getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit());
return r;
Expand All @@ -87,9 +91,9 @@ private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws
InstallSnapshotReplyProto reply = null;
try {
for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
getFollower().updateLastRpcSendTime();
Timestamp sendTime = Timestamp.currentTime();
reply = getServerRpc().installSnapshot(request);
getFollower().updateLastRpcResponseTime();
getFollower().updateLastRpcResponseTime(sendTime);

if (!reply.getServerReply().getSuccess()) {
return reply;
Expand Down