Skip to content

Commit 9f5b1f1

Browse files
charlesconnellndimiduk
authored andcommitted
HBASE-28346: Expose checkQuota to Coprocessor Endpoints (#6066)
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
1 parent 67fb256 commit 9f5b1f1

File tree

9 files changed

+342
-38
lines changed

9 files changed

+342
-38
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@
2626
import org.apache.hadoop.hbase.ServerName;
2727
import org.apache.hadoop.hbase.client.Connection;
2828
import org.apache.hadoop.hbase.client.RegionInfo;
29+
import org.apache.hadoop.hbase.client.Scan;
2930
import org.apache.hadoop.hbase.metrics.MetricRegistry;
31+
import org.apache.hadoop.hbase.quotas.OperationQuota;
32+
import org.apache.hadoop.hbase.quotas.RpcQuotaManager;
33+
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
3034
import org.apache.hadoop.hbase.regionserver.OnlineRegions;
3135
import org.apache.hadoop.hbase.regionserver.Region;
3236
import org.apache.yetus.audience.InterfaceAudience;
@@ -120,4 +124,52 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment<Reg
120124
* @return the RawCellBuilder
121125
*/
122126
RawCellBuilder getCellBuilder();
127+
128+
/**
129+
* Returns an RpcQuotaManager that can be used to apply quota checks against the workloads
130+
* generated by the coprocessor.
131+
* @return the RpcQuotaManager
132+
*/
133+
RpcQuotaManager getRpcQuotaManager();
134+
135+
/**
136+
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
137+
* available quota and to report the data/usage of the operation.
138+
* @param scan the scan to be estimated against the quota
139+
* @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
140+
* scanner
141+
* @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
142+
* calls
143+
* @return the OperationQuota
144+
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
145+
*/
146+
OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned,
147+
long prevBlockBytesScannedDifference) throws IOException, RpcThrottlingException;
148+
149+
/**
150+
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
151+
* available quota and to report the data/usage of the operation. This method does not support
152+
* scans because estimating a scan's workload is more complicated than estimating the workload of
153+
* a get/put.
154+
* @param region the region where the operation will be performed
155+
* @param type the operation type
156+
* @return the OperationQuota
157+
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
158+
*/
159+
OperationQuota checkBatchQuota(final Region region, final OperationQuota.OperationType type)
160+
throws IOException, RpcThrottlingException;
161+
162+
/**
163+
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
164+
* available quota and to report the data/usage of the operation. This method does not support
165+
* scans because estimating a scan's workload is more complicated than estimating the workload of
166+
* a get/put.
167+
* @param region the region where the operation will be performed
168+
* @param numWrites number of writes to count against quota
169+
* @param numReads number of reads to count against quota
170+
* @return the OperationQuota
171+
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
172+
*/
173+
OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads)
174+
throws IOException, RpcThrottlingException;
123175
}

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Arrays;
2121
import java.util.List;
2222
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.hbase.Cell;
2324
import org.apache.hadoop.hbase.client.Mutation;
2425
import org.apache.hadoop.hbase.client.Result;
2526
import org.apache.hadoop.hbase.ipc.RpcCall;
@@ -181,6 +182,11 @@ public void addScanResult(final List<Result> results) {
181182
operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
182183
}
183184

185+
@Override
186+
public void addScanResultCells(final List<Cell> cells) {
187+
operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateCellsSize(cells);
188+
}
189+
184190
@Override
185191
public void addMutation(final Mutation mutation) {
186192
operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.quotas;
1919

2020
import java.util.List;
21+
import org.apache.hadoop.hbase.Cell;
2122
import org.apache.hadoop.hbase.client.Mutation;
2223
import org.apache.hadoop.hbase.client.Result;
2324
import org.apache.yetus.audience.InterfaceAudience;
@@ -81,4 +82,9 @@ public long getReadAvailable() {
8182
public long getReadConsumed() {
8283
return 0L;
8384
}
85+
86+
@Override
87+
public void addScanResultCells(List<Cell> cells) {
88+
// no-op
89+
}
8490
}

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.quotas;
1919

2020
import java.util.List;
21+
import org.apache.hadoop.hbase.Cell;
2122
import org.apache.hadoop.hbase.client.Mutation;
2223
import org.apache.hadoop.hbase.client.Result;
2324
import org.apache.yetus.audience.InterfaceAudience;
@@ -88,6 +89,12 @@ void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultS
8889
*/
8990
void addScanResult(List<Result> results);
9091

92+
/**
93+
* Add a scan result in the form of cells. This will be used to calculate the exact quota and have
94+
* a better long-read average size for the next time.
95+
*/
96+
void addScanResultCells(List<Cell> cells);
97+
9198
/**
9299
* Add a mutation result. This will be used to calculate the exact quota and have a better
93100
* mutation average size for the next time.

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,14 @@ public static long calculateResultSize(final List<Result> results) {
591591
return size;
592592
}
593593

594+
public static long calculateCellsSize(final List<Cell> cells) {
595+
long size = 0;
596+
for (Cell cell : cells) {
597+
size += cell.getSerializedSize();
598+
}
599+
return size;
600+
}
601+
594602
/**
595603
* Method to enable a table, if not already enabled. This method suppresses
596604
* {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java

Lines changed: 6 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
*/
4444
@InterfaceAudience.Private
4545
@InterfaceStability.Evolving
46-
public class RegionServerRpcQuotaManager {
46+
public class RegionServerRpcQuotaManager implements RpcQuotaManager {
4747
private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class);
4848

4949
private final RegionServerServices rsServices;
@@ -154,21 +154,7 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t
154154
return NoopOperationQuota.get();
155155
}
156156

157-
/**
158-
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
159-
* available quota and to report the data/usage of the operation. This method is specific to scans
160-
* because estimating a scan's workload is more complicated than estimating the workload of a
161-
* get/put.
162-
* @param region the region where the operation will be performed
163-
* @param scanRequest the scan to be estimated against the quota
164-
* @param maxScannerResultSize the maximum bytes to be returned by the scanner
165-
* @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
166-
* scanner
167-
* @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
168-
* calls
169-
* @return the OperationQuota
170-
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
171-
*/
157+
@Override
172158
public OperationQuota checkScanQuota(final Region region,
173159
final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
174160
long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
@@ -195,16 +181,7 @@ public OperationQuota checkScanQuota(final Region region,
195181
return quota;
196182
}
197183

198-
/**
199-
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
200-
* available quota and to report the data/usage of the operation. This method does not support
201-
* scans because estimating a scan's workload is more complicated than estimating the workload of
202-
* a get/put.
203-
* @param region the region where the operation will be performed
204-
* @param type the operation type
205-
* @return the OperationQuota
206-
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
207-
*/
184+
@Override
208185
public OperationQuota checkBatchQuota(final Region region,
209186
final OperationQuota.OperationType type) throws IOException, RpcThrottlingException {
210187
switch (type) {
@@ -218,17 +195,7 @@ public OperationQuota checkBatchQuota(final Region region,
218195
throw new RuntimeException("Invalid operation type: " + type);
219196
}
220197

221-
/**
222-
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
223-
* available quota and to report the data/usage of the operation. This method does not support
224-
* scans because estimating a scan's workload is more complicated than estimating the workload of
225-
* a get/put.
226-
* @param region the region where the operation will be performed
227-
* @param actions the "multi" actions to perform
228-
* @param hasCondition whether the RegionAction has a condition
229-
* @return the OperationQuota
230-
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
231-
*/
198+
@Override
232199
public OperationQuota checkBatchQuota(final Region region,
233200
final List<ClientProtos.Action> actions, boolean hasCondition)
234201
throws IOException, RpcThrottlingException {
@@ -258,7 +225,8 @@ public OperationQuota checkBatchQuota(final Region region,
258225
* @return the OperationQuota
259226
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
260227
*/
261-
private OperationQuota checkBatchQuota(final Region region, final int numWrites,
228+
@Override
229+
public OperationQuota checkBatchQuota(final Region region, final int numWrites,
262230
final int numReads) throws IOException, RpcThrottlingException {
263231
Optional<User> user = RpcServer.getRequestUser();
264232
UserGroupInformation ugi;
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.quotas;
19+
20+
import java.io.IOException;
21+
import java.util.List;
22+
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
23+
import org.apache.hadoop.hbase.regionserver.Region;
24+
import org.apache.yetus.audience.InterfaceAudience;
25+
import org.apache.yetus.audience.InterfaceStability;
26+
27+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
28+
29+
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
30+
@InterfaceStability.Evolving
31+
public interface RpcQuotaManager {
32+
33+
/**
34+
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
35+
* available quota and to report the data/usage of the operation. This method is specific to scans
36+
* because estimating a scan's workload is more complicated than estimating the workload of a
37+
* get/put.
38+
* @param region the region where the operation will be performed
39+
* @param scanRequest the scan to be estimated against the quota
40+
* @param maxScannerResultSize the maximum bytes to be returned by the scanner
41+
* @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
42+
* scanner
43+
* @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
44+
* calls
45+
* @return the OperationQuota
46+
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
47+
*/
48+
OperationQuota checkScanQuota(final Region region, final ClientProtos.ScanRequest scanRequest,
49+
long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
50+
throws IOException, RpcThrottlingException;
51+
52+
/**
53+
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
54+
* available quota and to report the data/usage of the operation. This method does not support
55+
* scans because estimating a scan's workload is more complicated than estimating the workload of
56+
* a get/put.
57+
* @param region the region where the operation will be performed
58+
* @param type the operation type
59+
* @return the OperationQuota
60+
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
61+
*/
62+
OperationQuota checkBatchQuota(final Region region, final OperationQuota.OperationType type)
63+
throws IOException, RpcThrottlingException;
64+
65+
/**
66+
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
67+
* available quota and to report the data/usage of the operation. This method does not support
68+
* scans because estimating a scan's workload is more complicated than estimating the workload of
69+
* a get/put.
70+
* @param region the region where the operation will be performed
71+
* @param actions the "multi" actions to perform
72+
* @param hasCondition whether the RegionAction has a condition
73+
* @return the OperationQuota
74+
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
75+
*/
76+
OperationQuota checkBatchQuota(final Region region, final List<ClientProtos.Action> actions,
77+
boolean hasCondition) throws IOException, RpcThrottlingException;
78+
79+
/**
80+
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
81+
* available quota and to report the data/usage of the operation. This method does not support
82+
* scans because estimating a scan's workload is more complicated than estimating the workload of
83+
* a get/put.
84+
* @param region the region where the operation will be performed
85+
* @param numWrites number of writes to count against quota
86+
* @param numReads number of reads to count against quota
87+
* @return the OperationQuota
88+
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
89+
*/
90+
OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads)
91+
throws IOException, RpcThrottlingException;
92+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@
6565
import org.apache.hadoop.hbase.io.Reference;
6666
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
6767
import org.apache.hadoop.hbase.metrics.MetricRegistry;
68+
import org.apache.hadoop.hbase.quotas.OperationQuota;
69+
import org.apache.hadoop.hbase.quotas.RpcQuotaManager;
70+
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
6871
import org.apache.hadoop.hbase.regionserver.Region.Operation;
6972
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
7073
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -83,6 +86,9 @@
8386
import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap;
8487
import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap;
8588

89+
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
90+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
91+
8692
/**
8793
* Implements the coprocessor environment and runtime support for coprocessors loaded within a
8894
* {@link Region}.
@@ -116,6 +122,7 @@ private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor
116122
ConcurrentMap<String, Object> sharedData;
117123
private final MetricRegistry metricRegistry;
118124
private final RegionServerServices services;
125+
private final RpcQuotaManager rpcQuotaManager;
119126

120127
/**
121128
* Constructor
@@ -131,6 +138,13 @@ public RegionEnvironment(final RegionCoprocessor impl, final int priority, final
131138
this.services = services;
132139
this.metricRegistry =
133140
MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
141+
// Some unit tests reach this line with services == null, and are okay with rpcQuotaManager
142+
// being null. Let these unit tests succeed. This should not happen in real usage.
143+
if (services != null) {
144+
this.rpcQuotaManager = services.getRegionServerRpcQuotaManager();
145+
} else {
146+
this.rpcQuotaManager = null;
147+
}
134148
}
135149

136150
/** Returns the region */
@@ -186,6 +200,35 @@ public RawCellBuilder getCellBuilder() {
186200
// We always do a DEEP_COPY only
187201
return RawCellBuilderFactory.create();
188202
}
203+
204+
@Override
205+
public RpcQuotaManager getRpcQuotaManager() {
206+
return rpcQuotaManager;
207+
}
208+
209+
@Override
210+
public OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned,
211+
long prevBlockBytesScannedDifference) throws IOException, RpcThrottlingException {
212+
ClientProtos.ScanRequest scanRequest = RequestConverter
213+
.buildScanRequest(region.getRegionInfo().getRegionName(), scan, scan.getCaching(), false);
214+
long maxScannerResultSize =
215+
services.getConfiguration().getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
216+
HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
217+
return rpcQuotaManager.checkScanQuota(region, scanRequest, maxScannerResultSize,
218+
maxBlockBytesScanned, prevBlockBytesScannedDifference);
219+
}
220+
221+
@Override
222+
public OperationQuota checkBatchQuota(Region region, OperationQuota.OperationType type)
223+
throws IOException, RpcThrottlingException {
224+
return rpcQuotaManager.checkBatchQuota(region, type);
225+
}
226+
227+
@Override
228+
public OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads)
229+
throws IOException, RpcThrottlingException {
230+
return rpcQuotaManager.checkBatchQuota(region, numWrites, numReads);
231+
}
189232
}
190233

191234
/**

0 commit comments

Comments
 (0)