Commit f741873

Ray Mattingly (rmdmattingly) authored and committed

HBASE-28672 Ensure large batches are not indefinitely blocked by quotas (#6003)

Co-authored-by: Ray Mattingly <rmattingly@hubspot.com>
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>

1 parent 282f0c0 commit f741873

5 files changed: 236 additions and 2 deletions

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java

Lines changed: 10 additions & 2 deletions
@@ -111,8 +111,16 @@ private void checkQuota(long numWrites, long numReads) throws RpcThrottlingException
         continue;
       }
 
-      limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed,
-        writeCapacityUnitConsumed, readCapacityUnitConsumed);
+      long maxRequestsToEstimate = limiter.getRequestNumLimit();
+      long maxReadsToEstimate = Math.min(maxRequestsToEstimate, limiter.getReadNumLimit());
+      long maxWritesToEstimate = Math.min(maxRequestsToEstimate, limiter.getWriteNumLimit());
+      long maxReadSizeToEstimate = Math.min(readConsumed, limiter.getReadLimit());
+      long maxWriteSizeToEstimate = Math.min(writeConsumed, limiter.getWriteLimit());
+
+      limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites),
+        Math.min(maxWriteSizeToEstimate, writeConsumed), Math.min(maxReadsToEstimate, numReads),
+        Math.min(maxReadSizeToEstimate, readConsumed), writeCapacityUnitConsumed,
+        readCapacityUnitConsumed);
       readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
     }
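
Editor's note: the clamping above is the heart of the fix. Each estimated request count and size is capped at the most the limiter could ever grant in a single refill, so an oversized batch is throttled for at most one interval instead of waiting on a refill that can never arrive. A minimal standalone sketch of the arithmetic, using hypothetical numbers rather than real limiter state:

// Hypothetical numbers only: a batch estimated at 30 reads against a
// 10-reads-per-second throttle with a generous overall request limit.
long numReads = 30;                // raw estimate for the batch
long requestNumLimit = 100;        // what limiter.getRequestNumLimit() might report
long readNumLimit = 10;            // what limiter.getReadNumLimit() might report

long maxReadsToEstimate = Math.min(requestNumLimit, readNumLimit); // 10
long readsToCheck = Math.min(maxReadsToEstimate, numReads);        // 10, not 30

// checkQuota is now asked for an amount the limiter can actually refill,
// so the oversized batch waits at most one refill interval.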

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java

Lines changed: 20 additions & 0 deletions
@@ -65,6 +65,21 @@ public long getWriteAvailable() {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public long getRequestNumLimit() {
+    return Long.MAX_VALUE;
+  }
+
+  @Override
+  public long getReadNumLimit() {
+    return Long.MAX_VALUE;
+  }
+
+  @Override
+  public long getWriteNumLimit() {
+    return Long.MAX_VALUE;
+  }
+
   @Override
   public long getReadAvailable() {
     throw new UnsupportedOperationException();
@@ -75,6 +90,11 @@ public long getReadLimit() {
     return Long.MAX_VALUE;
   }
 
+  @Override
+  public long getWriteLimit() {
+    return Long.MAX_VALUE;
+  }
+
   @Override
   public String toString() {
     return "NoopQuotaLimiter";

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java

Lines changed: 13 additions & 0 deletions
@@ -79,6 +79,19 @@ void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
   /** Returns the maximum number of bytes ever available to read */
   long getReadLimit();
 
+  /** Returns the maximum number of bytes ever available to write */
+  long getWriteLimit();
+
   /** Returns the number of bytes available to write to avoid exceeding the quota */
   long getWriteAvailable();
+
+  /** Returns the maximum number of requests to allow per TimeUnit */
+  long getRequestNumLimit();
+
+  /** Returns the maximum number of reads to allow per TimeUnit */
+  long getReadNumLimit();
+
+  /** Returns the maximum number of writes to allow per TimeUnit */
+  long getWriteNumLimit();
+
 }
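
Editor's note: the interface contract expresses "unthrottled" as Long.MAX_VALUE rather than a negative sentinel, which is what keeps Math.min-based clamping safe for callers. A hypothetical caller-side sketch (variable names are illustrative, not from the patch):

// Derive the hard caps a single batch is ever worth estimating against this limiter.
long readCountCap = Math.min(limiter.getRequestNumLimit(), limiter.getReadNumLimit());
long readSizeCap = limiter.getReadLimit();  // bytes; Long.MAX_VALUE means "unthrottled"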

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java

Lines changed: 26 additions & 0 deletions
@@ -240,6 +240,27 @@ public long getWriteAvailable() {
     return writeSizeLimiter.getAvailable();
   }
 
+  @Override
+  public long getRequestNumLimit() {
+    long readAndWriteLimit = readReqsLimiter.getLimit() + writeReqsLimiter.getLimit();
+
+    if (readAndWriteLimit < 0) { // handle overflow
+      readAndWriteLimit = Long.MAX_VALUE;
+    }
+
+    return Math.min(reqsLimiter.getLimit(), readAndWriteLimit);
+  }
+
+  @Override
+  public long getReadNumLimit() {
+    return readReqsLimiter.getLimit();
+  }
+
+  @Override
+  public long getWriteNumLimit() {
+    return writeReqsLimiter.getLimit();
+  }
+
   @Override
   public long getReadAvailable() {
     return readSizeLimiter.getAvailable();
@@ -250,6 +271,11 @@ public long getReadLimit() {
     return Math.min(readSizeLimiter.getLimit(), reqSizeLimiter.getLimit());
   }
 
+  @Override
+  public long getWriteLimit() {
+    return Math.min(writeSizeLimiter.getLimit(), reqSizeLimiter.getLimit());
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
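
Editor's note: getRequestNumLimit sums the read and write request limits, and since each can itself be Long.MAX_VALUE, the sum can wrap negative. Both operands are non-negative limits, so a negative sum can only mean overflow, and the check above saturates it back to Long.MAX_VALUE. The same guard could also be written with Math.addExact; the sketch below is an equivalent illustration, not the code in this commit:

// Equivalent saturating add (illustration only).
static long saturatingAdd(long a, long b) {
  try {
    return Math.addExact(a, b);   // throws ArithmeticException on overflow
  } catch (ArithmeticException e) {
    return Long.MAX_VALUE;        // saturate instead of wrapping negative
  }
}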

hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java

Lines changed: 167 additions & 0 deletions
@@ -18,21 +18,37 @@
 package org.apache.hadoop.hbase.quotas;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
+
 @Category({ RegionServerTests.class, SmallTests.class })
 public class TestDefaultOperationQuota {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestDefaultOperationQuota.class);
 
+  private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge();
+  static {
+    envEdge.setValue(EnvironmentEdgeManager.currentTime());
+    // only activate the envEdge for the quotas package
+    EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge,
+      ThrottleQuotaTestUtil.class.getPackage().getName());
+  }
+
   @Test
   public void testScanEstimateNewScanner() {
     long blockSize = 64 * 1024;
@@ -125,4 +141,155 @@ public void testScanEstimateShrinkingWorkload() {
     // shrinking workload should only shrink estimate to maxBBS
     assertEquals(maxBlockBytesScanned, estimate);
   }
+
+  @Test
+  public void testLargeBatchSaturatesReadNumLimit()
+    throws RpcThrottlingException, InterruptedException {
+    int limit = 10;
+    QuotaProtos.Throttle throttle =
+      QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
+        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
+    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
+    DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);
+
+    // use the whole limit
+    quota.checkBatchQuota(0, limit);
+
+    // the next request should be rejected
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));
+
+    envEdge.incValue(1000);
+    // after the TimeUnit, the limit should be refilled
+    quota.checkBatchQuota(0, limit);
+  }
+
+  @Test
+  public void testLargeBatchSaturatesReadWriteLimit()
+    throws RpcThrottlingException, InterruptedException {
+    int limit = 10;
+    QuotaProtos.Throttle throttle =
+      QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
+        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
+    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
+    DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);
+
+    // use the whole limit
+    quota.checkBatchQuota(limit, 0);
+
+    // the next request should be rejected
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0));
+
+    envEdge.incValue(1000);
+    // after the TimeUnit, the limit should be refilled
+    quota.checkBatchQuota(limit, 0);
+  }
+
+  @Test
+  public void testTooLargeReadBatchIsNotBlocked()
+    throws RpcThrottlingException, InterruptedException {
+    int limit = 10;
+    QuotaProtos.Throttle throttle =
+      QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
+        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
+    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
+    DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);
+
+    // use more than the limit, which should succeed rather than being indefinitely blocked
+    quota.checkBatchQuota(0, 10 + limit);
+
+    // the next request should be blocked
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));
+
+    envEdge.incValue(1000);
+    // even after the TimeUnit, the limit should not be refilled because we oversubscribed
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit));
+  }
+
+  @Test
+  public void testTooLargeWriteBatchIsNotBlocked()
+    throws RpcThrottlingException, InterruptedException {
+    int limit = 10;
+    QuotaProtos.Throttle throttle =
+      QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
+        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
+    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
+    DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);
+
+    // use more than the limit, which should succeed rather than being indefinitely blocked
+    quota.checkBatchQuota(10 + limit, 0);
+
+    // the next request should be blocked
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0));
+
+    envEdge.incValue(1000);
+    // even after the TimeUnit, the limit should not be refilled because we oversubscribed
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0));
+  }
+
+  @Test
+  public void testTooLargeWriteSizeIsNotBlocked()
+    throws RpcThrottlingException, InterruptedException {
+    int limit = 50;
+    QuotaProtos.Throttle throttle =
+      QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder()
+        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
+    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
+    DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);
+
+    // writes are estimated at 100 bytes, so this will use 2x the limit but should not be blocked
+    quota.checkBatchQuota(1, 0);
+
+    // the next request should be blocked
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0));
+
+    envEdge.incValue(1000);
+    // even after the TimeUnit, the limit should not be refilled because we oversubscribed
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0));
+  }
+
+  @Test
+  public void testTooLargeReadSizeIsNotBlocked()
+    throws RpcThrottlingException, InterruptedException {
+    long blockSize = 65536;
+    long limit = blockSize / 2;
+    QuotaProtos.Throttle throttle =
+      QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder()
+        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
+    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
+    DefaultOperationQuota quota =
+      new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter);
+
+    // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked
+    quota.checkBatchQuota(0, 1);
+
+    // the next request should be blocked
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));
+
+    envEdge.incValue(1000);
+    // even after the TimeUnit, the limit should not be refilled because we oversubscribed
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1));
+  }
+
+  @Test
+  public void testTooLargeRequestSizeIsNotBlocked()
+    throws RpcThrottlingException, InterruptedException {
+    long blockSize = 65536;
+    long limit = blockSize / 2;
+    QuotaProtos.Throttle throttle =
+      QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder()
+        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
+    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
+    DefaultOperationQuota quota =
+      new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter);
+
+    // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked
+    quota.checkBatchQuota(0, 1);
+
+    // the next request should be blocked
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));
+
+    envEdge.incValue(1000);
+    // even after the TimeUnit, the limit should not be refilled because we oversubscribed
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1));
+  }
 }
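
Editor's note: the throttles these tests assemble directly from QuotaProtos correspond to quotas an operator would normally set through the Admin API. Below is a hedged sketch of configuring an equivalent read-number throttle; the table name and limit are made up for illustration.

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.quotas.QuotaSettingsFactory;
import org.apache.hadoop.hbase.quotas.ThrottleType;

public class SetReadNumThrottle {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection();
      Admin admin = conn.getAdmin()) {
      // Limit the (made-up) table to 10 read requests per second, matching the
      // READ_NUMBER-style throttle the tests build straight from protobufs.
      admin.setQuota(QuotaSettingsFactory.throttleTable(TableName.valueOf("my_table"),
        ThrottleType.READ_NUMBER, 10, TimeUnit.SECONDS));
    }
  }
}

With such a quota in place, the behavior exercised above applies: a batch larger than the per-interval limit is throttled for at most one interval rather than blocked indefinitely.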
