Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-1411. Alleviate slow follower issue #508

Merged
merged 4 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
improve per comments
  • Loading branch information
ChenSammi committed Sep 29, 2021
commit 94f34d6d4a290490ba00ff574f10c2a98c1cc331
18 changes: 9 additions & 9 deletions ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ static BiConsumer<String, Integer> requireMax(int max) {
};
}

static BiConsumer<String, Float> requireMax(float max) {
static BiConsumer<String, Double> requireMax(double max) {
return (key, value) -> {
if (value > max) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -159,10 +159,10 @@ static long getLong(
}

@SafeVarargs
static float getFloat(
BiFunction<String, Float, Float> floatGetter,
String key, float defaultValue, Consumer<String> logger, BiConsumer<String, Float>... assertions) {
return get(floatGetter, key, defaultValue, logger, assertions);
static double getDouble(
BiFunction<String, Double, Double> doubleGetter,
String key, double defaultValue, Consumer<String> logger, BiConsumer<String, Double>... assertions) {
return get(doubleGetter, key, defaultValue, logger, assertions);
}

@SafeVarargs
Expand Down Expand Up @@ -235,10 +235,10 @@ static void setLong(
}

@SafeVarargs
static void setFloat(
BiConsumer<String, Float> floatSetter, String key, float value,
BiConsumer<String, Float>... assertions) {
set(floatSetter, key, value, assertions);
static void setDouble(
BiConsumer<String, Double> doubleSetter, String key, double value,
BiConsumer<String, Double>... assertions) {
set(doubleSetter, key, value, assertions);
}

@SafeVarargs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,17 @@ static void setByteLimit(RaftProperties properties, SizeInBytes byteLimit) {
setSizeInBytes(properties::set, BYTE_LIMIT_KEY, byteLimit, requireMin(1L));
}

String FOLLOWER_MAX_GAP_RATIO_KEY = PREFIX + ".follower-max-gap-ratio";
float FOLLOWER_MAX_GAP_RATIO_DEFAULT = 1;
String FOLLOWER_GAP_RATIO_MAX_KEY = PREFIX + ".follower.gap.ratio.max";
// The valid range is [1, 0) and -1, -1 means disable this feature
double FOLLOWER_GAP_RATIO_MAX_KEY_DEFAULT = -1d;
double FOLLOWER_GAP_RATIO_MAX_DISABLED = -1d;

static float followerMaxGapRatio(RaftProperties properties) {
return getFloat(properties::getFloat, FOLLOWER_MAX_GAP_RATIO_KEY,
FOLLOWER_MAX_GAP_RATIO_DEFAULT, getDefaultLog(), requireMax(1f));
static double followerGapRatioMax(RaftProperties properties) {
return getDouble(properties::getDouble, FOLLOWER_GAP_RATIO_MAX_KEY,
FOLLOWER_GAP_RATIO_MAX_DISABLED, getDefaultLog(), requireMax(1d));
}
static void setFollowerMaxGapRatio(RaftProperties properties, float ratio) {
setFloat(properties::setFloat, FOLLOWER_MAX_GAP_RATIO_KEY, ratio, requireMax(1f));
static void setFollowerGapRatioMax(RaftProperties properties, float ratio) {
setDouble(properties::setDouble, FOLLOWER_GAP_RATIO_MAX_KEY, ratio, requireMax(1d));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@
import java.util.stream.Stream;

import static org.apache.ratis.server.RaftServer.Division.LOG;
import static org.apache.ratis.server.RaftServerConfigKeys.Write.FOLLOWER_MAX_GAP_RATIO_KEY;
import static org.apache.ratis.server.RaftServerConfigKeys.Write.FOLLOWER_GAP_RATIO_MAX_KEY;
import static org.apache.ratis.server.RaftServerConfigKeys.Write.FOLLOWER_GAP_RATIO_MAX_DISABLED;

/**
* States for leader only. It contains three different types of processors:
Expand Down Expand Up @@ -251,7 +252,6 @@ boolean removeAll(Collection<LogAppender> c) {
private final long placeHolderIndex;
private final RaftServerMetricsImpl raftServerMetrics;
private final LogAppenderMetrics logAppenderMetrics;
private final long maxPendingRequests;
private final long followerMaxGapThreshold;

LeaderStateImpl(RaftServerImpl server) {
Expand All @@ -272,13 +272,17 @@ boolean removeAll(Collection<LogAppender> c) {
this.pendingRequests = new PendingRequests(server.getMemberId(), properties, raftServerMetrics);
this.watchRequests = new WatchRequests(server.getMemberId(), properties);
this.messageStreamRequests = new MessageStreamRequests(server.getMemberId());
this.maxPendingRequests = RaftServerConfigKeys.Write.elementLimit(properties);
float followerMaxGapRatio = RaftServerConfigKeys.Write.followerMaxGapRatio(properties);

if (followerMaxGapRatio > 1f || followerMaxGapRatio <= 0f) {
throw new IllegalArgumentException(FOLLOWER_MAX_GAP_RATIO_KEY + "s value must between [1, 0)");
long maxPendingRequests = RaftServerConfigKeys.Write.elementLimit(properties);
double followerMaxGapRatio = RaftServerConfigKeys.Write.followerGapRatioMax(properties);

if (followerMaxGapRatio == FOLLOWER_GAP_RATIO_MAX_DISABLED) {
this.followerMaxGapThreshold = -1;
} else if (followerMaxGapRatio > 1f || followerMaxGapRatio <= 0f) {
throw new IllegalArgumentException(FOLLOWER_GAP_RATIO_MAX_KEY +
"s value must between [1, 0) to enable the feature");
} else {
this.followerMaxGapThreshold = (long) (followerMaxGapRatio * maxPendingRequests);
}
this.followerMaxGapThreshold = (long)(followerMaxGapRatio * maxPendingRequests);

final RaftConfigurationImpl conf = state.getRaftConf();
Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
Expand Down Expand Up @@ -713,7 +717,7 @@ static MinMajorityMax valueOf(long[] sorted) {
static MinMajorityMax valueOf(long[] sorted, long gapThreshold) {
long majority = getMajority(sorted);
long min = sorted[0];
if ((majority - min) > gapThreshold) {
if (gapThreshold != -1 && (majority - min) > gapThreshold) {
// The the gap between majority and min(the slow follower) is greater than gapThreshold,
// set the majority to min, which will skip one round of lastCommittedIndex update in updateCommit().
majority = min;
Expand Down