Skip to content

Commit 215c726

Browse files
committed
HBASE-23093 : Avoid Optional Anti-Pattern where possible
1 parent 1170f28 commit 215c726

23 files changed

+244
-221
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java

Lines changed: 63 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ private static Optional<TableState> getTableState(Result r) throws IOException {
176176
* {@link CompletableFuture}.
177177
*/
178178
public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
179-
AsyncTable<AdvancedScanResultConsumer> metaTable, Optional<TableName> tableName) {
179+
AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName) {
180180
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
181181
addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> {
182182
if (err != null) {
@@ -202,37 +202,39 @@ public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
202202
* {@link CompletableFuture}.
203203
*/
204204
private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations(
205-
AsyncTable<AdvancedScanResultConsumer> metaTable, final Optional<TableName> tableName,
206-
final boolean excludeOfflinedSplitParents) {
205+
final AsyncTable<AdvancedScanResultConsumer> metaTable,
206+
final TableName tableName, final boolean excludeOfflinedSplitParents) {
207207
CompletableFuture<List<Pair<RegionInfo, ServerName>>> future = new CompletableFuture<>();
208-
if (tableName.filter((t) -> t.equals(TableName.META_TABLE_NAME)).isPresent()) {
208+
if (TableName.META_TABLE_NAME.equals(tableName)) {
209209
future.completeExceptionally(new IOException(
210210
"This method can't be used to locate meta regions;" + " use MetaTableLocator instead"));
211211
}
212212

213213
// Make a version of CollectingVisitor that collects RegionInfo and ServerAddress
214-
CollectingVisitor<Pair<RegionInfo, ServerName>> visitor = new CollectingVisitor<Pair<RegionInfo, ServerName>>() {
215-
private Optional<RegionLocations> current = null;
214+
CollectingVisitor<Pair<RegionInfo, ServerName>> visitor =
215+
new CollectingVisitor<Pair<RegionInfo, ServerName>>() {
216+
private RegionLocations current = null;
216217

217218
@Override
218219
public boolean visit(Result r) throws IOException {
219-
current = getRegionLocations(r);
220-
if (!current.isPresent() || current.get().getRegionLocation().getRegion() == null) {
220+
Optional<RegionLocations> currentRegionLocations = getRegionLocations(r);
221+
current = currentRegionLocations.orElse(null);
222+
if (current == null || current.getRegionLocation().getRegion() == null) {
221223
LOG.warn("No serialized RegionInfo in " + r);
222224
return true;
223225
}
224-
RegionInfo hri = current.get().getRegionLocation().getRegion();
226+
RegionInfo hri = current.getRegionLocation().getRegion();
225227
if (excludeOfflinedSplitParents && hri.isSplitParent()) return true;
226228
// Else call super and add this Result to the collection.
227229
return super.visit(r);
228230
}
229231

230232
@Override
231233
void add(Result r) {
232-
if (!current.isPresent()) {
234+
if (current == null) {
233235
return;
234236
}
235-
for (HRegionLocation loc : current.get().getRegionLocations()) {
237+
for (HRegionLocation loc : current.getRegionLocations()) {
236238
if (loc != null) {
237239
this.results.add(new Pair<RegionInfo, ServerName>(loc.getRegion(), loc
238240
.getServerName()));
@@ -259,7 +261,7 @@ void add(Result r) {
259261
* @param visitor Visitor invoked against each row
260262
*/
261263
private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
262-
Optional<TableName> tableName, QueryType type, final Visitor visitor) {
264+
TableName tableName, QueryType type, final Visitor visitor) {
263265
return scanMeta(metaTable, getTableStartRowForMeta(tableName, type),
264266
getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, visitor);
265267
}
@@ -274,15 +276,18 @@ private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultCon
274276
* @param visitor Visitor invoked against each row
275277
*/
276278
private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
277-
Optional<byte[]> startRow, Optional<byte[]> stopRow, QueryType type, int maxRows,
278-
final Visitor visitor) {
279+
byte[] startRow, byte[] stopRow, QueryType type, int maxRows, final Visitor visitor) {
279280
int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
280281
Scan scan = getMetaScan(metaTable, rowUpperLimit);
281282
for (byte[] family : type.getFamilies()) {
282283
scan.addFamily(family);
283284
}
284-
startRow.ifPresent(scan::withStartRow);
285-
stopRow.ifPresent(scan::withStopRow);
285+
if (startRow != null) {
286+
scan.withStartRow(startRow);
287+
}
288+
if (stopRow != null) {
289+
scan.withStopRow(stopRow);
290+
}
286291

287292
if (LOG.isDebugEnabled()) {
288293
LOG.debug("Scanning META" + " starting at row=" + Bytes.toStringBinary(scan.getStartRow())
@@ -466,52 +471,56 @@ private static long getSeqNumDuringOpen(final Result r, final int replicaId) {
466471
* @param tableName table we're working with
467472
* @return start row for scanning META according to query type
468473
*/
469-
private static Optional<byte[]> getTableStartRowForMeta(Optional<TableName> tableName,
470-
QueryType type) {
471-
return tableName.map((table) -> {
472-
switch (type) {
473-
case REGION:
474-
case REPLICATION:
475-
byte[] startRow = new byte[table.getName().length + 2];
476-
System.arraycopy(table.getName(), 0, startRow, 0, table.getName().length);
477-
startRow[startRow.length - 2] = HConstants.DELIMITER;
478-
startRow[startRow.length - 1] = HConstants.DELIMITER;
479-
return startRow;
480-
case ALL:
481-
case TABLE:
482-
default:
483-
return table.getName();
474+
private static byte[] getTableStartRowForMeta(TableName tableName, QueryType type) {
475+
if (tableName == null) {
476+
return null;
477+
}
478+
switch (type) {
479+
case REGION:
480+
case REPLICATION: {
481+
byte[] startRow = new byte[tableName.getName().length + 2];
482+
System.arraycopy(tableName.getName(), 0, startRow, 0, tableName.getName().length);
483+
startRow[startRow.length - 2] = HConstants.DELIMITER;
484+
startRow[startRow.length - 1] = HConstants.DELIMITER;
485+
return startRow;
484486
}
485-
});
487+
case ALL:
488+
case TABLE:
489+
default: {
490+
return tableName.getName();
491+
}
492+
}
486493
}
487494

488495
/**
489496
* @param tableName table we're working with
490497
* @return stop row for scanning META according to query type
491498
*/
492-
private static Optional<byte[]> getTableStopRowForMeta(Optional<TableName> tableName,
493-
QueryType type) {
494-
return tableName.map((table) -> {
495-
final byte[] stopRow;
496-
switch (type) {
497-
case REGION:
498-
case REPLICATION:
499-
stopRow = new byte[table.getName().length + 3];
500-
System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
501-
stopRow[stopRow.length - 3] = ' ';
502-
stopRow[stopRow.length - 2] = HConstants.DELIMITER;
503-
stopRow[stopRow.length - 1] = HConstants.DELIMITER;
504-
break;
505-
case ALL:
506-
case TABLE:
507-
default:
508-
stopRow = new byte[table.getName().length + 1];
509-
System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
510-
stopRow[stopRow.length - 1] = ' ';
511-
break;
499+
private static byte[] getTableStopRowForMeta(TableName tableName, QueryType type) {
500+
if (tableName == null) {
501+
return null;
502+
}
503+
final byte[] stopRow;
504+
switch (type) {
505+
case REGION:
506+
case REPLICATION: {
507+
stopRow = new byte[tableName.getName().length + 3];
508+
System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length);
509+
stopRow[stopRow.length - 3] = ' ';
510+
stopRow[stopRow.length - 2] = HConstants.DELIMITER;
511+
stopRow[stopRow.length - 1] = HConstants.DELIMITER;
512+
break;
512513
}
513-
return stopRow;
514-
});
514+
case ALL:
515+
case TABLE:
516+
default: {
517+
stopRow = new byte[tableName.getName().length + 1];
518+
System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length);
519+
stopRow[stopRow.length - 1] = ' ';
520+
break;
521+
}
522+
}
523+
return stopRow;
515524
}
516525

517526
/**

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,8 @@ private void onComplete(Action action, RegionRequest regionReq, int tries, Serve
307307

308308
private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
309309
ServerName serverName, MultiResponse resp) {
310-
ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
311-
serverName, resp);
310+
ConnectionUtils.updateStats(conn.getStatisticsTracker(),
311+
conn.getConnectionMetrics().orElse(null), serverName, resp);
312312
List<Action> failedActions = new ArrayList<>();
313313
MutableBoolean retryImmediately = new MutableBoolean(false);
314314
actionsByRegion.forEach((rn, regionReq) -> {
@@ -400,18 +400,17 @@ private void sendToServer(ServerName serverName, ServerRequest serverReq, int tr
400400
// based on the load of the region server and the region.
401401
private void sendOrDelay(Map<ServerName, ServerRequest> actionsByServer, int tries) {
402402
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
403-
Optional<ServerStatisticTracker> optStats = conn.getStatisticsTracker();
404-
if (!optStats.isPresent()) {
403+
ServerStatisticTracker optStats = conn.getStatisticsTracker();
404+
if (optStats == null) {
405405
actionsByServer.forEach((serverName, serverReq) -> {
406406
metrics.ifPresent(MetricsConnection::incrNormalRunners);
407407
sendToServer(serverName, serverReq, tries);
408408
});
409409
return;
410410
}
411-
ServerStatisticTracker stats = optStats.get();
412411
ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
413412
actionsByServer.forEach((serverName, serverReq) -> {
414-
ServerStatistics serverStats = stats.getStats(serverName);
413+
ServerStatistics serverStats = optStats.getStats(serverName);
415414
Map<Long, ServerRequest> groupByBackoff = new HashMap<>();
416415
serverReq.actionsByRegion.forEach((regionName, regionReq) -> {
417416
long backoff = backoffPolicy.getBackoffTime(serverName, regionName, serverStats);

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ private void openScanner() {
204204
openScannerTries.set(1);
205205
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
206206
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
207-
conn.getConnectionMetrics()), (resp, error) -> {
207+
conn.getConnectionMetrics().orElse(null)), (resp, error) -> {
208208
if (error != null) {
209209
consumer.onError(error);
210210
return;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class AsyncConnectionImpl implements AsyncConnection {
109109
private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
110110
new AtomicReference<>();
111111

112-
private final Optional<ServerStatisticTracker> stats;
112+
private final ServerStatisticTracker stats;
113113
private final ClientBackoffPolicy backoffPolicy;
114114

115115
private ChoreService authService;
@@ -148,7 +148,7 @@ public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String cl
148148
} else {
149149
nonceGenerator = NO_NONCE_GENERATOR;
150150
}
151-
this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
151+
this.stats = ServerStatisticTracker.create(conf);
152152
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
153153
ClusterStatusListener listener = null;
154154
if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
@@ -281,7 +281,7 @@ void clearMasterStubCache(MasterService.Interface stub) {
281281
masterStub.compareAndSet(stub, null);
282282
}
283283

284-
Optional<ServerStatisticTracker> getStatisticsTracker() {
284+
ServerStatisticTracker getStatisticsTracker() {
285285
return stats;
286286
}
287287

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ private void removeLocationFromCache(HRegionLocation loc) {
110110

111111
void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
112112
AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation,
113-
this::addLocationToCache, this::removeLocationFromCache, Optional.empty());
113+
this::addLocationToCache, this::removeLocationFromCache);
114114
}
115115

116116
void clearCache() {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public Optional<LocateRequest> getCandidate() {
145145
return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
146146
}
147147

148-
public void clearCompletedRequests(Optional<RegionLocations> locations) {
148+
public void clearCompletedRequests(RegionLocations locations) {
149149
for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
150150
allRequests.entrySet().iterator(); iter.hasNext();) {
151151
Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
@@ -156,15 +156,14 @@ public void clearCompletedRequests(Optional<RegionLocations> locations) {
156156
}
157157

158158
private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
159-
Optional<RegionLocations> locations) {
159+
RegionLocations locations) {
160160
if (future.isDone()) {
161161
return true;
162162
}
163-
if (!locations.isPresent()) {
163+
if (locations == null) {
164164
return false;
165165
}
166-
RegionLocations locs = locations.get();
167-
HRegionLocation loc = ObjectUtils.firstNonNull(locs.getRegionLocations());
166+
HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations());
168167
// we should at least have one location available, otherwise the request should fail and
169168
// should not arrive here
170169
assert loc != null;
@@ -183,7 +182,7 @@ private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations
183182
completed = loc.getRegion().containsRow(req.row);
184183
}
185184
if (completed) {
186-
future.complete(locs);
185+
future.complete(locations);
187186
return true;
188187
} else {
189188
return false;
@@ -286,7 +285,7 @@ private void complete(TableName tableName, LocateRequest req, RegionLocations lo
286285
RegionLocations addedLocs = addToCache(tableCache, locs);
287286
synchronized (tableCache) {
288287
tableCache.pendingRequests.remove(req);
289-
tableCache.clearCompletedRequests(Optional.of(addedLocs));
288+
tableCache.clearCompletedRequests(addedLocs);
290289
// Remove a complete locate request in a synchronized block, so the table cache must have
291290
// quota to send a candidate request.
292291
toSend = tableCache.getCandidate();
@@ -304,7 +303,7 @@ private void complete(TableName tableName, LocateRequest req, RegionLocations lo
304303
if (future != null) {
305304
future.completeExceptionally(error);
306305
}
307-
tableCache.clearCompletedRequests(Optional.empty());
306+
tableCache.clearCompletedRequests(null);
308307
// Remove a complete locate request in a synchronized block, so the table cache must have
309308
// quota to send a candidate request.
310309
toSend = tableCache.getCandidate();
@@ -491,7 +490,7 @@ public void onNext(Result[] results, ScanController controller) {
491490
}
492491
RegionLocations addedLocs = addToCache(tableCache, locs);
493492
synchronized (tableCache) {
494-
tableCache.clearCompletedRequests(Optional.of(addedLocs));
493+
tableCache.clearCompletedRequests(addedLocs);
495494
}
496495
}
497496
}
@@ -604,8 +603,9 @@ private HRegionLocation getCachedLocation(HRegionLocation loc) {
604603
}
605604

606605
void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
606+
Optional<MetricsConnection> connectionMetrics = conn.getConnectionMetrics();
607607
AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
608-
this::addLocationToCache, this::removeLocationFromCache, conn.getConnectionMetrics());
608+
this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null));
609609
}
610610

611611
void clearCache(TableName tableName) {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,17 @@ static boolean canUpdateOnError(HRegionLocation loc, HRegionLocation oldLoc) {
5050
oldLoc.getServerName().equals(loc.getServerName());
5151
}
5252

53+
static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception,
54+
Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
55+
Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache) {
56+
updateCachedLocationOnError(loc, exception, cachedLocationSupplier, addToCache,
57+
removeFromCache, null);
58+
}
59+
5360
static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception,
5461
Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
5562
Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache,
56-
Optional<MetricsConnection> metrics) {
63+
MetricsConnection metrics) {
5764
HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
5865
if (LOG.isDebugEnabled()) {
5966
LOG.debug("Try updating {} , the old value is {}, error={}", loc, oldLoc,
@@ -81,7 +88,9 @@ static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception
8188
addToCache.accept(newLoc);
8289
} else {
8390
LOG.debug("Try removing {} from cache", loc);
84-
metrics.ifPresent(m -> m.incrCacheDroppingExceptions(exception));
91+
if (metrics != null) {
92+
metrics.incrCacheDroppingExceptions(exception);
93+
}
8594
removeFromCache.accept(loc);
8695
}
8796
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
6060
.thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
6161
}
6262
return AsyncMetaTableAccessor.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME),
63-
Optional.of(tableName));
63+
tableName);
6464
}
6565

6666
@Override

0 commit comments

Comments
 (0)