Skip to content

Commit c6800d6

Browse files
HBASE-28346: Expose checkQuota to Coprocessor Endpoints (apache#6066)
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org> Co-authored-by: Charles Connell <cconnell@hubspot.com>
1 parent 563029a commit c6800d6

File tree

9 files changed

+342
-38
lines changed

9 files changed

+342
-38
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@
2626
import org.apache.hadoop.hbase.ServerName;
2727
import org.apache.hadoop.hbase.client.Connection;
2828
import org.apache.hadoop.hbase.client.RegionInfo;
29+
import org.apache.hadoop.hbase.client.Scan;
2930
import org.apache.hadoop.hbase.metrics.MetricRegistry;
31+
import org.apache.hadoop.hbase.quotas.OperationQuota;
32+
import org.apache.hadoop.hbase.quotas.RpcQuotaManager;
33+
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
3034
import org.apache.hadoop.hbase.regionserver.OnlineRegions;
3135
import org.apache.hadoop.hbase.regionserver.Region;
3236
import org.apache.hadoop.hbase.security.User;
@@ -127,4 +131,52 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment<Reg
127131
* @return the RawCellBuilder
128132
*/
129133
RawCellBuilder getCellBuilder();
134+
135+
/**
136+
* Returns an RpcQuotaManager that can be used to apply quota checks against the workloads
137+
* generated by the coprocessor.
138+
* @return the RpcQuotaManager
139+
*/
140+
RpcQuotaManager getRpcQuotaManager();
141+
142+
/**
143+
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
144+
* available quota and to report the data/usage of the operation.
145+
* @param scan the scan to be estimated against the quota
146+
* @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
147+
* scanner
148+
* @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
149+
* calls
150+
* @return the OperationQuota
151+
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
152+
*/
153+
OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned,
154+
long prevBlockBytesScannedDifference) throws IOException, RpcThrottlingException;
155+
156+
/**
157+
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
158+
* available quota and to report the data/usage of the operation. This method does not support
159+
* scans because estimating a scan's workload is more complicated than estimating the workload of
160+
* a get/put.
161+
* @param region the region where the operation will be performed
162+
* @param type the operation type
163+
* @return the OperationQuota
164+
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
165+
*/
166+
OperationQuota checkBatchQuota(final Region region, final OperationQuota.OperationType type)
167+
throws IOException, RpcThrottlingException;
168+
169+
/**
170+
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
171+
* available quota and to report the data/usage of the operation. This method does not support
172+
* scans because estimating a scan's workload is more complicated than estimating the workload of
173+
* a get/put.
174+
* @param region the region where the operation will be performed
175+
* @param numWrites number of writes to count against quota
176+
* @param numReads number of reads to count against quota
177+
* @return the OperationQuota
178+
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
179+
*/
180+
OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads)
181+
throws IOException, RpcThrottlingException;
130182
}

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Arrays;
2121
import java.util.List;
2222
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.hbase.Cell;
2324
import org.apache.hadoop.hbase.client.Mutation;
2425
import org.apache.hadoop.hbase.client.Result;
2526
import org.apache.hadoop.hbase.ipc.RpcCall;
@@ -178,6 +179,11 @@ public void addScanResult(final List<Result> results) {
178179
operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
179180
}
180181

182+
@Override
183+
public void addScanResultCells(final List<Cell> cells) {
184+
operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateCellsSize(cells);
185+
}
186+
181187
@Override
182188
public void addMutation(final Mutation mutation) {
183189
operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.quotas;
1919

2020
import java.util.List;
21+
import org.apache.hadoop.hbase.Cell;
2122
import org.apache.hadoop.hbase.client.Mutation;
2223
import org.apache.hadoop.hbase.client.Result;
2324
import org.apache.yetus.audience.InterfaceAudience;
@@ -81,4 +82,9 @@ public long getReadAvailable() {
8182
public long getReadConsumed() {
8283
return 0L;
8384
}
85+
86+
@Override
87+
public void addScanResultCells(List<Cell> cells) {
88+
// no-op
89+
}
8490
}

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.quotas;
1919

2020
import java.util.List;
21+
import org.apache.hadoop.hbase.Cell;
2122
import org.apache.hadoop.hbase.client.Mutation;
2223
import org.apache.hadoop.hbase.client.Result;
2324
import org.apache.yetus.audience.InterfaceAudience;
@@ -88,6 +89,12 @@ void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultS
8889
*/
8990
void addScanResult(List<Result> results);
9091

92+
/**
93+
* Add a scan result in the form of cells. This will be used to calculate the exact quota and have
94+
* a better long-read average size for the next time.
95+
*/
96+
void addScanResultCells(List<Cell> cells);
97+
9198
/**
9299
* Add a mutation result. This will be used to calculate the exact quota and have a better
93100
* mutation average size for the next time.

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,6 +590,14 @@ public static long calculateResultSize(final List<Result> results) {
590590
return size;
591591
}
592592

593+
public static long calculateCellsSize(final List<Cell> cells) {
594+
long size = 0;
595+
for (Cell cell : cells) {
596+
size += cell.getSerializedSize();
597+
}
598+
return size;
599+
}
600+
593601
/**
594602
* Method to enable a table, if not already enabled. This method suppresses
595603
* {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java

Lines changed: 6 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
*/
4444
@InterfaceAudience.Private
4545
@InterfaceStability.Evolving
46-
public class RegionServerRpcQuotaManager {
46+
public class RegionServerRpcQuotaManager implements RpcQuotaManager {
4747
private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class);
4848

4949
private final RegionServerServices rsServices;
@@ -154,21 +154,7 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t
154154
return NoopOperationQuota.get();
155155
}
156156

157-
/**
158-
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
159-
* available quota and to report the data/usage of the operation. This method is specific to scans
160-
* because estimating a scan's workload is more complicated than estimating the workload of a
161-
* get/put.
162-
* @param region the region where the operation will be performed
163-
* @param scanRequest the scan to be estimated against the quota
164-
* @param maxScannerResultSize the maximum bytes to be returned by the scanner
165-
* @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
166-
* scanner
167-
* @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
168-
* calls
169-
* @return the OperationQuota
170-
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
171-
*/
157+
@Override
172158
public OperationQuota checkScanQuota(final Region region,
173159
final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
174160
long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
@@ -195,16 +181,7 @@ public OperationQuota checkScanQuota(final Region region,
195181
return quota;
196182
}
197183

198-
/**
199-
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
200-
* available quota and to report the data/usage of the operation. This method does not support
201-
* scans because estimating a scan's workload is more complicated than estimating the workload of
202-
* a get/put.
203-
* @param region the region where the operation will be performed
204-
* @param type the operation type
205-
* @return the OperationQuota
206-
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
207-
*/
184+
@Override
208185
public OperationQuota checkBatchQuota(final Region region,
209186
final OperationQuota.OperationType type) throws IOException, RpcThrottlingException {
210187
switch (type) {
@@ -218,17 +195,7 @@ public OperationQuota checkBatchQuota(final Region region,
218195
throw new RuntimeException("Invalid operation type: " + type);
219196
}
220197

221-
/**
222-
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
223-
* available quota and to report the data/usage of the operation. This method does not support
224-
* scans because estimating a scan's workload is more complicated than estimating the workload of
225-
* a get/put.
226-
* @param region the region where the operation will be performed
227-
* @param actions the "multi" actions to perform
228-
* @param hasCondition whether the RegionAction has a condition
229-
* @return the OperationQuota
230-
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
231-
*/
198+
@Override
232199
public OperationQuota checkBatchQuota(final Region region,
233200
final List<ClientProtos.Action> actions, boolean hasCondition)
234201
throws IOException, RpcThrottlingException {
@@ -258,7 +225,8 @@ public OperationQuota checkBatchQuota(final Region region,
258225
* @return the OperationQuota
259226
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
260227
*/
261-
private OperationQuota checkBatchQuota(final Region region, final int numWrites,
228+
@Override
229+
public OperationQuota checkBatchQuota(final Region region, final int numWrites,
262230
final int numReads) throws IOException, RpcThrottlingException {
263231
Optional<User> user = RpcServer.getRequestUser();
264232
UserGroupInformation ugi;
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.quotas;
19+
20+
import java.io.IOException;
21+
import java.util.List;
22+
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
23+
import org.apache.hadoop.hbase.regionserver.Region;
24+
import org.apache.yetus.audience.InterfaceAudience;
25+
import org.apache.yetus.audience.InterfaceStability;
26+
27+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
28+
29+
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
30+
@InterfaceStability.Evolving
31+
public interface RpcQuotaManager {
32+
33+
/**
34+
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
35+
* available quota and to report the data/usage of the operation. This method is specific to scans
36+
* because estimating a scan's workload is more complicated than estimating the workload of a
37+
* get/put.
38+
* @param region the region where the operation will be performed
39+
* @param scanRequest the scan to be estimated against the quota
40+
* @param maxScannerResultSize the maximum bytes to be returned by the scanner
41+
* @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
42+
* scanner
43+
* @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
44+
* calls
45+
* @return the OperationQuota
46+
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
47+
*/
48+
OperationQuota checkScanQuota(final Region region, final ClientProtos.ScanRequest scanRequest,
49+
long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
50+
throws IOException, RpcThrottlingException;
51+
52+
/**
53+
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
54+
* available quota and to report the data/usage of the operation. This method does not support
55+
* scans because estimating a scan's workload is more complicated than estimating the workload of
56+
* a get/put.
57+
* @param region the region where the operation will be performed
58+
* @param type the operation type
59+
* @return the OperationQuota
60+
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
61+
*/
62+
OperationQuota checkBatchQuota(final Region region, final OperationQuota.OperationType type)
63+
throws IOException, RpcThrottlingException;
64+
65+
/**
66+
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
67+
* available quota and to report the data/usage of the operation. This method does not support
68+
* scans because estimating a scan's workload is more complicated than estimating the workload of
69+
* a get/put.
70+
* @param region the region where the operation will be performed
71+
* @param actions the "multi" actions to perform
72+
* @param hasCondition whether the RegionAction has a condition
73+
* @return the OperationQuota
74+
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
75+
*/
76+
OperationQuota checkBatchQuota(final Region region, final List<ClientProtos.Action> actions,
77+
boolean hasCondition) throws IOException, RpcThrottlingException;
78+
79+
/**
80+
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
81+
* available quota and to report the data/usage of the operation. This method does not support
82+
* scans because estimating a scan's workload is more complicated than estimating the workload of
83+
* a get/put.
84+
* @param region the region where the operation will be performed
85+
* @param numWrites number of writes to count against quota
86+
* @param numReads number of reads to count against quota
87+
* @return the OperationQuota
88+
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
89+
*/
90+
OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads)
91+
throws IOException, RpcThrottlingException;
92+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@
6969
import org.apache.hadoop.hbase.io.Reference;
7070
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
7171
import org.apache.hadoop.hbase.metrics.MetricRegistry;
72+
import org.apache.hadoop.hbase.quotas.OperationQuota;
73+
import org.apache.hadoop.hbase.quotas.RpcQuotaManager;
74+
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
7275
import org.apache.hadoop.hbase.regionserver.Region.Operation;
7376
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
7477
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -85,6 +88,9 @@
8588
import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap;
8689
import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap;
8790

91+
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
92+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
93+
8894
/**
8995
* Implements the coprocessor environment and runtime support for coprocessors loaded within a
9096
* {@link Region}.
@@ -118,6 +124,7 @@ private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor
118124
ConcurrentMap<String, Object> sharedData;
119125
private final MetricRegistry metricRegistry;
120126
private final RegionServerServices services;
127+
private final RpcQuotaManager rpcQuotaManager;
121128

122129
/**
123130
* Constructor
@@ -133,6 +140,13 @@ public RegionEnvironment(final RegionCoprocessor impl, final int priority, final
133140
this.services = services;
134141
this.metricRegistry =
135142
MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
143+
// Some unit tests reach this line with services == null, and are okay with rpcQuotaManager
144+
// being null. Let these unit tests succeed. This should not happen in real usage.
145+
if (services != null) {
146+
this.rpcQuotaManager = services.getRegionServerRpcQuotaManager();
147+
} else {
148+
this.rpcQuotaManager = null;
149+
}
136150
}
137151

138152
/** Returns the region */
@@ -188,6 +202,35 @@ public RawCellBuilder getCellBuilder() {
188202
// We always do a DEEP_COPY only
189203
return RawCellBuilderFactory.create();
190204
}
205+
206+
@Override
207+
public RpcQuotaManager getRpcQuotaManager() {
208+
return rpcQuotaManager;
209+
}
210+
211+
@Override
212+
public OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned,
213+
long prevBlockBytesScannedDifference) throws IOException, RpcThrottlingException {
214+
ClientProtos.ScanRequest scanRequest = RequestConverter
215+
.buildScanRequest(region.getRegionInfo().getRegionName(), scan, scan.getCaching(), false);
216+
long maxScannerResultSize =
217+
services.getConfiguration().getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
218+
HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
219+
return rpcQuotaManager.checkScanQuota(region, scanRequest, maxScannerResultSize,
220+
maxBlockBytesScanned, prevBlockBytesScannedDifference);
221+
}
222+
223+
@Override
224+
public OperationQuota checkBatchQuota(Region region, OperationQuota.OperationType type)
225+
throws IOException, RpcThrottlingException {
226+
return rpcQuotaManager.checkBatchQuota(region, type);
227+
}
228+
229+
@Override
230+
public OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads)
231+
throws IOException, RpcThrottlingException {
232+
return rpcQuotaManager.checkBatchQuota(region, numWrites, numReads);
233+
}
191234
}
192235

193236
/**

0 commit comments

Comments
 (0)