Skip to content

HBASE-27093 AsyncNonMetaRegionLocator:put Complete CompletableFuture … #4496

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 7, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -122,6 +124,26 @@ public boolean equals(Object obj) {
}
}

private static final class RegionLocationsFutureResult {
private final CompletableFuture<RegionLocations> future;
private final RegionLocations result;
private final Throwable e;

public RegionLocationsFutureResult(CompletableFuture<RegionLocations> future,
RegionLocations result, Throwable e) {
this.future = future;
this.result = result;
this.e = e;
}

public void complete() {
if (e != null) {
future.completeExceptionally(e);
}
future.complete(result);
}
}

private static final class TableCache {

private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
Expand All @@ -148,18 +170,20 @@ public Optional<LocateRequest> getCandidate() {
return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
}

public void clearCompletedRequests(RegionLocations locations) {
public List<RegionLocationsFutureResult> clearCompletedRequests(RegionLocations locations) {
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
allRequests.entrySet().iterator(); iter.hasNext();) {
Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
if (tryComplete(entry.getKey(), entry.getValue(), locations, futureResultList)) {
iter.remove();
}
}
return futureResultList;
}

private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
RegionLocations locations) {
RegionLocations locations, List<RegionLocationsFutureResult> futureResultList) {
if (future.isDone()) {
return true;
}
Expand All @@ -185,7 +209,7 @@ private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations
completed = loc.getRegion().containsRow(req.row);
}
if (completed) {
future.complete(locations);
futureResultList.add(new RegionLocationsFutureResult(future, locations, null));
return true;
} else {
return false;
Expand Down Expand Up @@ -319,32 +343,36 @@ private void complete(TableName tableName, LocateRequest req, RegionLocations lo
TableCache tableCache = getTableCache(tableName);
if (locs != null) {
RegionLocations addedLocs = addToCache(tableCache, locs);
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
tableCache.pendingRequests.remove(req);
tableCache.clearCompletedRequests(addedLocs);
futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
// Remove a complete locate request in a synchronized block, so the table cache must have
// quota to send a candidate request.
toSend = tableCache.getCandidate();
toSend.ifPresent(r -> tableCache.send(r));
}
futureResultList.forEach(RegionLocationsFutureResult::complete);
toSend.ifPresent(r -> locateInMeta(tableName, r));
} else {
// we meet an error
assert error != null;
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
tableCache.pendingRequests.remove(req);
// fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
// already retried several times
CompletableFuture<?> future = tableCache.allRequests.remove(req);
CompletableFuture<RegionLocations> future = tableCache.allRequests.remove(req);
if (future != null) {
future.completeExceptionally(error);
futureResultList.add(new RegionLocationsFutureResult(future, null, error));
}
tableCache.clearCompletedRequests(null);
futureResultList.addAll(tableCache.clearCompletedRequests(null));
// Remove a complete locate request in a synchronized block, so the table cache must have
// quota to send a candidate request.
toSend = tableCache.getCandidate();
toSend.ifPresent(r -> tableCache.send(r));
}
futureResultList.forEach(RegionLocationsFutureResult::complete);
toSend.ifPresent(r -> locateInMeta(tableName, r));
}
}
Expand Down Expand Up @@ -542,9 +570,11 @@ public void onNext(Result[] results, ScanController controller) {
continue;
}
RegionLocations addedLocs = addToCache(tableCache, locs);
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
tableCache.clearCompletedRequests(addedLocs);
futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
}
futureResultList.forEach(RegionLocationsFutureResult::complete);
}
}
}
Expand Down Expand Up @@ -676,12 +706,16 @@ void clearCache(TableName tableName) {
if (tableCache == null) {
return;
}
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
if (!tableCache.allRequests.isEmpty()) {
IOException error = new IOException("Cache cleared");
tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
tableCache.allRequests.values().forEach(f -> {
futureResultList.add(new RegionLocationsFutureResult(f, null, error));
});
}
}
futureResultList.forEach(RegionLocationsFutureResult::complete);
conn.getConnectionMetrics()
.ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
}
Expand Down