Skip to content

Commit 7c99721

Browse files
author
Ray Mattingly
committed
Support quota user overrides
1 parent 94e4055 commit 7c99721

File tree

3 files changed

+241
-1
lines changed

3 files changed

+241
-1
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.EnumSet;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.Optional;
2728
import java.util.Set;
2829
import java.util.concurrent.ConcurrentHashMap;
2930
import java.util.concurrent.ConcurrentMap;
@@ -35,8 +36,11 @@
3536
import org.apache.hadoop.hbase.TableName;
3637
import org.apache.hadoop.hbase.client.Get;
3738
import org.apache.hadoop.hbase.client.RegionStatesCount;
39+
import org.apache.hadoop.hbase.ipc.RpcCall;
40+
import org.apache.hadoop.hbase.ipc.RpcServer;
3841
import org.apache.hadoop.hbase.regionserver.HRegionServer;
3942
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
43+
import org.apache.hadoop.hbase.util.Bytes;
4044
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
4145
import org.apache.hadoop.security.UserGroupInformation;
4246
import org.apache.yetus.audience.InterfaceAudience;
@@ -57,6 +61,11 @@ public class QuotaCache implements Stoppable {
5761
private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);
5862

5963
public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
64+
65+
// defines the request attribute key which, when provided, will override the request's username
66+
// from the perspective of user quotas. This is also the default request attribute key
67+
public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY =
68+
"hbase.quota.user.override.key";
6069
private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
6170
private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD
6271

@@ -74,12 +83,15 @@ public class QuotaCache implements Stoppable {
7483
private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
7584
new ConcurrentHashMap<>();
7685
private final RegionServerServices rsServices;
86+
private final String userOverrideRequestAttributeKey;
7787

7888
private QuotaRefresherChore refreshChore;
7989
private boolean stopped = true;
8090

8191
public QuotaCache(final RegionServerServices rsServices) {
8292
this.rsServices = rsServices;
93+
this.userOverrideRequestAttributeKey = rsServices.getConfiguration()
94+
.get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY, QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY);
8395
}
8496

8597
public void start() throws IOException {
@@ -125,7 +137,7 @@ public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableNa
125137
* @return the quota info associated to specified user
126138
*/
127139
public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
128-
return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new,
140+
return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi), UserQuotaState::new,
129141
this::triggerCacheRefresh);
130142
}
131143

@@ -160,6 +172,23 @@ protected boolean isExceedThrottleQuotaEnabled() {
160172
return exceedThrottleQuotaEnabled;
161173
}
162174

175+
/**
176+
* Applies a request attribute user override if available, otherwise returns the UGI's short
177+
* username
178+
* @param ugi The request's UserGroupInformation
179+
*/
180+
private String getQuotaUserName(final UserGroupInformation ugi) {
181+
Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
182+
if (
183+
rpcCall.isPresent()
184+
&& rpcCall.get().getRequestAttributes().containsKey(userOverrideRequestAttributeKey)
185+
) {
186+
return Bytes
187+
.toString(rpcCall.get().getRequestAttributes().get(userOverrideRequestAttributeKey));
188+
}
189+
return ugi.getShortUserName();
190+
}
191+
163192
/**
164193
* Returns the QuotaState requested. If the quota info is not in cache an empty one will be
165194
* returned and the quota request will be enqueued for the next cache refresh.
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.quotas;
19+
20+
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts;
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertTrue;
23+
24+
import java.util.concurrent.TimeUnit;
25+
import org.apache.hadoop.hbase.HBaseTestingUtil;
26+
import org.apache.hadoop.hbase.HConstants;
27+
import org.apache.hadoop.hbase.TableName;
28+
import org.apache.hadoop.hbase.client.Admin;
29+
import org.apache.hadoop.hbase.client.Table;
30+
import org.apache.hadoop.hbase.security.User;
31+
import org.apache.hadoop.hbase.testclassification.MediumTests;
32+
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
33+
import org.apache.hadoop.hbase.util.Bytes;
34+
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
35+
import org.junit.AfterClass;
36+
import org.junit.BeforeClass;
37+
import org.junit.Test;
38+
import org.junit.experimental.categories.Category;
39+
40+
@Category({ RegionServerTests.class, MediumTests.class })
41+
public class TestQuotaUserOverride {
42+
43+
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
44+
private static final byte[] FAMILY = Bytes.toBytes("cf");
45+
private static final byte[] QUALIFIER = Bytes.toBytes("q");
46+
private static final int NUM_SERVERS = 1;
47+
48+
private static final TableName TABLE_NAME = TableName.valueOf("TestQuotaUserOverride");
49+
50+
@BeforeClass
51+
public static void setUpBeforeClass() throws Exception {
52+
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
53+
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1_000);
54+
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 1);
55+
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
56+
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500);
57+
TEST_UTIL.startMiniCluster(NUM_SERVERS);
58+
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
59+
QuotaCache.TEST_FORCE_REFRESH = true;
60+
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
61+
}
62+
63+
@AfterClass
64+
public static void tearDownAfterClass() throws Exception {
65+
EnvironmentEdgeManager.reset();
66+
TEST_UTIL.shutdownMiniCluster();
67+
}
68+
69+
@Test
70+
public void testUserGlobalThrottleWithOverride() throws Exception {
71+
final Admin admin = TEST_UTIL.getAdmin();
72+
final String userOverrideWithQuota = User.getCurrent().getShortName() + "123";
73+
final String userOverrideWithoutQuota = User.getCurrent().getShortName() + "456";
74+
75+
// Add 6req/min limit
76+
admin.setQuota(QuotaSettingsFactory.throttleUser(userOverrideWithQuota,
77+
ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
78+
Thread.sleep(60_000);
79+
80+
Table tableWithThrottle = admin.getConnection().getTableBuilder(TABLE_NAME, null)
81+
.setRequestAttribute(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
82+
Bytes.toBytes(userOverrideWithQuota))
83+
.build();
84+
Table tableWithoutThrottle = admin.getConnection().getTableBuilder(TABLE_NAME, null)
85+
.setRequestAttribute(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
86+
Bytes.toBytes(userOverrideWithoutQuota))
87+
.build();
88+
89+
// warm things up
90+
doPuts(10, FAMILY, QUALIFIER, tableWithThrottle);
91+
doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle);
92+
93+
// should reject some requests
94+
assertTrue(10 > doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
95+
// should accept all puts
96+
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
97+
98+
// Remove all the limits
99+
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userOverrideWithQuota));
100+
Thread.sleep(60_000);
101+
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
102+
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
103+
}
104+
105+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.quotas;
19+
20+
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts;
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertTrue;
23+
24+
import java.util.concurrent.TimeUnit;
25+
import org.apache.hadoop.hbase.HBaseTestingUtil;
26+
import org.apache.hadoop.hbase.HConstants;
27+
import org.apache.hadoop.hbase.TableName;
28+
import org.apache.hadoop.hbase.client.Admin;
29+
import org.apache.hadoop.hbase.client.Table;
30+
import org.apache.hadoop.hbase.security.User;
31+
import org.apache.hadoop.hbase.testclassification.MediumTests;
32+
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
33+
import org.apache.hadoop.hbase.util.Bytes;
34+
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
35+
import org.junit.AfterClass;
36+
import org.junit.BeforeClass;
37+
import org.junit.Test;
38+
import org.junit.experimental.categories.Category;
39+
40+
@Category({ RegionServerTests.class, MediumTests.class })
41+
public class TestQuotaUserOverrideConfiguration {
42+
43+
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
44+
private static final byte[] FAMILY = Bytes.toBytes("cf");
45+
private static final byte[] QUALIFIER = Bytes.toBytes("q");
46+
private static final int NUM_SERVERS = 1;
47+
private static final String CUSTOM_OVERRIDE_KEY = "foo";
48+
49+
private static final TableName TABLE_NAME =
50+
TableName.valueOf("TestQuotaUserOverrideConfiguration");
51+
52+
@BeforeClass
53+
public static void setUpBeforeClass() throws Exception {
54+
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
55+
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1_000);
56+
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 1);
57+
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
58+
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500);
59+
TEST_UTIL.getConfiguration().set(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
60+
CUSTOM_OVERRIDE_KEY);
61+
TEST_UTIL.startMiniCluster(NUM_SERVERS);
62+
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
63+
QuotaCache.TEST_FORCE_REFRESH = true;
64+
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
65+
}
66+
67+
@AfterClass
68+
public static void tearDownAfterClass() throws Exception {
69+
EnvironmentEdgeManager.reset();
70+
TEST_UTIL.shutdownMiniCluster();
71+
}
72+
73+
@Test
74+
public void testUserGlobalThrottleWithCustomOverride() throws Exception {
75+
final Admin admin = TEST_UTIL.getAdmin();
76+
final String userOverrideWithQuota = User.getCurrent().getShortName() + "123";
77+
78+
// Add 6req/min limit
79+
admin.setQuota(QuotaSettingsFactory.throttleUser(userOverrideWithQuota,
80+
ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
81+
Thread.sleep(60_000);
82+
83+
Table tableWithThrottle = admin.getConnection().getTableBuilder(TABLE_NAME, null)
84+
.setRequestAttribute(CUSTOM_OVERRIDE_KEY, Bytes.toBytes(userOverrideWithQuota)).build();
85+
Table tableWithoutThrottle = admin.getConnection().getTableBuilder(TABLE_NAME, null)
86+
.setRequestAttribute(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
87+
Bytes.toBytes(userOverrideWithQuota))
88+
.build();
89+
90+
// warm things up
91+
doPuts(10, FAMILY, QUALIFIER, tableWithThrottle);
92+
doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle);
93+
94+
// should reject some requests
95+
assertTrue(10 > doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
96+
// should accept all puts
97+
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
98+
99+
// Remove all the limits
100+
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userOverrideWithQuota));
101+
Thread.sleep(60_000);
102+
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
103+
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
104+
}
105+
106+
}

0 commit comments

Comments
 (0)