Skip to content

Commit

Permalink
RATIS-1411. Alleviate slow follower issue (apache#508)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChenSammi authored Sep 30, 2021
1 parent c7d4b6a commit 837b063
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 2 deletions.
23 changes: 23 additions & 0 deletions ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ static BiConsumer<String, Integer> requireMax(int max) {
};
}

static BiConsumer<String, Double> requireMax(double max) {
return (key, value) -> {
if (value > max) {
throw new IllegalArgumentException(
key + " = " + value + " > max = " + max);
}
};
}

static BiConsumer<String, Long> requireMin(SizeInBytes min) {
return requireMin(min.getSize());
}
Expand Down Expand Up @@ -149,6 +158,13 @@ static long getLong(
return get(longGetter, key, defaultValue, logger, assertions);
}

@SafeVarargs
static double getDouble(
BiFunction<String, Double, Double> doubleGetter,
String key, double defaultValue, Consumer<String> logger, BiConsumer<String, Double>... assertions) {
return get(doubleGetter, key, defaultValue, logger, assertions);
}

@SafeVarargs
static File getFile(
BiFunction<String, File, File> fileGetter,
Expand Down Expand Up @@ -218,6 +234,13 @@ static void setLong(
set(longSetter, key, value, assertions);
}

@SafeVarargs
static void setDouble(
BiConsumer<String, Double> doubleSetter, String key, double value,
BiConsumer<String, Double>... assertions) {
set(doubleSetter, key, value, assertions);
}

@SafeVarargs
static void setFile(
BiConsumer<String, File> fileSetter, String key, File value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ static SizeInBytes byteLimit(RaftProperties properties) {
static void setByteLimit(RaftProperties properties, SizeInBytes byteLimit) {
setSizeInBytes(properties::set, BYTE_LIMIT_KEY, byteLimit, requireMin(1L));
}

String FOLLOWER_GAP_RATIO_MAX_KEY = PREFIX + ".follower.gap.ratio.max";
// The valid range is [1, 0) and -1, -1 means disable this feature
double FOLLOWER_GAP_RATIO_MAX_DEFAULT = -1d;

static double followerGapRatioMax(RaftProperties properties) {
return getDouble(properties::getDouble, FOLLOWER_GAP_RATIO_MAX_KEY,
FOLLOWER_GAP_RATIO_MAX_DEFAULT, getDefaultLog(), requireMax(1d));
}
static void setFollowerGapRatioMax(RaftProperties properties, float ratio) {
setDouble(properties::setDouble, FOLLOWER_GAP_RATIO_MAX_KEY, ratio, requireMax(1d));
}
}

interface Watch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import java.util.stream.Stream;

import static org.apache.ratis.server.RaftServer.Division.LOG;
import static org.apache.ratis.server.RaftServerConfigKeys.Write.FOLLOWER_GAP_RATIO_MAX_KEY;

/**
* States for leader only. It contains three different types of processors:
Expand Down Expand Up @@ -250,6 +251,7 @@ boolean removeAll(Collection<LogAppender> c) {
private final long placeHolderIndex;
private final RaftServerMetricsImpl raftServerMetrics;
private final LogAppenderMetrics logAppenderMetrics;
private final long followerMaxGapThreshold;

LeaderStateImpl(RaftServerImpl server) {
this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass());
Expand All @@ -269,6 +271,17 @@ boolean removeAll(Collection<LogAppender> c) {
this.pendingRequests = new PendingRequests(server.getMemberId(), properties, raftServerMetrics);
this.watchRequests = new WatchRequests(server.getMemberId(), properties);
this.messageStreamRequests = new MessageStreamRequests(server.getMemberId());
long maxPendingRequests = RaftServerConfigKeys.Write.elementLimit(properties);
double followerGapRatioMax = RaftServerConfigKeys.Write.followerGapRatioMax(properties);

if (followerGapRatioMax == -1) {
this.followerMaxGapThreshold = -1;
} else if (followerGapRatioMax > 1f || followerGapRatioMax <= 0f) {
throw new IllegalArgumentException(FOLLOWER_GAP_RATIO_MAX_KEY +
"s value must between [1, 0) to enable the feature");
} else {
this.followerMaxGapThreshold = (long) (followerGapRatioMax * maxPendingRequests);
}

final RaftConfigurationImpl conf = state.getRaftConf();
Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
Expand Down Expand Up @@ -700,6 +713,17 @@ static MinMajorityMax valueOf(long[] sorted) {
return new MinMajorityMax(sorted[0], getMajority(sorted), getMax(sorted));
}

static MinMajorityMax valueOf(long[] sorted, long gapThreshold) {
long majority = getMajority(sorted);
long min = sorted[0];
if (gapThreshold != -1 && (majority - min) > gapThreshold) {
// The the gap between majority and min(the slow follower) is greater than gapThreshold,
// set the majority to min, which will skip one round of lastCommittedIndex update in updateCommit().
majority = min;
}
return new MinMajorityMax(min, majority, getMax(sorted));
}

static long getMajority(long[] sorted) {
return sorted[(sorted.length - 1) / 2];
}
Expand All @@ -725,7 +749,7 @@ private Optional<MinMajorityMax> getMajorityMin(ToLongFunction<FollowerInfo> fol
}

final long[] indicesInNewConf = getSorted(followers, includeSelf, followerIndex, logIndex);
final MinMajorityMax newConf = MinMajorityMax.valueOf(indicesInNewConf);
final MinMajorityMax newConf = MinMajorityMax.valueOf(indicesInNewConf, followerMaxGapThreshold);

if (!conf.isTransitional()) {
return Optional.of(newConf);
Expand All @@ -737,7 +761,7 @@ private Optional<MinMajorityMax> getMajorityMin(ToLongFunction<FollowerInfo> fol
}

final long[] indicesInOldConf = getSorted(oldFollowers, includeSelfInOldConf, followerIndex, logIndex);
final MinMajorityMax oldConf = MinMajorityMax.valueOf(indicesInOldConf);
final MinMajorityMax oldConf = MinMajorityMax.valueOf(indicesInOldConf, followerMaxGapThreshold);
return Optional.of(newConf.combine(oldConf));
}
}
Expand Down

0 comments on commit 837b063

Please sign in to comment.