Skip to content

Commit

Permalink
RATIS-1384.Change pending request limit unit to MB (apache#483)
Browse files Browse the repository at this point in the history
  • Loading branch information
sadanand48 authored Jul 12, 2021
1 parent 9b57296 commit 3f4f5ff
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ static BiConsumer<String, Long> requireMin(long min) {
};
}

static BiConsumer<String, SizeInBytes> requireMinSizeInByte(SizeInBytes min) {
return (key, value) -> {
if (value.getSize() < min.getSize()) {
throw new IllegalArgumentException(
key + " = " + value + " < min = " + min);
}
};
}

static BiConsumer<String, Long> requireMax(long max) {
return (key, value) -> {
if (value > max) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
*/
public final class SizeInBytes {
public static final SizeInBytes ONE_KB = valueOf("1k");
public static final SizeInBytes ONE_MB = valueOf("1m");

public static SizeInBytes valueOf(long size) {
final String s = String.valueOf(size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ static void setElementLimit(RaftProperties properties, int limit) {
SizeInBytes BYTE_LIMIT_DEFAULT = SizeInBytes.valueOf("64MB");
static SizeInBytes byteLimit(RaftProperties properties) {
return getSizeInBytes(properties::getSizeInBytes,
BYTE_LIMIT_KEY, BYTE_LIMIT_DEFAULT, getDefaultLog());
BYTE_LIMIT_KEY, BYTE_LIMIT_DEFAULT, getDefaultLog(), requireMinSizeInByte(SizeInBytes.ONE_MB));
}
static void setByteLimit(RaftProperties properties, SizeInBytes byteLimit) {
setSizeInBytes(properties::set, BYTE_LIMIT_KEY, byteLimit, requireMin(1L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,32 +44,46 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

class PendingRequests {
public static final Logger LOG = LoggerFactory.getLogger(PendingRequests.class);

private static final int ONE_MB = SizeInBytes.ONE_MB.getSizeInt();

/**
* Round up to the nearest MB.
*/
static int roundUpMb(long bytes) {
return Math.toIntExact((bytes - 1) / ONE_MB + 1);
}

static class Permit {}

static class RequestLimits extends ResourceSemaphore.Group {
RequestLimits(int elementLimit, SizeInBytes byteLimit) {
super(elementLimit, byteLimit.getSizeInt());
RequestLimits(int elementLimit, int megabyteLimit) {
super(elementLimit, megabyteLimit);
}

int getElementCount() {
return get(0).used();
}

int getByteSize() {
int getMegaByteSize() {
return get(1).used();
}

ResourceSemaphore.ResourceAcquireStatus tryAcquire(Message message) {
return tryAcquire(1, Message.getSize(message));
ResourceSemaphore.ResourceAcquireStatus tryAcquire(int messageSizeMb) {
return tryAcquire(1, messageSizeMb);
}

void release(Message message) {
release(1, Message.getSize(message));
void releaseExtraMb(int extraMb) {
release(0, extraMb);
}

void release(int diffMb) {
release(1, diffMb);
}
}

Expand All @@ -82,19 +96,24 @@ private static class RequestMap {
private final Map<Permit, Permit> permits = new HashMap<>();
/** Track and limit the number of requests and the total message size. */
private final RequestLimits resource;
/** The size (in byte) of all the requests in this map. */
private final AtomicLong requestSize = new AtomicLong();


RequestMap(Object name, int elementLimit, SizeInBytes byteLimit, RaftServerMetricsImpl raftServerMetrics) {
RequestMap(Object name, int elementLimit, int megabyteLimit, RaftServerMetricsImpl raftServerMetrics) {
this.name = name;
this.resource = new RequestLimits(elementLimit, byteLimit);
this.resource = new RequestLimits(elementLimit, megabyteLimit);
this.raftServerMetrics = raftServerMetrics;

raftServerMetrics.addNumPendingRequestsGauge(resource::getElementCount);
raftServerMetrics.addNumPendingRequestsByteSize(resource::getByteSize);
raftServerMetrics.addNumPendingRequestsMegaByteSize(resource::getMegaByteSize);
}

Permit tryAcquire(Message message) {
final ResourceSemaphore.ResourceAcquireStatus acquired = resource.tryAcquire(message);
LOG.trace("tryAcquire? {}", acquired);
final int messageSize = Message.getSize(message);
final int messageSizeMb = roundUpMb(messageSize );
final ResourceSemaphore.ResourceAcquireStatus acquired = resource.tryAcquire(messageSizeMb);
LOG.trace("tryAcquire {} MB? {}", messageSizeMb, acquired);
if (acquired == ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_ELEMENT_LIMIT) {
raftServerMetrics.onRequestQueueLimitHit();
raftServerMetrics.onResourceLimitHit();
Expand All @@ -104,6 +123,14 @@ Permit tryAcquire(Message message) {
raftServerMetrics.onResourceLimitHit();
return null;
}

// release extra MB
final long oldSize = requestSize.getAndAdd(messageSize);
final long newSize = oldSize + messageSize;
final int diffMb = roundUpMb(newSize) - roundUpMb(oldSize);
if (messageSizeMb > diffMb) {
resource.releaseExtraMb(messageSizeMb - diffMb);
}
return putPermit();
}

Expand Down Expand Up @@ -140,8 +167,12 @@ PendingRequest remove(long index) {
if (r == null) {
return null;
}
resource.release(r.getRequest().getMessage());
LOG.trace("release");
final int messageSize = Message.getSize(r.getRequest().getMessage());
final long oldSize = requestSize.getAndAdd(-messageSize);
final long newSize = oldSize - messageSize;
final int diffMb = roundUpMb(oldSize) - roundUpMb(newSize);
resource.release(diffMb);
LOG.trace("release {} MB", diffMb);
return r;
}

Expand Down Expand Up @@ -183,7 +214,9 @@ void close() {
this.name = id + "-" + JavaUtils.getClassSimpleName(getClass());
this.pendingRequests = new RequestMap(id,
RaftServerConfigKeys.Write.elementLimit(properties),
RaftServerConfigKeys.Write.byteLimit(properties),
Math.toIntExact(
RaftServerConfigKeys.Write.byteLimit(properties).getSize()
/ SizeInBytes.ONE_MB.getSize()), //round down
raftServerMetrics);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public final class RaftServerMetricsImpl extends RatisMetrics implements RaftSer
public static final String RESOURCE_LIMIT_HIT_COUNTER = "leaderNumResourceLimitHits";
public static final String REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER = "numRequestsByteSizeLimitHits";
public static final String REQUEST_QUEUE_SIZE = "numPendingRequestInQueue";
public static final String REQUEST_BYTE_SIZE = "numPendingRequestByteSize";
public static final String REQUEST_MEGA_BYTE_SIZE = "numPendingRequestMegaByteSize";
public static final String RETRY_CACHE_ENTRY_COUNT_METRIC = "retryCacheEntryCount";
public static final String RETRY_CACHE_HIT_COUNT_METRIC = "retryCacheHitCount";
public static final String RETRY_CACHE_HIT_RATE_METRIC = "retryCacheHitRate";
Expand Down Expand Up @@ -221,12 +221,12 @@ public boolean removeNumPendingRequestsGauge() {
return registry.remove(REQUEST_QUEUE_SIZE);
}

public void addNumPendingRequestsByteSize(Gauge byteSize) {
registry.gauge(REQUEST_BYTE_SIZE, () -> byteSize);
public void addNumPendingRequestsMegaByteSize(Gauge megabyteSize) {
registry.gauge(REQUEST_MEGA_BYTE_SIZE, () -> megabyteSize);
}

public boolean removeNumPendingRequestsByteSize() {
return registry.remove(REQUEST_BYTE_SIZE);
return registry.remove(REQUEST_MEGA_BYTE_SIZE);
}

public void onRequestByteSizeLimitHit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.RAFT_CLIENT_WATCH_REQUEST;
import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.RAFT_CLIENT_WRITE_REQUEST;
import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.REQUEST_QUEUE_LIMIT_HIT_COUNTER;
import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.REQUEST_BYTE_SIZE;
import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.REQUEST_MEGA_BYTE_SIZE;
import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER;
import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.RESOURCE_LIMIT_HIT_COUNTER;

Expand Down Expand Up @@ -210,7 +210,7 @@ public void testRaftClientMetrics() throws Exception {
public void testRaftServerMetrics() throws Exception {
final RaftProperties p = getProperties();
RaftServerConfigKeys.Write.setElementLimit(p, 10);
RaftServerConfigKeys.Write.setByteLimit(p, SizeInBytes.valueOf(110));
RaftServerConfigKeys.Write.setByteLimit(p, SizeInBytes.valueOf("1MB"));
try {
runWithNewCluster(3, this::testRequestMetrics);
} finally {
Expand Down Expand Up @@ -242,10 +242,9 @@ void testRequestMetrics(MiniRaftClusterWithGrpc cluster) throws Exception {


final SortedMap<String, Gauge> gaugeMap = getRaftServerMetrics(cluster.getLeader())
.getRegistry().getGauges((s, metric) -> s.contains(REQUEST_BYTE_SIZE));
.getRegistry().getGauges((s, metric) -> s.contains(
REQUEST_MEGA_BYTE_SIZE));

RaftTestUtil.waitFor(() -> (int) gaugeMap.get(gaugeMap.firstKey()).getValue() == message.length(),
300, 5000);

for (int i = 0; i < 10; i++) {
client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
Expand All @@ -259,11 +258,12 @@ void testRequestMetrics(MiniRaftClusterWithGrpc cluster) throws Exception {

stateMachine.unblockFlushStateMachineData();

// Send a message with 120, our byte size limit is 110, so it should fail
// Send a message with 1025kb , our byte size limit is 1024kb (1mb) , so it should fail
// and byte size counter limit will be hit.

client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
client.async().send(new SimpleMessage(RandomStringUtils.random(120, true, false)));
client.async().send(new SimpleMessage(RandomStringUtils
.random(SizeInBytes.valueOf("1025kb").getSizeInt(), true, false)));
clients.add(client);

RaftTestUtil.waitFor(() -> getRaftServerMetrics(cluster.getLeader())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SizeInBytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -36,6 +37,10 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.ratis.conf.ConfUtils.requireMin;
import static org.apache.ratis.conf.ConfUtils.setSizeInBytes;
import static org.apache.ratis.server.RaftServerConfigKeys.Write.BYTE_LIMIT_KEY;

/**
* Test cases to verify RaftServerConfigKeys.
*/
Expand Down Expand Up @@ -95,4 +100,19 @@ public void testStorageDir() {
Assert.assertEquals(directories.size(), storageDirs.size());
Assert.assertEquals(0, actualDirs.size());
}

/**
* Sets the value to <code>raft.server.write.byte-limit</code> via
* RaftServerConfigKeys and also verifies the same via RaftServerConfigKeys.
*/
@Test public void testPendingRequestSize() {
RaftProperties properties = new RaftProperties();
// setting to 4GB
setSizeInBytes(properties::set, BYTE_LIMIT_KEY, SizeInBytes.valueOf("4gb"),
requireMin(1L));
int pendingRequestMegabyteLimit = Math.toIntExact(
RaftServerConfigKeys.Write.byteLimit(properties).getSize()
/ SizeInBytes.ONE_MB.getSize());
Assert.assertEquals(4096, pendingRequestMegabyteLimit);
}
}

0 comments on commit 3f4f5ff

Please sign in to comment.