Skip to content

Commit 9849d3c

Browse files
bbeaudreaultapurtell
authored andcommitted
HBASE-26783 ScannerCallable doubly clears meta cache on retries (#4147)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
1 parent 2105170 commit 9849d3c

File tree

6 files changed

+308
-59
lines changed

6 files changed

+308
-59
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@
2424

2525
import java.io.IOException;
2626
import java.io.InterruptedIOException;
27-
import java.util.ArrayList;
28-
import java.util.List;
2927

3028
import org.apache.hadoop.hbase.DoNotRetryIOException;
3129
import org.apache.hadoop.hbase.HConstants;
3230
import org.apache.hadoop.hbase.HRegionLocation;
3331
import org.apache.hadoop.hbase.RegionLocations;
3432
import org.apache.hadoop.hbase.TableName;
33+
import org.apache.hadoop.hbase.TableNotEnabledException;
34+
import org.apache.hadoop.hbase.util.Pair;
3535
import org.apache.yetus.audience.InterfaceAudience;
3636
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
3737
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -44,6 +44,8 @@
4444
@InterfaceAudience.Private
4545
public class ReversedScannerCallable extends ScannerCallable {
4646

47+
private byte[] locationSearchKey;
48+
4749
/**
4850
* @param connection which connection
4951
* @param tableName table callable is on
@@ -59,6 +61,18 @@ public ReversedScannerCallable(ClusterConnection connection, TableName tableName
5961
super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId);
6062
}
6163

64+
@Override
65+
public void throwable(Throwable t, boolean retrying) {
66+
// for reverse scans, we need to update cache using the search key found for the reverse scan
67+
// range in prepare. Otherwise, we will see weird behavior at the table boundaries,
68+
// when trying to clear cache for an empty row.
69+
if (location != null && locationSearchKey != null) {
70+
getConnection().updateCachedLocations(getTableName(),
71+
location.getRegionInfo().getRegionName(),
72+
locationSearchKey, t, location.getServerName());
73+
}
74+
}
75+
6276
/**
6377
* @param reload force reload of server location
6478
*/
@@ -67,33 +81,37 @@ public void prepare(boolean reload) throws IOException {
6781
if (Thread.interrupted()) {
6882
throw new InterruptedIOException();
6983
}
84+
85+
if (reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME)
86+
&& getConnection().isTableDisabled(getTableName())) {
87+
throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled.");
88+
}
89+
7090
if (!instantiated || reload) {
7191
// we should use range locate if
7292
// 1. we do not want the start row
7393
// 2. the start row is empty which means we need to locate to the last region.
7494
if (scan.includeStartRow() && !isEmptyStartRow(getRow())) {
7595
// Just locate the region with the row
76-
RegionLocations rl = getRegionLocations(reload, getRow());
96+
RegionLocations rl = getRegionLocationsForPrepare(getRow());
7797
this.location = getLocationForReplica(rl);
78-
if (location == null || location.getServerName() == null) {
79-
throw new IOException("Failed to find location, tableName="
80-
+ getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload="
81-
+ reload);
82-
}
98+
this.locationSearchKey = getRow();
8399
} else {
84-
// Need to locate the regions with the range, and the target location is
85-
// the last one which is the previous region of last region scanner
100+
// The locateStart row is an approximation. So we need to search between
101+
// that and the actual row in order to really find the last region
86102
byte[] locateStartRow = createCloseRowBefore(getRow());
87-
List<HRegionLocation> locatedRegions = locateRegionsInRange(
88-
locateStartRow, getRow(), reload);
89-
if (locatedRegions.isEmpty()) {
90-
throw new DoNotRetryIOException(
91-
"Does hbase:meta exist hole? Couldn't get regions for the range from "
92-
+ Bytes.toStringBinary(locateStartRow) + " to "
93-
+ Bytes.toStringBinary(getRow()));
94-
}
95-
this.location = locatedRegions.get(locatedRegions.size() - 1);
103+
Pair<HRegionLocation, byte[]> lastRegionAndKey = locateLastRegionInRange(
104+
locateStartRow, getRow());
105+
this.location = lastRegionAndKey.getFirst();
106+
this.locationSearchKey = lastRegionAndKey.getSecond();
96107
}
108+
109+
if (location == null || location.getServerName() == null) {
110+
throw new IOException("Failed to find location, tableName="
111+
+ getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload="
112+
+ reload);
113+
}
114+
97115
setStub(getConnection().getClient(getLocation().getServerName()));
98116
checkIfRegionServerIsRemote();
99117
instantiated = true;
@@ -106,29 +124,32 @@ public void prepare(boolean reload) throws IOException {
106124
}
107125

108126
/**
109-
* Get the corresponding regions for an arbitrary range of keys.
127+
* Get the last region before the endkey, which will be used to execute the reverse scan
110128
* @param startKey Starting row in range, inclusive
111129
* @param endKey Ending row in range, exclusive
112-
* @param reload force reload of server location
113-
* @return A list of HRegionLocation corresponding to the regions that contain
114-
* the specified range
130+
* @return The last location, and the rowKey used to find it. May be null,
131+
* if a region could not be found.
115132
*/
116-
private List<HRegionLocation> locateRegionsInRange(byte[] startKey,
117-
byte[] endKey, boolean reload) throws IOException {
133+
private Pair<HRegionLocation, byte[]> locateLastRegionInRange(byte[] startKey, byte[] endKey)
134+
throws IOException {
118135
final boolean endKeyIsEndOfTable = Bytes.equals(endKey,
119136
HConstants.EMPTY_END_ROW);
120137
if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
121138
throw new IllegalArgumentException("Invalid range: "
122139
+ Bytes.toStringBinary(startKey) + " > "
123140
+ Bytes.toStringBinary(endKey));
124141
}
125-
List<HRegionLocation> regionList = new ArrayList<>();
142+
143+
HRegionLocation lastRegion = null;
144+
byte[] lastFoundKey = null;
126145
byte[] currentKey = startKey;
146+
127147
do {
128-
RegionLocations rl = getRegionLocations(reload, currentKey);
148+
RegionLocations rl = getRegionLocationsForPrepare(currentKey);
129149
HRegionLocation regionLocation = getLocationForReplica(rl);
130150
if (regionLocation.getRegionInfo().containsRow(currentKey)) {
131-
regionList.add(regionLocation);
151+
lastFoundKey = currentKey;
152+
lastRegion = regionLocation;
132153
} else {
133154
throw new DoNotRetryIOException(
134155
"Does hbase:meta exist hole? Locating row " + Bytes.toStringBinary(currentKey) +
@@ -137,7 +158,8 @@ private List<HRegionLocation> locateRegionsInRange(byte[] startKey,
137158
currentKey = regionLocation.getRegionInfo().getEndKey();
138159
} while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
139160
&& (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
140-
return regionList;
161+
162+
return new Pair<>(lastRegion, lastFoundKey);
141163
}
142164

143165
@Override

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.hadoop.hbase.RegionLocations;
3636
import org.apache.hadoop.hbase.ServerName;
3737
import org.apache.hadoop.hbase.TableName;
38+
import org.apache.hadoop.hbase.TableNotEnabledException;
3839
import org.apache.hadoop.hbase.UnknownScannerException;
3940
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
4041
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
@@ -126,9 +127,18 @@ protected final HRegionLocation getLocationForReplica(RegionLocations locs)
126127
return loc;
127128
}
128129

129-
protected final RegionLocations getRegionLocations(boolean reload, byte[] row)
130+
/**
131+
* Fetch region locations for the row. Since this is for prepare, we always useCache.
132+
* This is because we can be sure that RpcRetryingCaller will have cleared the cache
133+
* in error handling if this is a retry.
134+
*
135+
* @param row the row to look up region location for
136+
*/
137+
protected final RegionLocations getRegionLocationsForPrepare(byte[] row)
130138
throws IOException {
131-
return RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, id, getConnection(),
139+
// always use cache, because cache will have been cleared if necessary
140+
// in the try/catch before retrying
141+
return RpcRetryingCallerWithReadReplicas.getRegionLocations(true, id, getConnection(),
132142
getTableName(), row);
133143
}
134144

@@ -140,7 +150,13 @@ public void prepare(boolean reload) throws IOException {
140150
if (Thread.interrupted()) {
141151
throw new InterruptedIOException();
142152
}
143-
RegionLocations rl = getRegionLocations(reload, getRow());
153+
154+
if (reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME)
155+
&& getConnection().isTableDisabled(getTableName())) {
156+
throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled.");
157+
}
158+
159+
RegionLocations rl = getRegionLocationsForPrepare(getRow());
144160
location = getLocationForReplica(rl);
145161
ServerName dest = location.getServerName();
146162
setStub(super.getConnection().getClient(dest));

hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestReversedScannerCallable.java

Lines changed: 61 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,23 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20+
import static org.junit.Assert.assertThrows;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.times;
23+
import static org.mockito.Mockito.verify;
24+
import static org.mockito.Mockito.when;
25+
26+
import java.io.IOException;
27+
2028
import org.apache.hadoop.conf.Configuration;
2129
import org.apache.hadoop.hbase.HBaseClassTestRule;
30+
import org.apache.hadoop.hbase.HConstants;
31+
import org.apache.hadoop.hbase.HRegionInfo;
2232
import org.apache.hadoop.hbase.HRegionLocation;
2333
import org.apache.hadoop.hbase.RegionLocations;
2434
import org.apache.hadoop.hbase.ServerName;
2535
import org.apache.hadoop.hbase.TableName;
36+
import org.apache.hadoop.hbase.TableNotEnabledException;
2637
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
2738
import org.apache.hadoop.hbase.testclassification.ClientTests;
2839
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -33,7 +44,6 @@
3344
import org.junit.experimental.categories.Category;
3445
import org.junit.runner.RunWith;
3546
import org.mockito.Mock;
36-
import org.mockito.Mockito;
3747
import org.mockito.runners.MockitoJUnitRunner;
3848

3949
@RunWith(MockitoJUnitRunner.class)
@@ -44,53 +54,78 @@ public class TestReversedScannerCallable {
4454
public static final HBaseClassTestRule CLASS_RULE =
4555
HBaseClassTestRule.forClass(TestReversedScannerCallable.class);
4656

57+
private static final TableName TABLE_NAME = TableName.valueOf("TestReversedScannerCallable");
58+
59+
private static final String HOSTNAME = "localhost";
60+
private static final ServerName SERVERNAME = ServerName.valueOf(HOSTNAME, 60030, 123);
61+
private static final byte[] ROW = Bytes.toBytes("row1");
62+
private static final Scan DEFAULT_SCAN = new Scan().withStartRow(ROW, true).setReversed(true);
63+
4764
@Mock
4865
private ClusterConnection connection;
4966
@Mock
50-
private Scan scan;
51-
@Mock
5267
private RpcControllerFactory rpcFactory;
5368
@Mock
5469
private RegionLocations regionLocations;
55-
56-
private final byte[] ROW = Bytes.toBytes("row1");
70+
@Mock
71+
private HRegionLocation regionLocation;
5772

5873
@Before
5974
public void setUp() throws Exception {
60-
HRegionLocation regionLocation = Mockito.mock(HRegionLocation.class);
61-
ServerName serverName = Mockito.mock(ServerName.class);
62-
63-
Mockito.when(connection.getConfiguration()).thenReturn(new Configuration());
64-
Mockito.when(regionLocations.size()).thenReturn(1);
65-
Mockito.when(regionLocations.getRegionLocation(0)).thenReturn(regionLocation);
66-
Mockito.when(regionLocation.getHostname()).thenReturn("localhost");
67-
Mockito.when(regionLocation.getServerName()).thenReturn(serverName);
68-
Mockito.when(scan.includeStartRow()).thenReturn(true);
69-
Mockito.when(scan.getStartRow()).thenReturn(ROW);
75+
when(connection.getConfiguration()).thenReturn(new Configuration());
76+
when(regionLocations.size()).thenReturn(1);
77+
when(regionLocations.getRegionLocation(0)).thenReturn(regionLocation);
78+
when(regionLocation.getHostname()).thenReturn(HOSTNAME);
79+
when(regionLocation.getServerName()).thenReturn(SERVERNAME);
7080
}
7181

7282
@Test
73-
public void testPrepareDoesNotUseCache() throws Exception {
74-
TableName tableName = TableName.valueOf("MyTable");
75-
Mockito.when(connection.relocateRegion(tableName, ROW, 0)).thenReturn(regionLocations);
83+
public void testPrepareAlwaysUsesCache() throws Exception {
84+
when(connection.locateRegion(TABLE_NAME, ROW, true, true, 0))
85+
.thenReturn(regionLocations);
7686

7787
ReversedScannerCallable callable =
78-
new ReversedScannerCallable(connection, tableName, scan, null, rpcFactory, 0);
88+
new ReversedScannerCallable(connection, TABLE_NAME, DEFAULT_SCAN, null, rpcFactory, 0);
89+
callable.prepare(false);
7990
callable.prepare(true);
8091

81-
Mockito.verify(connection).relocateRegion(tableName, ROW, 0);
92+
verify(connection, times(2)).locateRegion(TABLE_NAME, ROW, true, true, 0);
8293
}
8394

8495
@Test
85-
public void testPrepareUsesCache() throws Exception {
86-
TableName tableName = TableName.valueOf("MyTable");
87-
Mockito.when(connection.locateRegion(tableName, ROW, true, true, 0))
88-
.thenReturn(regionLocations);
96+
public void testHandleDisabledTable() throws IOException {
97+
when(connection.isTableDisabled(TABLE_NAME)).thenReturn(true);
8998

9099
ReversedScannerCallable callable =
91-
new ReversedScannerCallable(connection, tableName, scan, null, rpcFactory, 0);
100+
new ReversedScannerCallable(connection, TABLE_NAME, DEFAULT_SCAN, null, rpcFactory, 0);
101+
102+
assertThrows(TableNotEnabledException.class, () -> callable.prepare(true));
103+
}
104+
105+
@Test
106+
public void testUpdateSearchKeyCacheLocation() throws IOException {
107+
byte[] regionName = RegionInfo.createRegionName(TABLE_NAME,
108+
ConnectionUtils.createCloseRowBefore(ConnectionUtils.MAX_BYTE_ARRAY), "123", false);
109+
HRegionInfo mockRegionInfo = mock(HRegionInfo.class);
110+
when(mockRegionInfo.containsRow(ConnectionUtils.MAX_BYTE_ARRAY)).thenReturn(true);
111+
when(mockRegionInfo.getEndKey()).thenReturn(HConstants.EMPTY_END_ROW);
112+
when(mockRegionInfo.getRegionName()).thenReturn(regionName);
113+
when(regionLocation.getRegionInfo()).thenReturn(mockRegionInfo);
114+
115+
IOException testThrowable = new IOException("test throwable");
116+
117+
when(connection.locateRegion(TABLE_NAME, ConnectionUtils.MAX_BYTE_ARRAY, true, true, 0))
118+
.thenReturn(regionLocations);
119+
120+
Scan scan = new Scan().setReversed(true);
121+
ReversedScannerCallable callable =
122+
new ReversedScannerCallable(connection, TABLE_NAME, scan, null, rpcFactory, 0);
123+
92124
callable.prepare(false);
93125

94-
Mockito.verify(connection).locateRegion(tableName, ROW, true, true, 0);
126+
callable.throwable(testThrowable, true);
127+
128+
verify(connection).updateCachedLocations(TABLE_NAME, regionName,
129+
ConnectionUtils.MAX_BYTE_ARRAY, testThrowable, SERVERNAME);
95130
}
96131
}

0 commit comments

Comments
 (0)