Skip to content

Commit d167196

Browse files
author
Ray Mattingly
committed
Use BBS as default. Makes tests a bit more defensive against flakiness
1 parent c36ab49 commit d167196

File tree

3 files changed

+60
-40
lines changed

3 files changed

+60
-40
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,14 @@ public class DefaultOperationQuota implements OperationQuota {
5151
protected long readDiff = 0;
5252
protected long writeCapacityUnitDiff = 0;
5353
protected long readCapacityUnitDiff = 0;
54-
private boolean useBlockBytesScanned;
54+
private boolean useResultSizeBytes;
5555
private long blockSizeBytes;
5656

5757
public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes,
5858
final QuotaLimiter... limiters) {
5959
this(conf, Arrays.asList(limiters));
60-
this.useBlockBytesScanned =
61-
conf.getBoolean(OperationQuota.USE_BLOCK_BYTES_SCANNED_KEY, USE_BLOCK_BYTES_SCANNED_DEFAULT);
60+
this.useResultSizeBytes =
61+
conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT);
6262
this.blockSizeBytes = blockSizeBytes;
6363
}
6464

@@ -105,12 +105,12 @@ public void close() {
105105

106106
long resultSize =
107107
operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()];
108-
if (useBlockBytesScanned) {
108+
if (useResultSizeBytes) {
109+
readDiff = resultSize - readConsumed;
110+
} else {
109111
long blockBytesScanned =
110112
RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L);
111113
readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed;
112-
} else {
113-
readDiff = resultSize - readConsumed;
114114
}
115115

116116
writeCapacityUnitDiff =
@@ -158,13 +158,13 @@ public void addMutation(final Mutation mutation) {
158158
protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) {
159159
writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
160160

161-
if (useBlockBytesScanned) {
161+
if (useResultSizeBytes) {
162+
readConsumed = estimateConsume(OperationType.GET, numReads, 100);
163+
readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
164+
} else {
162165
// assume 1 block required for reads. this is probably a low estimate, which is okay
163166
readConsumed = numReads > 0 ? blockSizeBytes : 0;
164167
readConsumed += numScans > 0 ? blockSizeBytes : 0;
165-
} else {
166-
readConsumed = estimateConsume(OperationType.GET, numReads, 100);
167-
readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
168168
}
169169

170170
writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,15 @@ public enum OperationType {
3535
SCAN
3636
}
3737

38-
String USE_BLOCK_BYTES_SCANNED_KEY = "hbase.quota.use.block.bytes.scanned";
39-
boolean USE_BLOCK_BYTES_SCANNED_DEFAULT = false;
38+
/**
39+
* If false, the default, then IO based throttles will consume read availability based on the
40+
* block bytes scanned by the given request. If true then IO based throttles will use result size
41+
* rather than block bytes scanned. Using block bytes scanned should be preferable to using result
42+
* size, because otherwise access patterns like heavily filtered scans may be able to produce a
43+
* significant and effectively un-throttled workload.
44+
*/
45+
String USE_RESULT_SIZE_BYTES = "hbase.quota.use.result.size.bytes";
46+
boolean USE_RESULT_SIZE_BYTES_DEFAULT = false;
4047

4148
/**
4249
* Checks if it is possible to execute the specified operation. The quota will be estimated based

hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@
2323
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doScans;
2424
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh;
2525
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota;
26-
import static org.junit.Assert.assertEquals;
27-
import static org.junit.Assert.assertTrue;
2826

27+
import java.util.concurrent.Callable;
2928
import java.util.concurrent.TimeUnit;
3029
import org.apache.hadoop.hbase.HBaseClassTestRule;
3130
import org.apache.hadoop.hbase.HBaseTestingUtil;
@@ -71,7 +70,6 @@ public static void setUpBeforeClass() throws Exception {
7170
// quotas enabled, using block bytes scanned
7271
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
7372
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME);
74-
TEST_UTIL.getConfiguration().setBoolean(OperationQuota.USE_BLOCK_BYTES_SCANNED_KEY, true);
7573

7674
// don't cache blocks to make IO predictable
7775
TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
@@ -106,22 +104,22 @@ public void testBBSGet() throws Exception {
106104
TEST_UTIL.flush(TABLE_NAME);
107105

108106
// Add ~10 block/min limit
109-
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
107+
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_SIZE,
110108
Math.round(10.1 * blockSize), TimeUnit.MINUTES));
111109
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
112110

113111
// should execute at max 10 requests
114-
assertEquals(10, doGets(20, FAMILY, QUALIFIER, table));
112+
testTraffic(() -> doGets(20, FAMILY, QUALIFIER, table), 10, 1);
115113

116114
// wait a minute and you should get another 10 requests executed
117115
waitMinuteQuota();
118-
assertEquals(10, doGets(20, FAMILY, QUALIFIER, table));
116+
testTraffic(() -> doGets(20, FAMILY, QUALIFIER, table), 10, 1);
119117

120118
// Remove all the limits
121119
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
122120
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
123-
assertEquals(100, doGets(100, FAMILY, QUALIFIER, table));
124-
assertEquals(100, doGets(100, FAMILY, QUALIFIER, table));
121+
testTraffic(() -> doGets(100, FAMILY, QUALIFIER, table), 100, 0);
122+
testTraffic(() -> doGets(100, FAMILY, QUALIFIER, table), 100, 0);
125123
}
126124

127125
@Test
@@ -142,30 +140,27 @@ public void testBBSScan() throws Exception {
142140
waitMinuteQuota();
143141

144142
// should execute 1 request
145-
assertEquals(1, doScans(5, table));
143+
testTraffic(() -> doScans(5, table), 1, 0);
146144

147145
// Remove all the limits
148146
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
149147
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
150-
assertEquals(100, doScans(100, table));
151-
assertEquals(100, doScans(100, table));
148+
testTraffic(() -> doScans(100, table), 100, 0);
149+
testTraffic(() -> doScans(100, table), 100, 0);
152150

153151
// Add ~3 block/min limit. This should support >1 scans
154152
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
155153
Math.round(3.1 * blockSize), TimeUnit.MINUTES));
156154
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
157155

158156
// should execute some requests, but not all
159-
long successfulScans = doScans(100, table);
160-
LOG.info("successfulScans = " + successfulScans);
161-
assertTrue(successfulScans < 100);
162-
assertTrue(successfulScans > 0);
157+
testTraffic(() -> doScans(100, table), 100, 90);
163158

164159
// Remove all the limits
165160
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
166161
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
167-
assertEquals(100, doScans(100, table));
168-
assertEquals(100, doScans(100, table));
162+
testTraffic(() -> doScans(100, table), 100, 0);
163+
testTraffic(() -> doScans(100, table), 100, 0);
169164
}
170165

171166
@Test
@@ -187,34 +182,52 @@ public void testBBSMultiGet() throws Exception {
187182
waitMinuteQuota();
188183

189184
// should execute 1 request
190-
assertEquals(1, doMultiGets(10, 10, rowCount, FAMILY, QUALIFIER, table));
185+
testTraffic(() -> doMultiGets(10, 10, rowCount, FAMILY, QUALIFIER, table), 1, 1);
191186

192187
// Remove all the limits
193188
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
194189
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
195-
assertEquals(100, doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table));
196-
assertEquals(100, doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table));
190+
testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
191+
testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
197192

198193
// Add ~100 block/min limit
199194
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
200195
Math.round(100.1 * blockSize), TimeUnit.MINUTES));
201196
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
202197

203198
// should execute approximately 10 batches of 10 requests
204-
long successfulMultiGets = doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table);
205-
assertTrue(successfulMultiGets >= 9);
206-
assertTrue(successfulMultiGets <= 11);
199+
testTraffic(() -> doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table), 10, 1);
207200

208201
// wait a minute and you should get another ~10 batches of 10 requests
209202
waitMinuteQuota();
210-
successfulMultiGets = doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table);
211-
assertTrue(successfulMultiGets >= 9);
212-
assertTrue(successfulMultiGets <= 11);
203+
testTraffic(() -> doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table), 10, 1);
213204

214205
// Remove all the limits
215206
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
216207
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
217-
assertEquals(100, doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table));
218-
assertEquals(100, doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table));
208+
testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
209+
testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
210+
}
211+
212+
private void testTraffic(Callable<Long> trafficCallable, long expectedSuccess, long marginOfError)
213+
throws Exception {
214+
TEST_UTIL.waitFor(90_000, () -> {
215+
long actualSuccess;
216+
try {
217+
actualSuccess = trafficCallable.call();
218+
} catch (Exception e) {
219+
throw new RuntimeException(e);
220+
}
221+
LOG.info("Traffic test yielded {} successful requests. Expected {} +/- {}", actualSuccess,
222+
expectedSuccess, marginOfError);
223+
boolean success = (actualSuccess >= expectedSuccess - marginOfError)
224+
&& (actualSuccess <= expectedSuccess + marginOfError);
225+
if (!success) {
226+
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
227+
waitMinuteQuota();
228+
Thread.sleep(15_000L);
229+
}
230+
return success;
231+
});
219232
}
220233
}

0 commit comments

Comments
 (0)