Skip to content

Commit e1ca764

Browse files
rmdmattinglyeab148Evie Boland
committed
HubSpot Backport: HBASE-28842 TestRequestAttributes should fail when expected (apache#6255) (apache#6287)
Signed-off-by: Ray Mattingly <rmattingly@apache.org> Signed-off-by: Nick Dimiduk <ndimiduk@apache.org> Co-authored-by: eab148 <54775485+eab148@users.noreply.github.com> Co-authored-by: Evie Boland <eboland@hubspot.com>
1 parent 2d08fa6 commit e1ca764

File tree

1 file changed

+372
-0
lines changed

1 file changed

+372
-0
lines changed
Lines changed: 372 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,372 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.Arrays;
23+
import java.util.HashMap;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Optional;
27+
import java.util.UUID;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
30+
import org.apache.hadoop.conf.Configuration;
31+
import org.apache.hadoop.hbase.AuthUtil;
32+
import org.apache.hadoop.hbase.Cell;
33+
import org.apache.hadoop.hbase.CellScannable;
34+
import org.apache.hadoop.hbase.CellScanner;
35+
import org.apache.hadoop.hbase.HBaseClassTestRule;
36+
import org.apache.hadoop.hbase.HBaseTestingUtility;
37+
import org.apache.hadoop.hbase.HConstants;
38+
import org.apache.hadoop.hbase.MiniHBaseCluster;
39+
import org.apache.hadoop.hbase.TableName;
40+
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
41+
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
42+
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
43+
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
44+
import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
45+
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
46+
import org.apache.hadoop.hbase.ipc.RpcCall;
47+
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
48+
import org.apache.hadoop.hbase.ipc.RpcServer;
49+
import org.apache.hadoop.hbase.regionserver.InternalScanner;
50+
import org.apache.hadoop.hbase.testclassification.ClientTests;
51+
import org.apache.hadoop.hbase.testclassification.MediumTests;
52+
import org.apache.hadoop.hbase.util.Bytes;
53+
import org.apache.hadoop.hbase.wal.WALEdit;
54+
import org.junit.AfterClass;
55+
import org.junit.BeforeClass;
56+
import org.junit.ClassRule;
57+
import org.junit.Test;
58+
import org.junit.experimental.categories.Category;
59+
60+
@Category({ ClientTests.class, MediumTests.class })
61+
public class TestRequestAttributes {
62+
63+
@ClassRule
64+
public static final HBaseClassTestRule CLASS_RULE =
65+
HBaseClassTestRule.forClass(TestRequestAttributes.class);
66+
67+
private static final byte[] ROW_KEY1 = Bytes.toBytes("1");
68+
private static final byte[] ROW_KEY2A = Bytes.toBytes("2A");
69+
private static final byte[] ROW_KEY2B = Bytes.toBytes("2B");
70+
private static final byte[] ROW_KEY3 = Bytes.toBytes("3");
71+
private static final byte[] ROW_KEY4 = Bytes.toBytes("4");
72+
private static final byte[] ROW_KEY5 = Bytes.toBytes("5");
73+
private static final byte[] ROW_KEY6 = Bytes.toBytes("6");
74+
private static final byte[] ROW_KEY7 = Bytes.toBytes("7");
75+
private static final byte[] ROW_KEY8 = Bytes.toBytes("8");
76+
private static final Map<String, byte[]> CONNECTION_ATTRIBUTES = new HashMap<>();
77+
private static final Map<String, byte[]> REQUEST_ATTRIBUTES_SCAN = addRandomRequestAttributes();
78+
private static final Map<byte[], Map<String, byte[]>> ROW_KEY_TO_REQUEST_ATTRIBUTES =
79+
new HashMap<>();
80+
static {
81+
CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo"));
82+
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY1, addRandomRequestAttributes());
83+
Map<String, byte[]> requestAttributes2 = addRandomRequestAttributes();
84+
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY2A, requestAttributes2);
85+
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY2B, requestAttributes2);
86+
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY3, addRandomRequestAttributes());
87+
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY4, addRandomRequestAttributes());
88+
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY5, addRandomRequestAttributes());
89+
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY6, addRandomRequestAttributes());
90+
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY7, addRandomRequestAttributes());
91+
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY8, new HashMap<String, byte[]>());
92+
}
93+
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(100);
94+
private static final byte[] FAMILY = Bytes.toBytes("0");
95+
private static final TableName TABLE_NAME = TableName.valueOf("testRequestAttributes");
96+
97+
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
98+
private static MiniHBaseCluster cluster;
99+
100+
@BeforeClass
101+
public static void setUp() throws Exception {
102+
cluster = TEST_UTIL.startMiniCluster(1);
103+
Table table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 1,
104+
HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName());
105+
table.close();
106+
}
107+
108+
@AfterClass
109+
public static void afterClass() throws Exception {
110+
cluster.close();
111+
TEST_UTIL.shutdownMiniCluster();
112+
}
113+
114+
@Test
115+
public void testRequestAttributesGet() throws IOException {
116+
Configuration conf = TEST_UTIL.getConfiguration();
117+
try (
118+
Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
119+
CONNECTION_ATTRIBUTES);
120+
Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
121+
ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY1)).build()) {
122+
123+
table.get(new Get(ROW_KEY1));
124+
}
125+
}
126+
127+
@Test
128+
public void testRequestAttributesMultiGet() throws IOException {
129+
Configuration conf = TEST_UTIL.getConfiguration();
130+
try (
131+
Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
132+
CONNECTION_ATTRIBUTES);
133+
Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
134+
ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY2A)).build()) {
135+
List<Get> gets = new ArrayList<>(2);
136+
gets.add(new Get(ROW_KEY2A));
137+
gets.add(new Get(ROW_KEY2B));
138+
table.get(gets);
139+
}
140+
}
141+
142+
@Test
143+
public void testRequestAttributesScan() throws IOException {
144+
Configuration conf = TEST_UTIL.getConfiguration();
145+
try (
146+
Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
147+
CONNECTION_ATTRIBUTES);
148+
Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
149+
REQUEST_ATTRIBUTES_SCAN).build()) {
150+
ResultScanner scanner = table.getScanner(new Scan());
151+
scanner.next();
152+
}
153+
}
154+
155+
@Test
156+
public void testRequestAttributesPut() throws IOException {
157+
Configuration conf = TEST_UTIL.getConfiguration();
158+
try (
159+
Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
160+
CONNECTION_ATTRIBUTES);
161+
Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
162+
ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY3)).build()) {
163+
Put put = new Put(ROW_KEY3);
164+
put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v"));
165+
table.put(put);
166+
}
167+
}
168+
169+
@Test
170+
public void testRequestAttributesMultiPut() throws IOException {
171+
Configuration conf = TEST_UTIL.getConfiguration();
172+
try (
173+
Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
174+
CONNECTION_ATTRIBUTES);
175+
Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
176+
ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY4)).build()) {
177+
Put put1 = new Put(ROW_KEY4);
178+
put1.addColumn(FAMILY, Bytes.toBytes("c1"), Bytes.toBytes("v1"));
179+
Put put2 = new Put(ROW_KEY4);
180+
put2.addColumn(FAMILY, Bytes.toBytes("c2"), Bytes.toBytes("v2"));
181+
List<Put> puts = new ArrayList<>(2);
182+
puts.add(put1);
183+
puts.add(put1);
184+
table.put(puts);
185+
}
186+
}
187+
188+
@Test
189+
public void testRequestAttributesBufferedMutate() throws IOException, InterruptedException {
190+
Configuration conf = TEST_UTIL.getConfiguration();
191+
try (
192+
Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
193+
CONNECTION_ATTRIBUTES);
194+
BufferedMutator bufferedMutator =
195+
conn.getBufferedMutator(configureRequestAttributes(new BufferedMutatorParams(TABLE_NAME),
196+
ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY5)));) {
197+
Put put = new Put(ROW_KEY5);
198+
put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v"));
199+
bufferedMutator.mutate(put);
200+
bufferedMutator.flush();
201+
}
202+
}
203+
204+
@Test
205+
public void testRequestAttributesExists() throws IOException {
206+
Configuration conf = TEST_UTIL.getConfiguration();
207+
try (
208+
Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
209+
CONNECTION_ATTRIBUTES);
210+
Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE),
211+
ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY6)).build()) {
212+
213+
table.exists(new Get(ROW_KEY6));
214+
}
215+
}
216+
217+
@Test
218+
public void testRequestAttributesFromRpcController() throws IOException, InterruptedException {
219+
Configuration conf = TEST_UTIL.getConfiguration();
220+
conf.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
221+
RequestMetadataControllerFactory.class, RpcControllerFactory.class);
222+
try (
223+
Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf),
224+
CONNECTION_ATTRIBUTES);
225+
BufferedMutator bufferedMutator = conn.getBufferedMutator(TABLE_NAME);) {
226+
Put put = new Put(ROW_KEY7);
227+
put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v"));
228+
bufferedMutator.mutate(put);
229+
bufferedMutator.flush();
230+
}
231+
conf.unset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY);
232+
}
233+
234+
@Test
235+
public void testNoRequestAttributes() throws IOException {
236+
Configuration conf = TEST_UTIL.getConfiguration();
237+
try (Connection conn = ConnectionFactory.createConnection(conf, null,
238+
AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES)) {
239+
TableBuilder tableBuilder = conn.getTableBuilder(TABLE_NAME, null);
240+
try (Table table = tableBuilder.build()) {
241+
table.get(new Get(ROW_KEY8));
242+
}
243+
}
244+
}
245+
246+
private static Map<String, byte[]> addRandomRequestAttributes() {
247+
Map<String, byte[]> requestAttributes = new HashMap<>();
248+
int j = Math.max(2, (int) (10 * Math.random()));
249+
for (int i = 0; i < j; i++) {
250+
requestAttributes.put(String.valueOf(i), Bytes.toBytes(UUID.randomUUID().toString()));
251+
}
252+
return requestAttributes;
253+
}
254+
255+
private static TableBuilder configureRequestAttributes(TableBuilder tableBuilder,
256+
Map<String, byte[]> requestAttributes) {
257+
requestAttributes.forEach(tableBuilder::setRequestAttribute);
258+
return tableBuilder;
259+
}
260+
261+
private static BufferedMutatorParams configureRequestAttributes(BufferedMutatorParams params,
262+
Map<String, byte[]> requestAttributes) {
263+
requestAttributes.forEach(params::setRequestAttribute);
264+
return params;
265+
}
266+
267+
public static class RequestMetadataControllerFactory extends RpcControllerFactory {
268+
269+
public RequestMetadataControllerFactory(Configuration conf) {
270+
super(conf);
271+
}
272+
273+
@Override
274+
public HBaseRpcController newController() {
275+
return new RequestMetadataController(super.newController());
276+
}
277+
278+
@Override
279+
public HBaseRpcController newController(CellScanner cellScanner) {
280+
return new RequestMetadataController(super.newController(null, cellScanner));
281+
}
282+
283+
@Override
284+
public HBaseRpcController newController(RegionInfo regionInfo, CellScanner cellScanner) {
285+
return new RequestMetadataController(super.newController(regionInfo, cellScanner));
286+
}
287+
288+
@Override
289+
public HBaseRpcController newController(final List<CellScannable> cellIterables) {
290+
return new RequestMetadataController(super.newController(null, cellIterables));
291+
}
292+
293+
@Override
294+
public HBaseRpcController newController(RegionInfo regionInfo,
295+
final List<CellScannable> cellIterables) {
296+
return new RequestMetadataController(super.newController(regionInfo, cellIterables));
297+
}
298+
299+
public static class RequestMetadataController extends DelegatingHBaseRpcController {
300+
private final Map<String, byte[]> requestAttributes;
301+
302+
RequestMetadataController(HBaseRpcController delegate) {
303+
super(delegate);
304+
this.requestAttributes = ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY7);
305+
}
306+
307+
@Override
308+
public Map<String, byte[]> getRequestAttributes() {
309+
return requestAttributes;
310+
}
311+
}
312+
}
313+
314+
public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor {
315+
316+
@Override
317+
public Optional<RegionObserver> getRegionObserver() {
318+
return Optional.of(this);
319+
}
320+
321+
@Override
322+
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
323+
List<Cell> result) throws IOException {
324+
if (!isValidRequestAttributes(getRequestAttributesForRowKey(get.getRow()))) {
325+
throw new IOException("Incorrect request attributes");
326+
}
327+
}
328+
329+
@Override
330+
public boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
331+
InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
332+
if (!isValidRequestAttributes(REQUEST_ATTRIBUTES_SCAN)) {
333+
throw new IOException("Incorrect request attributes");
334+
}
335+
return hasNext;
336+
}
337+
338+
@Override
339+
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit)
340+
throws IOException {
341+
if (!isValidRequestAttributes(getRequestAttributesForRowKey(put.getRow()))) {
342+
throw new IOException("Incorrect request attributes");
343+
}
344+
}
345+
346+
private Map<String, byte[]> getRequestAttributesForRowKey(byte[] rowKey) {
347+
for (byte[] byteArray : ROW_KEY_TO_REQUEST_ATTRIBUTES.keySet()) {
348+
if (Arrays.equals(byteArray, rowKey)) {
349+
return ROW_KEY_TO_REQUEST_ATTRIBUTES.get(byteArray);
350+
}
351+
}
352+
return null;
353+
}
354+
355+
private boolean isValidRequestAttributes(Map<String, byte[]> requestAttributes) {
356+
RpcCall rpcCall = RpcServer.getCurrentCall().get();
357+
Map<String, byte[]> attrs = rpcCall.getRequestAttributes();
358+
if (attrs.size() != requestAttributes.size()) {
359+
return false;
360+
}
361+
for (Map.Entry<String, byte[]> attr : attrs.entrySet()) {
362+
if (!requestAttributes.containsKey(attr.getKey())) {
363+
return false;
364+
}
365+
if (!Arrays.equals(requestAttributes.get(attr.getKey()), attr.getValue())) {
366+
return false;
367+
}
368+
}
369+
return true;
370+
}
371+
}
372+
}

0 commit comments

Comments
 (0)