Skip to content

Commit

Permalink
HDFS-16163. Avoid locking entire blockPinningFailures map
Browse files Browse the repository at this point in the history
  • Loading branch information
virajjasani committed Aug 11, 2021
1 parent 77383a4 commit 02a3612
Showing 1 changed file with 14 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -405,7 +406,8 @@ private void dispatch() {
// Pinned block can't be moved. Add this block into failure list.
// Later in the next iteration mover will exclude these blocks from
// pending moves.
target.getDDatanode().addBlockPinningFailures(this);
target.getDDatanode().addBlockPinningFailures(
this.reportedBlock.getBlock().getBlockId(), this.getSource());
return;
}

Expand Down Expand Up @@ -643,7 +645,8 @@ public boolean equals(Object obj) {
/** blocks being moved but not confirmed yet */
private final List<PendingMove> pendings;
private volatile boolean hasFailure = false;
private Map<Long, Set<DatanodeInfo>> blockPinningFailures = new HashMap<>();
private final Map<Long, Set<DatanodeInfo>> blockPinningFailures =
new ConcurrentHashMap<>();
private volatile boolean hasSuccess = false;
private ExecutorService moveExecutor;

Expand Down Expand Up @@ -729,16 +732,17 @@ void setHasFailure() {
this.hasFailure = true;
}

void addBlockPinningFailures(PendingMove pendingBlock) {
synchronized (blockPinningFailures) {
long blockId = pendingBlock.reportedBlock.getBlock().getBlockId();
Set<DatanodeInfo> pinnedLocations = blockPinningFailures.get(blockId);
private void addBlockPinningFailures(long blockId, DatanodeInfo source) {
blockPinningFailures.compute(blockId, (key, pinnedLocations) -> {
Set<DatanodeInfo> newPinnedLocations;
if (pinnedLocations == null) {
pinnedLocations = new HashSet<>();
blockPinningFailures.put(blockId, pinnedLocations);
newPinnedLocations = new HashSet<>();
} else {
newPinnedLocations = pinnedLocations;
}
pinnedLocations.add(pendingBlock.getSource());
}
newPinnedLocations.add(source);
return newPinnedLocations;
});
}

Map<Long, Set<DatanodeInfo>> getBlockPinningFailureList() {
Expand Down

0 comments on commit 02a3612

Please sign in to comment.