27
27
import org .apache .yetus .audience .InterfaceAudience ;
28
28
import org .apache .yetus .audience .InterfaceStability ;
29
29
30
+ import org .apache .hadoop .hbase .shaded .protobuf .generated .ClientProtos ;
31
+
30
32
@ InterfaceAudience .Private
31
33
@ InterfaceStability .Evolving
32
34
public class DefaultOperationQuota implements OperationQuota {
33
35
36
+ // a single scan estimate can consume no more than this proportion of the limiter's limit
37
+ // this prevents a long-running scan from being estimated at, say, 100MB of IO against
38
+ // a throttle of less than 100MB of IO (because such a request would never succeed)
39
+ private static final double MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION = 0.9 ;
40
+
34
41
protected final List <QuotaLimiter > limiters ;
35
42
private final long writeCapacityUnit ;
36
43
private final long readCapacityUnit ;
@@ -53,13 +60,17 @@ public class DefaultOperationQuota implements OperationQuota {
53
60
protected long readCapacityUnitDiff = 0 ;
54
61
private boolean useResultSizeBytes ;
55
62
private long blockSizeBytes ;
63
+ private long maxScanEstimate ;
56
64
57
65
public DefaultOperationQuota (final Configuration conf , final int blockSizeBytes ,
58
66
final QuotaLimiter ... limiters ) {
59
67
this (conf , Arrays .asList (limiters ));
60
68
this .useResultSizeBytes =
61
69
conf .getBoolean (OperationQuota .USE_RESULT_SIZE_BYTES , USE_RESULT_SIZE_BYTES_DEFAULT );
62
70
this .blockSizeBytes = blockSizeBytes ;
71
+ long readSizeLimit =
72
+ Arrays .stream (limiters ).mapToLong (QuotaLimiter ::getReadLimit ).min ().orElse (Long .MAX_VALUE );
73
+ maxScanEstimate = Math .round (MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION * readSizeLimit );
63
74
}
64
75
65
76
/**
@@ -80,21 +91,34 @@ public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter>
80
91
}
81
92
82
93
@ Override
83
- public void checkQuota (int numWrites , int numReads , int numScans ) throws RpcThrottlingException {
84
- updateEstimateConsumeQuota (numWrites , numReads , numScans );
94
+ public void checkBatchQuota (int numWrites , int numReads ) throws RpcThrottlingException {
95
+ updateEstimateConsumeBatchQuota (numWrites , numReads );
96
+ checkQuota (numWrites , numReads );
97
+ }
98
+
99
+ @ Override
100
+ public void checkScanQuota (ClientProtos .ScanRequest scanRequest , long maxScannerResultSize ,
101
+ long maxBlockBytesScanned , long prevBlockBytesScannedDifference ) throws RpcThrottlingException {
102
+ updateEstimateConsumeScanQuota (scanRequest , maxScannerResultSize , maxBlockBytesScanned ,
103
+ prevBlockBytesScannedDifference );
104
+ checkQuota (0 , 1 );
105
+ }
85
106
107
+ private void checkQuota (long numWrites , long numReads ) throws RpcThrottlingException {
86
108
readAvailable = Long .MAX_VALUE ;
87
109
for (final QuotaLimiter limiter : limiters ) {
88
- if (limiter .isBypass ()) continue ;
110
+ if (limiter .isBypass ()) {
111
+ continue ;
112
+ }
89
113
90
- limiter .checkQuota (numWrites , writeConsumed , numReads + numScans , readConsumed ,
114
+ limiter .checkQuota (numWrites , writeConsumed , numReads , readConsumed ,
91
115
writeCapacityUnitConsumed , readCapacityUnitConsumed );
92
116
readAvailable = Math .min (readAvailable , limiter .getReadAvailable ());
93
117
}
94
118
95
119
for (final QuotaLimiter limiter : limiters ) {
96
- limiter .grabQuota (numWrites , writeConsumed , numReads + numScans , readConsumed ,
97
- writeCapacityUnitConsumed , readCapacityUnitConsumed );
120
+ limiter .grabQuota (numWrites , writeConsumed , numReads , readConsumed , writeCapacityUnitConsumed ,
121
+ readCapacityUnitConsumed );
98
122
}
99
123
}
100
124
@@ -158,24 +182,69 @@ public void addMutation(final Mutation mutation) {
158
182
* Update estimate quota(read/write size/capacityUnits) which will be consumed
159
183
* @param numWrites the number of write requests
160
184
* @param numReads the number of read requests
161
- * @param numScans the number of scan requests
162
185
*/
163
- protected void updateEstimateConsumeQuota (int numWrites , int numReads , int numScans ) {
186
+ protected void updateEstimateConsumeBatchQuota (int numWrites , int numReads ) {
164
187
writeConsumed = estimateConsume (OperationType .MUTATE , numWrites , 100 );
165
188
166
189
if (useResultSizeBytes ) {
167
190
readConsumed = estimateConsume (OperationType .GET , numReads , 100 );
168
- readConsumed += estimateConsume (OperationType .SCAN , numScans , 1000 );
169
191
} else {
170
192
// assume 1 block required for reads. this is probably a low estimate, which is okay
171
193
readConsumed = numReads > 0 ? blockSizeBytes : 0 ;
172
- readConsumed += numScans > 0 ? blockSizeBytes : 0 ;
173
194
}
174
195
175
196
writeCapacityUnitConsumed = calculateWriteCapacityUnit (writeConsumed );
176
197
readCapacityUnitConsumed = calculateReadCapacityUnit (readConsumed );
177
198
}
178
199
200
+ /**
201
+ * Update estimate quota(read/write size/capacityUnits) which will be consumed
202
+ * @param scanRequest the scan to be executed
203
+ * @param maxScannerResultSize the maximum bytes to be returned by the scanner
204
+ * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
205
+ * scanner
206
+ * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
207
+ * calls
208
+ */
209
+ protected void updateEstimateConsumeScanQuota (ClientProtos .ScanRequest scanRequest ,
210
+ long maxScannerResultSize , long maxBlockBytesScanned , long prevBlockBytesScannedDifference ) {
211
+ if (useResultSizeBytes ) {
212
+ readConsumed = estimateConsume (OperationType .SCAN , 1 , 1000 );
213
+ } else {
214
+ long estimate = getScanReadConsumeEstimate (blockSizeBytes , scanRequest .getNextCallSeq (),
215
+ maxScannerResultSize , maxBlockBytesScanned , prevBlockBytesScannedDifference );
216
+ readConsumed = Math .min (maxScanEstimate , estimate );
217
+ }
218
+
219
+ readCapacityUnitConsumed = calculateReadCapacityUnit (readConsumed );
220
+ }
221
+
222
+ protected static long getScanReadConsumeEstimate (long blockSizeBytes , long nextCallSeq ,
223
+ long maxScannerResultSize , long maxBlockBytesScanned , long prevBlockBytesScannedDifference ) {
224
+ /*
225
+ * Estimating scan workload is more complicated, and if we severely underestimate workloads then
226
+ * throttled clients will exhaust retries too quickly, and could saturate the RPC layer
227
+ */
228
+ if (nextCallSeq == 0 ) {
229
+ // start scanners with an optimistic 1 block IO estimate
230
+ // it is better to underestimate a large scan in the beginning
231
+ // than to overestimate, and block, a small scan
232
+ return blockSizeBytes ;
233
+ }
234
+
235
+ boolean isWorkloadGrowing = prevBlockBytesScannedDifference > blockSizeBytes ;
236
+ if (isWorkloadGrowing ) {
237
+ // if nextCallSeq > 0 and the workload is growing then our estimate
238
+ // should consider that the workload may continue to increase
239
+ return Math .min (maxScannerResultSize , nextCallSeq * maxBlockBytesScanned );
240
+ } else {
241
+ // if nextCallSeq > 0 and the workload is shrinking or flat
242
+ // then our workload has likely plateaued. We can just rely on the existing
243
+ // maxBlockBytesScanned as our estimate in this case.
244
+ return maxBlockBytesScanned ;
245
+ }
246
+ }
247
+
179
248
private long estimateConsume (final OperationType type , int numReqs , long avgSize ) {
180
249
if (numReqs > 0 ) {
181
250
return avgSize * numReqs ;
0 commit comments