hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2014,12 +2014,10 @@ int computeReconstructionWorkForBlocks(
final Set<Node> excludedNodes = new HashSet<>(rw.getContainingNodes());

// Exclude all nodes which already exist as targets for the block
List<DatanodeStorageInfo> targets =
Collection<DatanodeStorageInfo> targets =
pendingReconstruction.getTargets(rw.getBlock());
if (targets != null) {
for (DatanodeStorageInfo dn : targets) {
excludedNodes.add(dn.getDatanodeDescriptor());
}
for (DatanodeStorageInfo dn : targets) {
excludedNodes.add(dn.getDatanodeDescriptor());
}

// choose replication targets: NOT HOLDING THE GLOBAL LOCK
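A minimal caller-side sketch (hypothetical names) of what this hunk relies on: once getTargets() returns an empty collection rather than null, the null guard disappears and the loop over an empty result is simply a no-op.

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    class NullObjectCallerSketch {
      // Stand-in for pendingReconstruction.getTargets(rw.getBlock()).
      static Collection<String> getTargets(boolean anythingPending) {
        return anythingPending
            ? Arrays.asList("dn1", "dn2")
            : Collections.emptySet();
      }

      public static void main(String[] args) {
        Set<String> excludedNodes = new HashSet<>();
        // No "if (targets != null)" guard: an empty result skips the loop.
        for (String dn : getTargets(false)) {
          excludedNodes.add(dn);
        }
        System.out.println(excludedNodes); // []
      }
    }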
@@ -2507,7 +2505,7 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
* and put them back into the neededReconstruction queue
*/
void processPendingReconstructions() {
BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks();
BlockInfo[] timedOutItems = pendingReconstruction.clearTimedOutBlocks();
if (timedOutItems != null) {
namesystem.writeLock();
try {
@@ -4588,7 +4586,7 @@ public void removeBlock(BlockInfo block) {
PendingBlockInfo remove = pendingReconstruction.remove(block);
if (remove != null) {
DatanodeStorageInfo.decrementBlocksScheduled(remove.getTargets()
.toArray(new DatanodeStorageInfo[remove.getTargets().size()]));
.toArray(new DatanodeStorageInfo[0]));
}
neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
postponedMisreplicatedBlocks.remove(block);
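The toArray change above is the standard empty-array idiom; a small sketch of what it avoids. With a pre-sized array, size() and toArray() are two separate calls, so if the collection shrinks in between (possible now that getTargets() exposes a concurrently mutated set), the surplus slots stay null; toArray(new T[0]) sizes the result inside a single call.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class ToArraySketch {
      public static void main(String[] args) {
        List<String> targets = new ArrayList<>(Arrays.asList("a", "b", "c"));
        String[] exact = targets.toArray(new String[0]);  // always sized to fit
        String[] padded = targets.toArray(new String[5]); // surplus slots are null
        System.out.println(exact.length);      // 3
        System.out.println(padded[4] == null); // true
      }
    }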
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
@@ -21,13 +21,20 @@
import static org.apache.hadoop.util.Time.monotonicNow;

import java.io.PrintWriter;
import java.sql.Time;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -49,26 +56,24 @@
class PendingReconstructionBlocks {
private static final Logger LOG = BlockManager.LOG;

private final static long DEFAULT_RECHECK_INTERVAL = 5 * 60 * 1000;

private final Map<BlockInfo, PendingBlockInfo> pendingReconstructions;
private final ArrayList<BlockInfo> timedOutItems;
private final List<BlockInfo> timedOutItems;
Daemon timerThread = null;
private volatile boolean fsRunning = true;
private long timedOutCount = 0L;

//
// It might take anywhere between 5 to 10 minutes before
// a request is timed out.
//
private long timeout =
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT * 1000;
private final static long DEFAULT_RECHECK_INTERVAL = 5 * 60 * 1000;
private final long timeout;

PendingReconstructionBlocks(long timeoutPeriod) {
if ( timeoutPeriod > 0 ) {
this.timeout = timeoutPeriod;
}
pendingReconstructions = new HashMap<>();
timedOutItems = new ArrayList<>();
this.timeout = (timeoutPeriod > 0) ? timeoutPeriod
: TimeUnit.SECONDS.toMillis(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT);
this.pendingReconstructions = new ConcurrentHashMap<>();
this.timedOutItems = Collections.synchronizedList(new ArrayList<>());
}
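A condensed sketch (names mirror the patch, everything else simplified) of the constructor's new shape: the timeout is computed once with a ternary so the field can be final, and the two collections are swapped for thread-safe variants so the coarse synchronized blocks elsewhere can be dropped.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;

    class PendingSketch {
      // Stand-in for DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT.
      static final long DEFAULT_TIMEOUT_SEC = 300;

      private final Map<String, Integer> pending;  // block -> pending replicas
      private final List<String> timedOut;
      private final long timeout;

      PendingSketch(long timeoutPeriod) {
        // TimeUnit.toMillis states what the old "* 1000" only implied.
        this.timeout = (timeoutPeriod > 0) ? timeoutPeriod
            : TimeUnit.SECONDS.toMillis(DEFAULT_TIMEOUT_SEC);
        this.pending = new ConcurrentHashMap<>();
        this.timedOut = Collections.synchronizedList(new ArrayList<>());
      }
    }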

void start() {
@@ -82,15 +87,15 @@ void start() {
* @param targets The DataNodes where replicas of the block should be placed
*/
void increment(BlockInfo block, DatanodeStorageInfo... targets) {
synchronized (pendingReconstructions) {
PendingBlockInfo found = pendingReconstructions.get(block);
if (found == null) {
pendingReconstructions.put(block, new PendingBlockInfo(targets));
this.pendingReconstructions.compute(block, (key, value) -> {
if (value == null) {
value = new PendingBlockInfo(targets);
} else {
found.incrementReplicas(targets);
found.setTimeStamp();
value.incrementReplicas(targets);
value.setTimeStamp();
}
}
return value;
});
}
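A runnable sketch of the compute-based upsert used above: ConcurrentHashMap.compute runs the remapping function atomically per key, so the old synchronized get-then-put-or-update sequence collapses into a single call.

    import java.util.concurrent.ConcurrentHashMap;

    class ComputeUpsertSketch {
      public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> pending = new ConcurrentHashMap<>();
        for (int i = 0; i < 3; i++) {
          pending.compute("blk_1", (key, value) -> {
            if (value == null) {
              return 1;        // first request: create the entry
            }
            return value + 1;  // repeat request: update it in place
          });
        }
        System.out.println(pending.get("blk_1")); // 3
      }
    }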

/**
@@ -102,19 +107,17 @@ void increment(BlockInfo block, DatanodeStorageInfo... targets) {
* @return true if the block is decremented to 0 and got removed.
*/
boolean decrement(BlockInfo block, DatanodeStorageInfo dn) {
boolean removed = false;
synchronized (pendingReconstructions) {
PendingBlockInfo found = pendingReconstructions.get(block);
if (found != null) {
LOG.debug("Removing pending reconstruction for {}", block);
found.decrementReplicas(dn);
if (found.getNumReplicas() <= 0) {
pendingReconstructions.remove(block);
removed = true;
}
final boolean[] found = { false };
pendingReconstructions.computeIfPresent(block, (key, value) -> {
LOG.debug("Removing pending reconstruction for {}", key);
value.decrementReplicas(dn);
if (value.getNumReplicas() <= 0) {
found[0] = true;
return null;
}
}
return removed;
return value;
});
return found[0];
}
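A sketch of the result-capture trick above: a lambda cannot assign a local boolean, so the flag escapes through a one-element array, and returning null from computeIfPresent removes the mapping atomically. An AtomicBoolean would express the same thing; the array form just avoids the extra import.

    import java.util.concurrent.ConcurrentHashMap;

    class ComputeIfPresentSketch {
      static boolean decrement(ConcurrentHashMap<String, Integer> pending,
          String block) {
        final boolean[] removed = { false };
        pending.computeIfPresent(block, (key, value) -> {
          int remaining = value - 1;
          if (remaining <= 0) {
            removed[0] = true;
            return null;       // null return deletes the entry atomically
          }
          return remaining;
        });
        return removed[0];
      }

      public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> pending = new ConcurrentHashMap<>();
        pending.put("blk_1", 2);
        System.out.println(decrement(pending, "blk_1")); // false: one replica left
        System.out.println(decrement(pending, "blk_1")); // true: entry removed
      }
    }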

/**
@@ -125,70 +128,54 @@ boolean decrement(BlockInfo block, DatanodeStorageInfo dn) {
* removed
*/
PendingBlockInfo remove(BlockInfo block) {
synchronized (pendingReconstructions) {
return pendingReconstructions.remove(block);
}
return pendingReconstructions.remove(block);
}

public void clear() {
synchronized (pendingReconstructions) {
pendingReconstructions.clear();
synchronized (timedOutItems) {
timedOutItems.clear();
}
timedOutItems.clear();
timedOutCount = 0L;
}
}

/**
* The total number of blocks that are undergoing reconstruction.
*/
int size() {
synchronized (pendingReconstructions) {
return pendingReconstructions.size();
}
return pendingReconstructions.size();
}

/**
* How many copies of this block are pending reconstruction?
*/
int getNumReplicas(BlockInfo block) {
synchronized (pendingReconstructions) {
PendingBlockInfo found = pendingReconstructions.get(block);
if (found != null) {
return found.getNumReplicas();
}
}
return 0;
PendingBlockInfo found = pendingReconstructions.get(block);
return (found == null) ? 0 : found.getNumReplicas();
}

/**
* Used for metrics.
* @return The number of timeouts
*/
long getNumTimedOuts() {
synchronized (timedOutItems) {
return timedOutCount + timedOutItems.size();
}
return timedOutCount + timedOutItems.size();
Review comment (Member): Unlikely, but this can still technically be inconsistent, right?

}

/**
* Returns a list of blocks that have timed out their
* reconstruction requests. Returns null if no blocks have
* timed out.
* Clears the list of blocks that have timed out their reconstruction
* requests. Returns null if no blocks have timed out.
*
* @return an array of the blocks that were cleared from the list
*/
BlockInfo[] getTimedOutBlocks() {
BlockInfo[] clearTimedOutBlocks() {
BlockInfo[] blockList = null;
synchronized (timedOutItems) {
if (timedOutItems.size() <= 0) {
return null;
}
int size = timedOutItems.size();
BlockInfo[] blockList = timedOutItems.toArray(
new BlockInfo[size]);
if (!timedOutItems.isEmpty()) {
blockList = timedOutItems.toArray(new BlockInfo[0]);
timedOutCount += timedOutItems.size();
timedOutItems.clear();
timedOutCount += size;
return blockList;
}
}
return blockList;
}
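The review comment above points at a real window: the counter is bumped and the list cleared in separate synchronized calls, so a concurrent getNumTimedOuts() can briefly double-count, and an item added between toArray() and clear() would be dropped unreported. A sketch of one way to close both windows, using the list's own monitor (which Collections.synchronizedList documents as its lock object):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class TimedOutDrainSketch {
      private final List<String> timedOutItems =
          Collections.synchronizedList(new ArrayList<>());
      private long timedOutCount = 0L;

      long getNumTimedOuts() {
        synchronized (timedOutItems) {  // pairs with the drain below
          return timedOutCount + timedOutItems.size();
        }
      }

      String[] clearTimedOutBlocks() {
        synchronized (timedOutItems) {  // snapshot, count and clear as one step
          if (timedOutItems.isEmpty()) {
            return null;
          }
          String[] drained = timedOutItems.toArray(new String[0]);
          timedOutCount += drained.length;
          timedOutItems.clear();
          return drained;
        }
      }
    }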

/**
@@ -200,12 +187,11 @@ BlockInfo[] getTimedOutBlocks() {
*/
static class PendingBlockInfo {
private long timeStamp;
private final List<DatanodeStorageInfo> targets;
private final Set<DatanodeStorageInfo> targets;

PendingBlockInfo(DatanodeStorageInfo[] targets) {
this.timeStamp = monotonicNow();
this.targets = targets == null ? new ArrayList<DatanodeStorageInfo>()
: new ArrayList<>(Arrays.asList(targets));
this.targets = new HashSet<>(Arrays.asList(targets));
}

long getTimeStamp() {
@@ -217,20 +203,16 @@ void setTimeStamp() {
}

void incrementReplicas(DatanodeStorageInfo... newTargets) {
if (newTargets != null) {
for (DatanodeStorageInfo newTarget : newTargets) {
if (!targets.contains(newTarget)) {
targets.add(newTarget);
}
}
for (DatanodeStorageInfo newTarget : newTargets) {
targets.add(newTarget);
}
}
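A sketch of the List-to-Set switch for targets: HashSet.add already rejects duplicates, which is exactly what the old contains-then-add dance implemented by hand on an ArrayList, and in expected O(1) rather than O(n). One behavioral note: new HashSet<>(Arrays.asList(targets)) throws NullPointerException on a null array, which the old constructor tolerated.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    class TargetSetSketch {
      public static void main(String[] args) {
        Set<String> targets = new HashSet<>(Arrays.asList("dn1", "dn2"));
        targets.add("dn2");                 // duplicate: silently ignored
        targets.add("dn3");
        System.out.println(targets.size()); // 3
      }
    }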

void decrementReplicas(DatanodeStorageInfo dn) {
Iterator<DatanodeStorageInfo> iterator = targets.iterator();
while (iterator.hasNext()) {
DatanodeStorageInfo next = iterator.next();
if (next.getDatanodeDescriptor() == dn.getDatanodeDescriptor()) {
if (next.getDatanodeDescriptor().equals(dn.getDatanodeDescriptor())) {
Review comment (Member): This looks pretty bad... could this have had a negative impact in the past?
iterator.remove();
}
}
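A sketch of what the reviewer is flagging above, with a hypothetical stand-in class: the old == compared object identity, so two descriptor instances denoting the same datanode would never match and the target would silently survive the removal loop; equals() compares logical identity instead.

    import java.util.Objects;

    class EqualsSketch {
      static final class Descriptor {  // stand-in for DatanodeDescriptor
        final String uuid;
        Descriptor(String uuid) { this.uuid = uuid; }
        @Override public boolean equals(Object o) {
          return o instanceof Descriptor && ((Descriptor) o).uuid.equals(uuid);
        }
        @Override public int hashCode() { return Objects.hash(uuid); }
      }

      public static void main(String[] args) {
        Descriptor a = new Descriptor("dn-uuid-1");
        Descriptor b = new Descriptor("dn-uuid-1"); // same node, new instance
        System.out.println(a == b);      // false: identity misses
        System.out.println(a.equals(b)); // true: logical identity matches
      }
    }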
@@ -240,7 +222,7 @@ int getNumReplicas() {
return targets.size();
}

List<DatanodeStorageInfo> getTargets() {
Collection<DatanodeStorageInfo> getTargets() {
return targets;
}
}
@@ -252,13 +234,14 @@ List<DatanodeStorageInfo> getTargets() {
class PendingReconstructionMonitor implements Runnable {
@Override
public void run() {
while (fsRunning) {
while (true) {
long period = Math.min(DEFAULT_RECHECK_INTERVAL, timeout);
try {
pendingReconstructionCheck();
Thread.sleep(period);
} catch (InterruptedException ie) {
LOG.debug("PendingReconstructionMonitor thread is interrupted.", ie);
LOG.debug("PendingReconstructionMonitor thread is interrupted", ie);
return;
}
}
}
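A self-contained sketch of the interrupt-only shutdown the loop moves to: the worker exits when a blocking call throws InterruptedException, and stop() just interrupts and joins. This is sound as long as every iteration reaches an interruptible call such as sleep(), which is the substance of the reviewer's question below.

    class MonitorShutdownSketch {
      public static void main(String[] args) throws InterruptedException {
        Thread monitor = new Thread(() -> {
          while (true) {
            try {
              // ... periodic check would run here ...
              Thread.sleep(1000);
            } catch (InterruptedException ie) {
              return;  // the sole exit path, replacing the fsRunning flag
            }
          }
        });
        monitor.start();
        Thread.sleep(50);
        monitor.interrupt();  // replaces "fsRunning = false"
        monitor.join(3000);
        System.out.println("alive=" + monitor.isAlive()); // alive=false
      }
    }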
@@ -267,25 +250,21 @@ public void run() {
* Iterate through all items and detect timed-out items
*/
void pendingReconstructionCheck() {
synchronized (pendingReconstructions) {
Iterator<Map.Entry<BlockInfo, PendingBlockInfo>> iter =
pendingReconstructions.entrySet().iterator();
long now = monotonicNow();
final long now = monotonicNow();
LOG.debug("PendingReconstructionMonitor checking Q");
while (iter.hasNext()) {
Map.Entry<BlockInfo, PendingBlockInfo> entry = iter.next();
PendingBlockInfo pendingBlock = entry.getValue();
if (now > pendingBlock.getTimeStamp() + timeout) {
BlockInfo block = entry.getKey();
synchronized (timedOutItems) {
timedOutItems.add(block);
}
LOG.warn("PendingReconstructionMonitor timed out " + block);
timedOutItems.add(block);
NameNode.getNameNodeMetrics().incTimeoutReReplications();
iter.remove();
}
}
}
}
}
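A sketch of why the surrounding synchronized block could go: ConcurrentHashMap's entrySet iterator is weakly consistent, never throws ConcurrentModificationException, and supports iterator.remove(), so the timeout sweep can run while other threads insert and remove entries.

    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class SweepSketch {
      public static void main(String[] args) {
        ConcurrentHashMap<String, Long> pending = new ConcurrentHashMap<>();
        pending.put("blk_1", 0L);              // long-expired timestamp
        pending.put("blk_2", Long.MAX_VALUE);  // fresh timestamp
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<String, Long>> iter = pending.entrySet().iterator();
        while (iter.hasNext()) {
          Map.Entry<String, Long> entry = iter.next();
          if (now > entry.getValue()) {
            iter.remove();  // safe on a ConcurrentHashMap view
          }
        }
        System.out.println(pending.keySet()); // [blk_2]
      }
    }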

@@ -296,46 +275,42 @@ void pendingReconstructionCheck() {
public Daemon getTimerThread() {
return timerThread;
}
/*
* Shuts down the pending reconstruction monitor thread.
* Waits for the thread to exit.

/**
* Shuts down the pending reconstruction monitor thread. Waits for the thread
* to exit.
*/
void stop() {
fsRunning = false;
Review comment (Member): I kind of like this. You think catching the interrupt is enough?
if(timerThread == null) return;
if (timerThread == null)
return;
Review comment (Member): Add braces?
timerThread.interrupt();
try {
timerThread.join(3000);
} catch (InterruptedException ie) {
LOG.debug("PendingReconstructionMonitor stop is interrupted", ie);
}
}

/**
* Iterate through all items and print them.
*/
void metaSave(PrintWriter out) {
synchronized (pendingReconstructions) {
Review comment (Member): Why is there no need anymore?
out.println("Metasave: Blocks being reconstructed: " +
pendingReconstructions.size());
for (Map.Entry<BlockInfo, PendingBlockInfo> entry :
pendingReconstructions.entrySet()) {
PendingBlockInfo pendingBlock = entry.getValue();
Block block = entry.getKey();
out.println(block +
" StartTime: " + new Time(pendingBlock.timeStamp) +
" NumReconstructInProgress: " +
pendingBlock.getNumReplicas());
}
Set<Entry<BlockInfo, PendingBlockInfo>> entrySet =
pendingReconstructions.entrySet();
out.println("Metasave: Blocks being reconstructed: " + entrySet.size());
for (Map.Entry<BlockInfo, PendingBlockInfo> entry : entrySet) {
PendingBlockInfo pendingBlock = entry.getValue();
Block block = entry.getKey();
out.print(block);
out.print(" StartTime: " + DateTimeFormatter.ISO_INSTANT
.format(Instant.ofEpochMilli(pendingBlock.timeStamp)));
out.print(" NumReconstructInProgress: " + pendingBlock.getNumReplicas());
out.println();
}
}
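A sketch of the timestamp rendering change: java.sql.Time prints only a time-of-day in the local zone, while DateTimeFormatter.ISO_INSTANT renders a full UTC instant. One caveat worth flagging: the field being formatted is set from monotonicNow(), so the printed instant is only meaningful if that clock is epoch-based.

    import java.time.Instant;
    import java.time.format.DateTimeFormatter;

    class MetaSaveTimeSketch {
      public static void main(String[] args) {
        long stamp = 1700000000000L; // example epoch millis
        System.out.println(new java.sql.Time(stamp));  // e.g. 23:13:20 (zone-local)
        System.out.println(DateTimeFormatter.ISO_INSTANT
            .format(Instant.ofEpochMilli(stamp)));     // 2023-11-14T22:13:20Z
      }
    }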

List<DatanodeStorageInfo> getTargets(BlockInfo block) {
synchronized (pendingReconstructions) {
PendingBlockInfo found = pendingReconstructions.get(block);
if (found != null) {
return found.targets;
}
}
return null;
Collection<DatanodeStorageInfo> getTargets(BlockInfo block) {
PendingBlockInfo found = pendingReconstructions.get(block);
return (found == null) ? Collections.emptySet() : found.targets;
}
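A producer-side sketch of the new contract: Collections.emptySet() is a shared immutable instance, so the empty branch is allocation-free, but note the asymmetry it creates here: the non-empty branch still hands out the internal mutable set, while the empty branch hands out a read-only one.

    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    class EmptySetContractSketch {
      private final Set<String> targets = new HashSet<>();

      Collection<String> getTargets(boolean present) {
        return present ? targets : Collections.emptySet();
      }

      public static void main(String[] args) {
        Collection<String> empty = new EmptySetContractSketch().getTargets(false);
        try {
          empty.add("dn1"); // throws: the shared empty set is immutable
        } catch (UnsupportedOperationException e) {
          System.out.println("empty set is read-only");
        }
      }
    }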
}