Skip to content

Commit aa6f13e

Browse files
committed
HBASE-26784 Addendum: Close scanner request should properly inherit original timeout and priority
1 parent 5bae04e commit aa6f13e

File tree

3 files changed

+51
-13
lines changed

3 files changed

+51
-13
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -312,13 +312,19 @@ private void close() {
312312
ScanRequest request =
313313
RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
314314
HBaseRpcController controller = rpcControllerFactory.newController();
315-
// pull in the original priority, but then try to set to HIGH.
316-
// it will take whatever is highest.
317-
controller.setPriority(controller.getPriority());
318-
controller.setPriority(HConstants.HIGH_QOS);
319-
if (controller.hasCallTimeout()) {
320-
controller.setCallTimeout(controller.getCallTimeout());
315+
316+
// Set fields from the original controller onto the close-specific controller
317+
// We set the timeout and the priority -- we will overwrite the priority to HIGH
318+
// below, but the controller will take whichever is highest.
319+
if (getRpcController() instanceof HBaseRpcController) {
320+
HBaseRpcController original = (HBaseRpcController) getRpcController();
321+
controller.setPriority(original.getPriority());
322+
if (original.hasCallTimeout()) {
323+
controller.setCallTimeout(original.getCallTimeout());
324+
}
321325
}
326+
controller.setPriority(HConstants.HIGH_QOS);
327+
322328
try {
323329
getStub().scan(controller, request);
324330
} catch (Exception e) {

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,4 +262,11 @@ public synchronized void notifyOnCancel(RpcCallback<Object> callback, Cancellati
262262
action.run(false);
263263
}
264264
}
265+
266+
@Override public String toString() {
267+
return "HBaseRpcControllerImpl{" + "callTimeout=" + callTimeout + ", done=" + done
268+
+ ", cancelled=" + cancelled + ", cancellationCbs=" + cancellationCbs + ", exception="
269+
+ exception + ", regionInfo=" + regionInfo + ", priority=" + priority + ", cellScanner="
270+
+ cellScanner + '}';
271+
}
265272
}

hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,12 @@
3232
import static org.mockito.Mockito.mock;
3333
import static org.mockito.Mockito.times;
3434
import static org.mockito.Mockito.verify;
35-
3635
import java.io.IOException;
3736
import java.util.Arrays;
3837
import java.util.Optional;
3938
import java.util.concurrent.ExecutorService;
4039
import java.util.concurrent.Executors;
4140
import java.util.concurrent.atomic.AtomicInteger;
42-
4341
import org.apache.hadoop.conf.Configuration;
4442
import org.apache.hadoop.hbase.Cell;
4543
import org.apache.hadoop.hbase.CellBuilderFactory;
@@ -74,6 +72,7 @@
7472
*/
7573
@Category({ ClientTests.class, MediumTests.class })
7674
public class TestTableRpcPriority {
75+
7776
@ClassRule
7877
public static final HBaseClassTestRule CLASS_RULE =
7978
HBaseClassTestRule.forClass(TestTableRpcPriority.class);
@@ -89,6 +88,7 @@ public void setUp() throws IOException, ServiceException {
8988
stub = mock(ClientProtos.ClientService.BlockingInterface.class);
9089

9190
Configuration conf = HBaseConfiguration.create();
91+
9292
ExecutorService executorService = Executors.newCachedThreadPool();
9393
conn = new ConnectionImplementation(conf, executorService,
9494
UserProvider.instantiate(conf).getCurrent(), new DoNothingConnectionRegistry(conf)) {
@@ -122,6 +122,16 @@ public void testScan() throws Exception {
122122
testForTable(TableName.valueOf(name.getMethodName()), Optional.of(19));
123123
}
124124

125+
/**
126+
* This test verifies that our closeScanner request honors the original
127+
* priority of the scan if it's greater than our expected HIGH_QOS for close calls.
128+
*/
129+
@Test
130+
public void testScanSuperHighPriority() throws Exception {
131+
mockScan(1000);
132+
testForTable(TableName.valueOf(name.getMethodName()), Optional.of(1000));
133+
}
134+
125135
@Test
126136
public void testScanNormalTable() throws Exception {
127137
mockScan(NORMAL_QOS);
@@ -153,11 +163,22 @@ private void testForTable(TableName tableName, Optional<Integer> priority) throw
153163
// just verify that the calls happened. verification of priority occurred in the mocking
154164
// open, next, then several renew lease
155165
verify(stub, atLeast(3)).scan(any(), any(ClientProtos.ScanRequest.class));
156-
verify(stub, times(1)).scan(any(), assertScannerCloseRequest());
166+
verify(stub, times(1)).scan(
167+
assertControllerArgs(Math.max(priority.orElse(0), HIGH_QOS)), assertScannerCloseRequest());
157168
}
158169

159170
private void mockScan(int scanPriority) throws ServiceException {
160171
int scannerId = 1;
172+
173+
doAnswer(new Answer<ClientProtos.ScanResponse>() {
174+
@Override public ClientProtos.ScanResponse answer(InvocationOnMock invocation)
175+
throws Throwable {
176+
throw new IllegalArgumentException(
177+
"Call not covered by explicit mock for arguments controller="
178+
+ invocation.getArgument(0) + ", request=" + invocation.getArgument(1));
179+
}
180+
}).when(stub).scan(any(), any());
181+
161182
AtomicInteger scanNextCalled = new AtomicInteger(0);
162183
doAnswer(new Answer<ClientProtos.ScanResponse>() {
163184

@@ -182,7 +203,7 @@ public ClientProtos.ScanResponse answer(InvocationOnMock invocation)
182203
return builder.setTtl(800).setMoreResultsInRegion(true).setMoreResults(true)
183204
.addResults(ProtobufUtil.toResult(result)).build();
184205
}
185-
}).when(stub).scan(assertPriority(scanPriority), any(ClientProtos.ScanRequest.class));
206+
}).when(stub).scan(assertControllerArgs(scanPriority), any());
186207

187208
doAnswer(new Answer<ClientProtos.ScanResponse>() {
188209

@@ -197,15 +218,19 @@ public ClientProtos.ScanResponse answer(InvocationOnMock invocation)
197218

198219
return ClientProtos.ScanResponse.getDefaultInstance();
199220
}
200-
}).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest());
221+
}).when(stub).scan(assertControllerArgs(Math.max(scanPriority, HIGH_QOS)),
222+
assertScannerCloseRequest());
201223
}
202224

203-
private HBaseRpcController assertPriority(int priority) {
225+
private HBaseRpcController assertControllerArgs(int priority) {
204226
return argThat(new ArgumentMatcher<HBaseRpcController>() {
205227

206228
@Override
207229
public boolean matches(HBaseRpcController controller) {
208-
return controller.getPriority() == priority;
230+
// check specified priority, but also check that it has a timeout
231+
// this ensures that our conversion from the base controller to the close-specific
232+
// controller honored the original arguments.
233+
return controller.getPriority() == priority && controller.hasCallTimeout();
209234
}
210235
});
211236
}

0 commit comments

Comments
 (0)