Skip to content

Commit f626549

Browse files
author
Hernan Gelaf-Romer
committed
Create scanner after quota check
1 parent 0a19b08 commit f626549

File tree

3 files changed

+333
-27
lines changed

3 files changed

+333
-27
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,18 @@ public void run() {
418418
}
419419
}
420420

421+
static class RegionScannerContext {
422+
final String scannerName;
423+
final RegionScannerHolder holder;
424+
final OperationQuota quota;
425+
426+
RegionScannerContext(String scannerName, RegionScannerHolder holder, OperationQuota quota) {
427+
this.scannerName = scannerName;
428+
this.holder = holder;
429+
this.quota = quota;
430+
}
431+
}
432+
421433
/**
422434
* Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
423435
*/
@@ -1267,12 +1279,12 @@ public int getScannersCount() {
12671279

12681280
/** Returns The outstanding RegionScanner for <code>scannerId</code> or null if none found. */
12691281
RegionScanner getScanner(long scannerId) {
1270-
RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1282+
RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
12711283
return rsh == null ? null : rsh.s;
12721284
}
12731285

12741286
/** Returns The associated RegionScannerHolder for <code>scannerId</code> or null. */
1275-
private RegionScannerHolder getRegionScannerHolder(long scannerId) {
1287+
private RegionScannerHolder checkQuotaAndGetRegionScannerContext(long scannerId) {
12761288
return scanners.get(toScannerName(scannerId));
12771289
}
12781290

@@ -1313,7 +1325,7 @@ public String getScanDetailsWithRequest(ScanRequest request) {
13131325
* Get the vtime associated with the scanner. Currently the vtime is the number of "next" calls.
13141326
*/
13151327
long getScannerVirtualTime(long scannerId) {
1316-
RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
1328+
RegionScannerHolder rsh = checkQuotaAndGetRegionScannerContext(scannerId);
13171329
return rsh == null ? 0L : rsh.getNextCallSeq();
13181330
}
13191331

@@ -3134,9 +3146,8 @@ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOExcep
31343146
* @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
31353147
* value.
31363148
*/
3137-
private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request,
3149+
private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request, HRegion region,
31383150
ScanResponse.Builder builder) throws IOException {
3139-
HRegion region = getRegion(request.getRegion());
31403151
rejectIfInStandByState(region);
31413152
ClientProtos.Scan protoScan = request.getScan();
31423153
boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
@@ -3553,22 +3564,10 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
35533564
}
35543565
requestCount.increment();
35553566
rpcScanRequestCount.increment();
3556-
RegionScannerHolder rsh;
3567+
RegionScannerContext rsx;
35573568
ScanResponse.Builder builder = ScanResponse.newBuilder();
3558-
String scannerName;
35593569
try {
3560-
if (request.hasScannerId()) {
3561-
// The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
3562-
// for more details.
3563-
long scannerId = request.getScannerId();
3564-
builder.setScannerId(scannerId);
3565-
scannerName = toScannerName(scannerId);
3566-
rsh = getRegionScanner(request);
3567-
} else {
3568-
Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
3569-
scannerName = scannerNameAndRSH.getFirst();
3570-
rsh = scannerNameAndRSH.getSecond();
3571-
}
3570+
rsx = checkQuotaAndGetRegionScannerContext(request, builder);
35723571
} catch (IOException e) {
35733572
if (e == SCANNER_ALREADY_CLOSED) {
35743573
// Now we will close scanner automatically if there are no more results for this region but
@@ -3577,6 +3576,9 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
35773576
}
35783577
throw new ServiceException(e);
35793578
}
3579+
String scannerName = rsx.scannerName;
3580+
RegionScannerHolder rsh = rsx.holder;
3581+
OperationQuota quota = rsx.quota;
35803582
if (rsh.fullRegionScan) {
35813583
rpcFullScanRequestCount.increment();
35823584
}
@@ -3599,14 +3601,6 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
35993601
}
36003602
return builder.build();
36013603
}
3602-
OperationQuota quota;
3603-
try {
3604-
quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize,
3605-
rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
3606-
} catch (IOException e) {
3607-
addScannerLeaseBack(lease);
3608-
throw new ServiceException(e);
3609-
}
36103604
try {
36113605
checkScanNextCallSeq(request, rsh);
36123606
} catch (OutOfOrderScannerNextException e) {
@@ -3990,4 +3984,26 @@ public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
39903984
});
39913985
return responseBuilder.addAllCachedFiles(fullyCachedFiles).build();
39923986
}
3987+
3988+
RegionScannerContext checkQuotaAndGetRegionScannerContext(ScanRequest request,
3989+
ScanResponse.Builder builder) throws IOException {
3990+
if (request.hasScannerId()) {
3991+
// The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
3992+
// for more details.
3993+
long scannerId = request.getScannerId();
3994+
builder.setScannerId(scannerId);
3995+
String scannerName = toScannerName(scannerId);
3996+
RegionScannerHolder rsh = getRegionScanner(request);
3997+
OperationQuota quota =
3998+
getRpcQuotaManager().checkScanQuota(rsh.r, request, maxScannerResultSize,
3999+
rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference());
4000+
return new RegionScannerContext(scannerName, rsh, quota);
4001+
}
4002+
4003+
HRegion region = getRegion(request.getRegion());
4004+
OperationQuota quota =
4005+
getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, 0L, 0L);
4006+
Pair<String, RegionScannerHolder> pair = newRegionScanner(request, region, builder);
4007+
return new RegionScannerContext(pair.getFirst(), pair.getSecond(), quota);
4008+
}
39934009
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.quotas;
19+
20+
import java.util.List;
21+
import org.apache.hadoop.hbase.Cell;
22+
import org.apache.hadoop.hbase.client.Mutation;
23+
import org.apache.hadoop.hbase.client.Result;
24+
25+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
26+
27+
public class TestNoopOperationQuota implements OperationQuota {
28+
public static final TestNoopOperationQuota INSTANCE = new TestNoopOperationQuota();
29+
30+
@Override
31+
public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
32+
}
33+
34+
@Override
35+
public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
36+
long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
37+
38+
}
39+
40+
@Override
41+
public void close() {
42+
43+
}
44+
45+
@Override
46+
public void addGetResult(Result result) {
47+
48+
}
49+
50+
@Override
51+
public void addScanResult(List<Result> results) {
52+
53+
}
54+
55+
@Override
56+
public void addScanResultCells(List<Cell> cells) {
57+
58+
}
59+
60+
@Override
61+
public void addMutation(Mutation mutation) {
62+
63+
}
64+
65+
@Override
66+
public long getReadAvailable() {
67+
return 0L;
68+
}
69+
70+
@Override
71+
public long getReadConsumed() {
72+
return 0;
73+
}
74+
}

0 commit comments

Comments
 (0)