 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CompareOperator;
 ...
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTestConst;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
 import org.apache.hadoop.hbase.StartTestingClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNameTestRule;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
 import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -907,6 +911,86 @@ public void testScannerWithPartialResults() throws Exception {
     }
   }
 
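+  /**
+   * Repeatedly scans a single-region table while a background thread cancels the client's RPC
+   * connections to the hosting region server. Each dropped connection forces the scanner to
+   * reconnect and resume, and every row must still come back on every pass.
+   */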
+  @Test
+  public void testRepeatedFinalScan() throws Exception {
+    TableName tableName = TableName.valueOf("testRepeatedFinalScan");
+    TEST_UTIL.createTable(tableName, FAMILY).close();
+
+    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
+    // We want to work on a separate connection.
+    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
+    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
+    c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 5);
+
+    Connection connection = ConnectionFactory.createConnection(c2);
+    final Table table = connection.getTable(tableName);
+
+    final int ROWS_TO_INSERT = 100;
+    final byte[] LARGE_VALUE = generateHugeValue(128 * 1024);
+
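+    // Load 100 rows carrying 128 KB values so a full scan takes several RPCs, giving the
+    // background thread a chance to drop the connection while a scanner is still open.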
+    List<Put> putList = new ArrayList<>();
+    for (long i = 0; i < ROWS_TO_INSERT; i++) {
+      Put put = new Put(Bytes.toBytes(i));
+      put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE);
+      putList.add(put);
+    }
+    table.put(putList);
+
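+    // Locate the server hosting the region and grab the connection's underlying RpcClient so
+    // the background thread can cancel the connections to that server.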
+    ServerName sn;
+    try (RegionLocator rl = connection.getRegionLocator(tableName)) {
+      sn = rl.getRegionLocation(Bytes.toBytes(1L)).getServerName();
+    }
+    RpcClient rpcClient = ((AsyncConnectionImpl) connection.toAsyncConnection()).rpcClient;
+
+    // Avoid cancelling connection more than once per scan RPC.
+    final AtomicInteger canCancelConnection = new AtomicInteger(0);
+
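+    // Background thread: when the scan loop arms the flag, drop every connection to the region
+    // server so the in-flight scan request fails and the client has to reconnect and retry.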
+    Thread t = new Thread("testScanRepeatThread") {
+      @Override
+      public void run() {
+        while (true) {
+          try {
+            Thread.sleep(10);
+            if (canCancelConnection.get() == 1) {
+              canCancelConnection.set(0);
+              rpcClient.cancelConnections(sn);
+            }
+          } catch (InterruptedException e) {
+            break;
+          }
+        }
+      }
+    };
+    t.start();
+
+    Scan scan = new Scan();
+    scan.addColumn(FAMILY, QUALIFIER);
+    scan.setCaching(10);
+
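+    // Scan the whole table five times; on every tenth row, arm the cancel flag so the connection
+    // is dropped mid-scan and the scanner must retry.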
+    for (int run = 0; run < 5; run++) {
+      try (ResultScanner scanner = table.getScanner(scan)) {
+        for (int i = 0; i < ROWS_TO_INSERT; i++) {
+          Result result;
+          try {
+            result = scanner.next();
+          } catch (RetriesExhaustedException ex) {
+            // Accept a RetriesExhaustedException once most of the rows have already been read.
+            // This keeps the results consistent with and without the fix.
+            if (i > ROWS_TO_INSERT / 2) break;
+            throw ex;
+          }
+          assertNotNull(result);
+          if (i % 10 == 1) canCancelConnection.set(1);
+        }
+      }
+    }
+    t.interrupt();
+
+    table.close();
+    connection.close();
+  }
+
   public static class LimitKVsReturnFilter extends FilterBase {
 
     private int cellCount = 0;