-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HDFS-15393: Review of PendingReconstructionBlocks #2055
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,13 +21,20 @@ | |
| import static org.apache.hadoop.util.Time.monotonicNow; | ||
|
|
||
| import java.io.PrintWriter; | ||
| import java.sql.Time; | ||
| import java.time.Instant; | ||
| import java.time.format.DateTimeFormatter; | ||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.HashMap; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.HashSet; | ||
| import java.util.Iterator; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Map.Entry; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import com.google.common.annotations.VisibleForTesting; | ||
| import org.apache.hadoop.hdfs.protocol.Block; | ||
|
|
@@ -49,26 +56,24 @@ | |
| class PendingReconstructionBlocks { | ||
| private static final Logger LOG = BlockManager.LOG; | ||
|
|
||
| private final static long DEFAULT_RECHECK_INTERVAL = 5 * 60 * 1000; | ||
|
|
||
| private final Map<BlockInfo, PendingBlockInfo> pendingReconstructions; | ||
| private final ArrayList<BlockInfo> timedOutItems; | ||
| private final List<BlockInfo> timedOutItems; | ||
| Daemon timerThread = null; | ||
| private volatile boolean fsRunning = true; | ||
| private long timedOutCount = 0L; | ||
|
|
||
| // | ||
| // It might take anywhere between 5 to 10 minutes before | ||
| // a request is timed out. | ||
| // | ||
| private long timeout = | ||
| DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT * 1000; | ||
| private final static long DEFAULT_RECHECK_INTERVAL = 5 * 60 * 1000; | ||
| private final long timeout; | ||
|
|
||
| PendingReconstructionBlocks(long timeoutPeriod) { | ||
| if ( timeoutPeriod > 0 ) { | ||
| this.timeout = timeoutPeriod; | ||
| } | ||
| pendingReconstructions = new HashMap<>(); | ||
| timedOutItems = new ArrayList<>(); | ||
| this.timeout = (timeoutPeriod > 0) ? timeoutPeriod | ||
| : TimeUnit.SECONDS.toMillis(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT); | ||
| this.pendingReconstructions = new ConcurrentHashMap<>(); | ||
| this.timedOutItems = Collections.synchronizedList(new ArrayList<>()); | ||
| } | ||
|
|
||
| void start() { | ||
|
|
@@ -82,15 +87,15 @@ void start() { | |
| * @param targets The DataNodes where replicas of the block should be placed | ||
| */ | ||
| void increment(BlockInfo block, DatanodeStorageInfo... targets) { | ||
| synchronized (pendingReconstructions) { | ||
| PendingBlockInfo found = pendingReconstructions.get(block); | ||
| if (found == null) { | ||
| pendingReconstructions.put(block, new PendingBlockInfo(targets)); | ||
| this.pendingReconstructions.compute(block, (key, value) -> { | ||
| if (value == null) { | ||
| value = new PendingBlockInfo(targets); | ||
| } else { | ||
| found.incrementReplicas(targets); | ||
| found.setTimeStamp(); | ||
| value.incrementReplicas(targets); | ||
| value.setTimeStamp(); | ||
| } | ||
| } | ||
| return value; | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -102,19 +107,17 @@ void increment(BlockInfo block, DatanodeStorageInfo... targets) { | |
| * @return true if the block is decremented to 0 and got removed. | ||
| */ | ||
| boolean decrement(BlockInfo block, DatanodeStorageInfo dn) { | ||
| boolean removed = false; | ||
| synchronized (pendingReconstructions) { | ||
| PendingBlockInfo found = pendingReconstructions.get(block); | ||
| if (found != null) { | ||
| LOG.debug("Removing pending reconstruction for {}", block); | ||
| found.decrementReplicas(dn); | ||
| if (found.getNumReplicas() <= 0) { | ||
| pendingReconstructions.remove(block); | ||
| removed = true; | ||
| } | ||
| final boolean[] found = { false }; | ||
| pendingReconstructions.computeIfPresent(block, (key, value) -> { | ||
| LOG.debug("Removing pending reconstruction for {}", key); | ||
| value.decrementReplicas(dn); | ||
| if (value.getNumReplicas() <= 0) { | ||
| found[0] = true; | ||
| return null; | ||
| } | ||
| } | ||
| return removed; | ||
| return value; | ||
| }); | ||
| return found[0]; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -125,70 +128,54 @@ boolean decrement(BlockInfo block, DatanodeStorageInfo dn) { | |
| * removed | ||
| */ | ||
| PendingBlockInfo remove(BlockInfo block) { | ||
| synchronized (pendingReconstructions) { | ||
| return pendingReconstructions.remove(block); | ||
| } | ||
| return pendingReconstructions.remove(block); | ||
| } | ||
|
|
||
| public void clear() { | ||
| synchronized (pendingReconstructions) { | ||
| pendingReconstructions.clear(); | ||
| synchronized (timedOutItems) { | ||
| timedOutItems.clear(); | ||
| } | ||
| timedOutItems.clear(); | ||
| timedOutCount = 0L; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * The total number of blocks that are undergoing reconstruction. | ||
| */ | ||
| int size() { | ||
| synchronized (pendingReconstructions) { | ||
| return pendingReconstructions.size(); | ||
| } | ||
| return pendingReconstructions.size(); | ||
| } | ||
|
|
||
| /** | ||
| * How many copies of this block is pending reconstruction?. | ||
| */ | ||
| int getNumReplicas(BlockInfo block) { | ||
| synchronized (pendingReconstructions) { | ||
| PendingBlockInfo found = pendingReconstructions.get(block); | ||
| if (found != null) { | ||
| return found.getNumReplicas(); | ||
| } | ||
| } | ||
| return 0; | ||
| PendingBlockInfo found = pendingReconstructions.get(block); | ||
| return (found == null) ? 0 : found.getNumReplicas(); | ||
| } | ||
|
|
||
| /** | ||
| * Used for metrics. | ||
| * @return The number of timeouts | ||
| */ | ||
| long getNumTimedOuts() { | ||
| synchronized (timedOutItems) { | ||
| return timedOutCount + timedOutItems.size(); | ||
| } | ||
| return timedOutCount + timedOutItems.size(); | ||
| } | ||
|
|
||
| /** | ||
| * Returns a list of blocks that have timed out their | ||
| * reconstruction requests. Returns null if no blocks have | ||
| * timed out. | ||
| * Clears the list of blocks that have timed out their reconstruction | ||
| * requests. Returns null if no blocks have timed out. | ||
| * | ||
| * @return an array of the blocks that were cleared from the list | ||
| */ | ||
| BlockInfo[] getTimedOutBlocks() { | ||
| BlockInfo[] clearTimedOutBlocks() { | ||
| BlockInfo[] blockList = null; | ||
| synchronized (timedOutItems) { | ||
| if (timedOutItems.size() <= 0) { | ||
| return null; | ||
| } | ||
| int size = timedOutItems.size(); | ||
| BlockInfo[] blockList = timedOutItems.toArray( | ||
| new BlockInfo[size]); | ||
| if (!timedOutItems.isEmpty()) { | ||
| blockList = timedOutItems.toArray(new BlockInfo[0]); | ||
| timedOutCount += timedOutItems.size(); | ||
| timedOutItems.clear(); | ||
| timedOutCount += size; | ||
| return blockList; | ||
| } | ||
| } | ||
| return blockList; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -200,12 +187,11 @@ BlockInfo[] getTimedOutBlocks() { | |
| */ | ||
| static class PendingBlockInfo { | ||
| private long timeStamp; | ||
| private final List<DatanodeStorageInfo> targets; | ||
| private final Set<DatanodeStorageInfo> targets; | ||
|
|
||
| PendingBlockInfo(DatanodeStorageInfo[] targets) { | ||
| this.timeStamp = monotonicNow(); | ||
| this.targets = targets == null ? new ArrayList<DatanodeStorageInfo>() | ||
| : new ArrayList<>(Arrays.asList(targets)); | ||
| this.targets = new HashSet<>(Arrays.asList(targets)); | ||
| } | ||
|
|
||
| long getTimeStamp() { | ||
|
|
@@ -217,20 +203,16 @@ void setTimeStamp() { | |
| } | ||
|
|
||
| void incrementReplicas(DatanodeStorageInfo... newTargets) { | ||
| if (newTargets != null) { | ||
| for (DatanodeStorageInfo newTarget : newTargets) { | ||
| if (!targets.contains(newTarget)) { | ||
| targets.add(newTarget); | ||
| } | ||
| } | ||
| for (DatanodeStorageInfo newTarget : newTargets) { | ||
| targets.add(newTarget); | ||
| } | ||
| } | ||
|
|
||
| void decrementReplicas(DatanodeStorageInfo dn) { | ||
| Iterator<DatanodeStorageInfo> iterator = targets.iterator(); | ||
| while (iterator.hasNext()) { | ||
| DatanodeStorageInfo next = iterator.next(); | ||
| if (next.getDatanodeDescriptor() == dn.getDatanodeDescriptor()) { | ||
| if (next.getDatanodeDescriptor().equals(dn.getDatanodeDescriptor())) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks pretty bad... could this have had a negative impact in the past? |
||
| iterator.remove(); | ||
| } | ||
| } | ||
|
|
@@ -240,7 +222,7 @@ int getNumReplicas() { | |
| return targets.size(); | ||
| } | ||
|
|
||
| List<DatanodeStorageInfo> getTargets() { | ||
| Collection<DatanodeStorageInfo> getTargets() { | ||
| return targets; | ||
| } | ||
| } | ||
|
|
@@ -252,13 +234,14 @@ List<DatanodeStorageInfo> getTargets() { | |
| class PendingReconstructionMonitor implements Runnable { | ||
| @Override | ||
| public void run() { | ||
| while (fsRunning) { | ||
| while (true) { | ||
| long period = Math.min(DEFAULT_RECHECK_INTERVAL, timeout); | ||
| try { | ||
| pendingReconstructionCheck(); | ||
| Thread.sleep(period); | ||
| } catch (InterruptedException ie) { | ||
| LOG.debug("PendingReconstructionMonitor thread is interrupted.", ie); | ||
| LOG.debug("PendingReconstructionMonitor thread is interrupted", ie); | ||
| return; | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -267,25 +250,21 @@ public void run() { | |
| * Iterate through all items and detect timed-out items | ||
| */ | ||
| void pendingReconstructionCheck() { | ||
| synchronized (pendingReconstructions) { | ||
| Iterator<Map.Entry<BlockInfo, PendingBlockInfo>> iter = | ||
| pendingReconstructions.entrySet().iterator(); | ||
| long now = monotonicNow(); | ||
| final long now = monotonicNow(); | ||
| LOG.debug("PendingReconstructionMonitor checking Q"); | ||
| while (iter.hasNext()) { | ||
| Map.Entry<BlockInfo, PendingBlockInfo> entry = iter.next(); | ||
| PendingBlockInfo pendingBlock = entry.getValue(); | ||
| if (now > pendingBlock.getTimeStamp() + timeout) { | ||
| BlockInfo block = entry.getKey(); | ||
| synchronized (timedOutItems) { | ||
| timedOutItems.add(block); | ||
| } | ||
| LOG.warn("PendingReconstructionMonitor timed out " + block); | ||
| timedOutItems.add(block); | ||
| NameNode.getNameNodeMetrics().incTimeoutReReplications(); | ||
| iter.remove(); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -296,46 +275,42 @@ void pendingReconstructionCheck() { | |
| public Daemon getTimerThread() { | ||
| return timerThread; | ||
| } | ||
| /* | ||
| * Shuts down the pending reconstruction monitor thread. | ||
| * Waits for the thread to exit. | ||
|
|
||
| /** | ||
| * Shuts down the pending reconstruction monitor thread. Waits for the thread | ||
| * to exit. | ||
| */ | ||
| void stop() { | ||
| fsRunning = false; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can of like this. You think catching the interrupt is enough? |
||
| if(timerThread == null) return; | ||
| if (timerThread == null) | ||
| return; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add keys? |
||
| timerThread.interrupt(); | ||
| try { | ||
| timerThread.join(3000); | ||
| } catch (InterruptedException ie) { | ||
| LOG.debug("PendingReconstructionMonitor stop is interrupted", ie); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Iterate through all items and print them. | ||
| */ | ||
| void metaSave(PrintWriter out) { | ||
| synchronized (pendingReconstructions) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is there no need anymore? |
||
| out.println("Metasave: Blocks being reconstructed: " + | ||
| pendingReconstructions.size()); | ||
| for (Map.Entry<BlockInfo, PendingBlockInfo> entry : | ||
| pendingReconstructions.entrySet()) { | ||
| PendingBlockInfo pendingBlock = entry.getValue(); | ||
| Block block = entry.getKey(); | ||
| out.println(block + | ||
| " StartTime: " + new Time(pendingBlock.timeStamp) + | ||
| " NumReconstructInProgress: " + | ||
| pendingBlock.getNumReplicas()); | ||
| } | ||
| Set<Entry<BlockInfo, PendingBlockInfo>> entrySet = | ||
| pendingReconstructions.entrySet(); | ||
| out.println("Metasave: Blocks being reconstructed: " + entrySet.size()); | ||
| for (Map.Entry<BlockInfo, PendingBlockInfo> entry : entrySet) { | ||
| PendingBlockInfo pendingBlock = entry.getValue(); | ||
| Block block = entry.getKey(); | ||
| out.print(block); | ||
| out.print(" StartTime: " + DateTimeFormatter.ISO_INSTANT | ||
| .format(Instant.ofEpochMilli(pendingBlock.timeStamp))); | ||
| out.print(" NumReconstructInProgress: " + pendingBlock.getNumReplicas()); | ||
| out.println(); | ||
| } | ||
| } | ||
|
|
||
| List<DatanodeStorageInfo> getTargets(BlockInfo block) { | ||
| synchronized (pendingReconstructions) { | ||
| PendingBlockInfo found = pendingReconstructions.get(block); | ||
| if (found != null) { | ||
| return found.targets; | ||
| } | ||
| } | ||
| return null; | ||
| Collection<DatanodeStorageInfo> getTargets(BlockInfo block) { | ||
| PendingBlockInfo found = pendingReconstructions.get(block); | ||
| return (found == null) ? Collections.emptySet() : found.targets; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unlikely but this can still be technically be inconsistent, right?