Skip to content

Commit 83ea144

Browse files
rmdmattinglybbeaudreault
authored andcommitted
HBASE-27784: support quota user overrides (#5424)
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org> Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
1 parent 4ebea72 commit 83ea144

File tree

6 files changed

+185
-2
lines changed

6 files changed

+185
-2
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,21 @@ public interface RpcCall extends RpcCallContext {
9292
Map<String, byte[]> getConnectionAttributes();
9393

9494
/**
95-
* Returns the map of attributes specified when building the request.
95+
* Returns the map of attributes specified when building the request. This map is lazily evaluated
96+
* so if you only need a single attribute then it may be cheaper to use
97+
* {@link #getRequestAttribute(String)}
9698
* @see org.apache.hadoop.hbase.client.TableBuilder#setRequestAttribute(String, byte[])
9799
*/
98100
Map<String, byte[]> getRequestAttributes();
99101

102+
/**
103+
* Returns a single request attribute value, or null if no value is present. If you need many
104+
* request attributes then you should fetch the lazily evaluated map via
105+
* {@link #getRequestAttributes()}
106+
* @see org.apache.hadoop.hbase.client.TableBuilder#setRequestAttribute(String, byte[])
107+
*/
108+
byte[] getRequestAttribute(String key);
109+
100110
/** Returns Port of remote address in this call */
101111
int getRemotePort();
102112

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,19 @@ public Map<String, byte[]> getRequestAttributes() {
234234
return this.requestAttributes;
235235
}
236236

237+
@Override
238+
public byte[] getRequestAttribute(String key) {
239+
if (this.requestAttributes == null) {
240+
for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) {
241+
if (nameBytesPair.getName().equals(key)) {
242+
return nameBytesPair.getValue().toByteArray();
243+
}
244+
}
245+
return null;
246+
}
247+
return this.requestAttributes.get(key);
248+
}
249+
237250
@Override
238251
public int getPriority() {
239252
return this.header.getPriority();

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.EnumSet;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.Optional;
2728
import java.util.Set;
2829
import java.util.concurrent.ConcurrentHashMap;
2930
import java.util.concurrent.ConcurrentMap;
@@ -35,8 +36,11 @@
3536
import org.apache.hadoop.hbase.TableName;
3637
import org.apache.hadoop.hbase.client.Get;
3738
import org.apache.hadoop.hbase.client.RegionStatesCount;
39+
import org.apache.hadoop.hbase.ipc.RpcCall;
40+
import org.apache.hadoop.hbase.ipc.RpcServer;
3841
import org.apache.hadoop.hbase.regionserver.HRegionServer;
3942
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
43+
import org.apache.hadoop.hbase.util.Bytes;
4044
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
4145
import org.apache.hadoop.security.UserGroupInformation;
4246
import org.apache.yetus.audience.InterfaceAudience;
@@ -57,6 +61,11 @@ public class QuotaCache implements Stoppable {
5761
private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);
5862

5963
public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
64+
65+
// defines the request attribute key which, when provided, will override the request's username
66+
// from the perspective of user quotas
67+
public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY =
68+
"hbase.quota.user.override.key";
6069
private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
6170
private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD
6271

@@ -74,12 +83,15 @@ public class QuotaCache implements Stoppable {
7483
private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
7584
new ConcurrentHashMap<>();
7685
private final RegionServerServices rsServices;
86+
private final String userOverrideRequestAttributeKey;
7787

7888
private QuotaRefresherChore refreshChore;
7989
private boolean stopped = true;
8090

8191
public QuotaCache(final RegionServerServices rsServices) {
8292
this.rsServices = rsServices;
93+
this.userOverrideRequestAttributeKey =
94+
rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY);
8395
}
8496

8597
public void start() throws IOException {
@@ -125,7 +137,7 @@ public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableNa
125137
* @return the quota info associated to specified user
126138
*/
127139
public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
128-
return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new,
140+
return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi), UserQuotaState::new,
129141
this::triggerCacheRefresh);
130142
}
131143

@@ -160,6 +172,28 @@ protected boolean isExceedThrottleQuotaEnabled() {
160172
return exceedThrottleQuotaEnabled;
161173
}
162174

175+
/**
176+
* Applies a request attribute user override if available, otherwise returns the UGI's short
177+
* username
178+
* @param ugi The request's UserGroupInformation
179+
*/
180+
private String getQuotaUserName(final UserGroupInformation ugi) {
181+
if (userOverrideRequestAttributeKey == null) {
182+
return ugi.getShortUserName();
183+
}
184+
185+
Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
186+
if (!rpcCall.isPresent()) {
187+
return ugi.getShortUserName();
188+
}
189+
190+
byte[] override = rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey);
191+
if (override == null) {
192+
return ugi.getShortUserName();
193+
}
194+
return Bytes.toString(override);
195+
}
196+
163197
/**
164198
* Returns the QuotaState requested. If the quota info is not in cache an empty one will be
165199
* returned and the quota request will be enqueued for the next cache refresh.

hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,11 @@ public Map<String, byte[]> getRequestAttributes() {
771771
pair -> pair.getValue().toByteArray()));
772772
}
773773

774+
@Override
775+
public byte[] getRequestAttribute(String key) {
776+
return null;
777+
}
778+
774779
@Override
775780
public int getRemotePort() {
776781
return 0;

hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,11 @@ public Map<String, byte[]> getRequestAttributes() {
232232
return null;
233233
}
234234

235+
@Override
236+
public byte[] getRequestAttribute(String key) {
237+
return null;
238+
}
239+
235240
@Override
236241
public int getRemotePort() {
237242
return 0;
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.quotas;
19+
20+
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts;
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertTrue;
23+
24+
import java.util.concurrent.TimeUnit;
25+
import org.apache.hadoop.hbase.HBaseClassTestRule;
26+
import org.apache.hadoop.hbase.HBaseTestingUtil;
27+
import org.apache.hadoop.hbase.HConstants;
28+
import org.apache.hadoop.hbase.TableName;
29+
import org.apache.hadoop.hbase.client.Admin;
30+
import org.apache.hadoop.hbase.client.Table;
31+
import org.apache.hadoop.hbase.security.User;
32+
import org.apache.hadoop.hbase.testclassification.MediumTests;
33+
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
34+
import org.apache.hadoop.hbase.util.Bytes;
35+
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
36+
import org.junit.AfterClass;
37+
import org.junit.BeforeClass;
38+
import org.junit.ClassRule;
39+
import org.junit.Test;
40+
import org.junit.experimental.categories.Category;
41+
42+
@Category({ RegionServerTests.class, MediumTests.class })
43+
public class TestQuotaUserOverride {
44+
45+
@ClassRule
46+
public static final HBaseClassTestRule CLASS_RULE =
47+
HBaseClassTestRule.forClass(TestQuotaUserOverride.class);
48+
49+
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
50+
private static final byte[] FAMILY = Bytes.toBytes("cf");
51+
private static final byte[] QUALIFIER = Bytes.toBytes("q");
52+
private static final int NUM_SERVERS = 1;
53+
private static final String CUSTOM_OVERRIDE_KEY = "foo";
54+
55+
private static final TableName TABLE_NAME = TableName.valueOf("TestQuotaUserOverride");
56+
57+
@BeforeClass
58+
public static void setUpBeforeClass() throws Exception {
59+
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
60+
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1_000);
61+
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 1);
62+
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
63+
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500);
64+
TEST_UTIL.getConfiguration().set(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
65+
CUSTOM_OVERRIDE_KEY);
66+
TEST_UTIL.startMiniCluster(NUM_SERVERS);
67+
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
68+
QuotaCache.TEST_FORCE_REFRESH = true;
69+
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
70+
}
71+
72+
@AfterClass
73+
public static void tearDownAfterClass() throws Exception {
74+
EnvironmentEdgeManager.reset();
75+
TEST_UTIL.shutdownMiniCluster();
76+
}
77+
78+
@Test
79+
public void testUserGlobalThrottleWithCustomOverride() throws Exception {
80+
final Admin admin = TEST_UTIL.getAdmin();
81+
final String userOverrideWithQuota = User.getCurrent().getShortName() + "123";
82+
83+
// Add 6req/min limit
84+
admin.setQuota(QuotaSettingsFactory.throttleUser(userOverrideWithQuota,
85+
ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
86+
87+
Table tableWithThrottle = TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null)
88+
.setRequestAttribute(CUSTOM_OVERRIDE_KEY, Bytes.toBytes(userOverrideWithQuota)).build();
89+
Table tableWithoutThrottle = TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null)
90+
.setRequestAttribute(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
91+
Bytes.toBytes(userOverrideWithQuota))
92+
.build();
93+
Table tableWithoutThrottle2 =
94+
TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).build();
95+
96+
// warm things up
97+
doPuts(10, FAMILY, QUALIFIER, tableWithThrottle);
98+
doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle);
99+
doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2);
100+
101+
// should reject some requests
102+
assertTrue(10 > doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
103+
// should accept all puts
104+
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
105+
// should accept all puts
106+
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2));
107+
108+
// Remove all the limits
109+
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userOverrideWithQuota));
110+
Thread.sleep(60_000);
111+
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
112+
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
113+
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2));
114+
}
115+
116+
}

0 commit comments

Comments
 (0)