Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-1411. Alleviate slow follower issue #508

Merged
merged 4 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ static BiConsumer<String, Integer> requireMax(int max) {
};
}

/**
 * Builds an assertion that rejects configuration values above the given upper bound.
 *
 * @param max the inclusive upper bound.
 * @return a consumer of (key, value) that throws {@link IllegalArgumentException}
 *         when the value is greater than {@code max} or is NaN.
 */
static BiConsumer<String, Double> requireMax(double max) {
  return (key, value) -> {
    // Every ordered comparison with NaN is false, so "value > max" alone would let NaN through.
    if (Double.isNaN(value)) {
      throw new IllegalArgumentException(
          key + " = " + value + " is not a number; max = " + max);
    }
    if (value > max) {
      throw new IllegalArgumentException(
          key + " = " + value + " > max = " + max);
    }
  };
}

/**
 * Builds a lower-bound assertion from a {@code SizeInBytes} by delegating to the
 * {@code requireMin} overload that accepts the size returned by {@code min.getSize()}.
 *
 * @param min the lower bound, expressed as a size.
 * @return the assertion produced by the delegated overload.
 */
static BiConsumer<String, Long> requireMin(SizeInBytes min) {
  final long minSize = min.getSize();
  return requireMin(minSize);
}
Expand Down Expand Up @@ -149,6 +158,13 @@ static long getLong(
return get(longGetter, key, defaultValue, logger, assertions);
}

/**
 * Reads a {@code double} configuration value by delegating to the generic {@code get} helper.
 *
 * @param doubleGetter the lookup function, given the key and the default value.
 * @param key the configuration key.
 * @param defaultValue the value passed along as the default.
 * @param logger passed through to {@code get}.
 * @param assertions passed through to {@code get}.
 * @return the value returned by {@code get}.
 */
@SafeVarargs
static double getDouble(
    BiFunction<String, Double, Double> doubleGetter,
    String key, double defaultValue, Consumer<String> logger, BiConsumer<String, Double>... assertions) {
  final double resolved = get(doubleGetter, key, defaultValue, logger, assertions);
  return resolved;
}

@SafeVarargs
static File getFile(
BiFunction<String, File, File> fileGetter,
Expand Down Expand Up @@ -218,6 +234,13 @@ static void setLong(
set(longSetter, key, value, assertions);
}

/**
 * Writes a {@code double} configuration value by delegating to the generic {@code set} helper.
 *
 * @param doubleSetter the function that stores the (key, value) pair.
 * @param key the configuration key.
 * @param value the value to store.
 * @param assertions passed through to {@code set}.
 */
@SafeVarargs
static void setDouble(
    BiConsumer<String, Double> doubleSetter, String key, double value,
    BiConsumer<String, Double>... assertions) {
  // Box once up front; the generic helper works on Double.
  final Double boxed = value;
  set(doubleSetter, key, boxed, assertions);
}

@SafeVarargs
static void setFile(
BiConsumer<String, File> fileSetter, String key, File value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ static SizeInBytes byteLimit(RaftProperties properties) {
/**
 * Stores the given byte limit in {@code properties} under {@code BYTE_LIMIT_KEY}.
 *
 * @param properties the properties object to write into.
 * @param byteLimit the limit to store; checked with {@code requireMin(1L)}
 *                  (presumably rejecting sizes below 1 byte -- confirm against ConfUtils).
 */
static void setByteLimit(RaftProperties properties, SizeInBytes byteLimit) {
setSizeInBytes(properties::set, BYTE_LIMIT_KEY, byteLimit, requireMin(1L));
}

String FOLLOWER_GAP_RATIO_MAX_KEY = PREFIX + ".follower.gap.ratio.max";
// The valid range is (0, 1]; the special value -1 disables this feature.
double FOLLOWER_GAP_RATIO_MAX_DEFAULT = -1d;

/**
 * Reads the follower gap ratio setting stored under {@code FOLLOWER_GAP_RATIO_MAX_KEY}.
 * Falls back to {@code FOLLOWER_GAP_RATIO_MAX_DEFAULT} and applies the
 * {@code requireMax(1d)} assertion to the value.
 *
 * @param properties the properties to read from.
 * @return the configured ratio, as returned by {@code getDouble}.
 */
static double followerGapRatioMax(RaftProperties properties) {
  final double ratio = getDouble(
      properties::getDouble,
      FOLLOWER_GAP_RATIO_MAX_KEY,
      FOLLOWER_GAP_RATIO_MAX_DEFAULT,
      getDefaultLog(),
      requireMax(1d));
  return ratio;
}
/**
 * Stores the follower gap ratio under {@code FOLLOWER_GAP_RATIO_MAX_KEY}.
 * The value is checked with the {@code requireMax(1d)} assertion.
 *
 * @param properties the properties to write into.
 * @param ratio the ratio to store; it is widened from {@code float} to {@code double}.
 */
static void setFollowerGapRatioMax(RaftProperties properties, float ratio) {
  // Make the implicit float-to-double widening explicit before storing.
  final double widened = ratio;
  setDouble(properties::setDouble, FOLLOWER_GAP_RATIO_MAX_KEY, widened, requireMax(1d));
}
}

interface Watch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import java.util.stream.Stream;

import static org.apache.ratis.server.RaftServer.Division.LOG;
import static org.apache.ratis.server.RaftServerConfigKeys.Write.FOLLOWER_GAP_RATIO_MAX_KEY;

/**
* States for leader only. It contains three different types of processors:
Expand Down Expand Up @@ -250,6 +251,7 @@ boolean removeAll(Collection<LogAppender> c) {
private final long placeHolderIndex;
private final RaftServerMetricsImpl raftServerMetrics;
private final LogAppenderMetrics logAppenderMetrics;
private final long followerMaxGapThreshold;

LeaderStateImpl(RaftServerImpl server) {
this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass());
Expand All @@ -269,6 +271,17 @@ boolean removeAll(Collection<LogAppender> c) {
this.pendingRequests = new PendingRequests(server.getMemberId(), properties, raftServerMetrics);
this.watchRequests = new WatchRequests(server.getMemberId(), properties);
this.messageStreamRequests = new MessageStreamRequests(server.getMemberId());
long maxPendingRequests = RaftServerConfigKeys.Write.elementLimit(properties);
double followerGapRatioMax = RaftServerConfigKeys.Write.followerGapRatioMax(properties);

if (followerGapRatioMax == -1) {
this.followerMaxGapThreshold = -1;
} else if (followerGapRatioMax > 1f || followerGapRatioMax <= 0f) {
throw new IllegalArgumentException(FOLLOWER_GAP_RATIO_MAX_KEY +
"s value must between [1, 0) to enable the feature");
} else {
this.followerMaxGapThreshold = (long) (followerGapRatioMax * maxPendingRequests);
}

final RaftConfigurationImpl conf = state.getRaftConf();
Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
Expand Down Expand Up @@ -700,6 +713,17 @@ static MinMajorityMax valueOf(long[] sorted) {
return new MinMajorityMax(sorted[0], getMajority(sorted), getMax(sorted));
}

/**
 * Builds a {@code MinMajorityMax} from sorted indices, optionally capping the majority
 * value when the first (minimum) entry lags too far behind it.
 *
 * @param sorted the indices; element 0 is used as the minimum.
 * @param gapThreshold the largest tolerated (majority - min) gap; -1 disables the check.
 * @return a {@code MinMajorityMax} whose majority is replaced by the minimum
 *         when the gap exceeds {@code gapThreshold}.
 */
static MinMajorityMax valueOf(long[] sorted, long gapThreshold) {
  final long computedMajority = getMajority(sorted);
  final long slowest = sorted[0];
  final boolean checkEnabled = gapThreshold != -1;
  // When the slowest entry trails the majority by more than the threshold, report the
  // slowest index as the majority. Per the original note, this makes updateCommit()
  // skip one round of lastCommittedIndex update.
  final boolean gapTooLarge = checkEnabled && computedMajority - slowest > gapThreshold;
  final long effectiveMajority = gapTooLarge ? slowest : computedMajority;
  return new MinMajorityMax(slowest, effectiveMajority, getMax(sorted));
}

/**
 * Returns the element at index {@code (length - 1) / 2}: the middle entry for odd
 * lengths and the lower of the two middle entries for even lengths.
 *
 * @param sorted the indices (assumed to be in ascending order -- confirm against callers).
 * @return the element at the lower-middle position.
 */
static long getMajority(long[] sorted) {
  final int lowerMiddle = (sorted.length - 1) / 2;
  return sorted[lowerMiddle];
}
Expand All @@ -725,7 +749,7 @@ private Optional<MinMajorityMax> getMajorityMin(ToLongFunction<FollowerInfo> fol
}

final long[] indicesInNewConf = getSorted(followers, includeSelf, followerIndex, logIndex);
final MinMajorityMax newConf = MinMajorityMax.valueOf(indicesInNewConf);
final MinMajorityMax newConf = MinMajorityMax.valueOf(indicesInNewConf, followerMaxGapThreshold);

if (!conf.isTransitional()) {
return Optional.of(newConf);
Expand All @@ -737,7 +761,7 @@ private Optional<MinMajorityMax> getMajorityMin(ToLongFunction<FollowerInfo> fol
}

final long[] indicesInOldConf = getSorted(oldFollowers, includeSelfInOldConf, followerIndex, logIndex);
final MinMajorityMax oldConf = MinMajorityMax.valueOf(indicesInOldConf);
final MinMajorityMax oldConf = MinMajorityMax.valueOf(indicesInOldConf, followerMaxGapThreshold);
return Optional.of(newConf.combine(oldConf));
}
}
Expand Down