From 7295cde2d2c1d4b1bb7e4a259c512855be55ab1e Mon Sep 17 00:00:00 2001 From: Jiajun Mao Date: Sat, 10 Aug 2024 21:35:30 +0000 Subject: [PATCH] MLEC repair queueing --- .../server/blockmanagement/BlockInfo.java | 2 +- .../server/blockmanagement/BlockManager.java | 45 +++++++++++++------ .../blockmanagement/DatanodeManager.java | 30 +++++++++++-- .../hdfs/server/datanode/BPServiceActor.java | 36 +++++++-------- .../hadoop/hdfs/server/datanode/DataNode.java | 2 +- 5 files changed, 77 insertions(+), 38 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java index 4c04413471d0c..d9ad31abce438 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@ -403,7 +403,7 @@ public final boolean isCompleteOrCommitted() { public void convertToBlockUnderConstruction(BlockUCState s, DatanodeStorageInfo[] targets) { LOG.info("convertToBlockUnderConstruction to state {} for block {}", s, this.getBlockId()); - Thread.dumpStack(); +// Thread.dumpStack(); if (isComplete()) { uc = new BlockUnderConstructionFeature(this, s, targets, this.getBlockType()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index f837296ba62b3..010f64cd1bd69 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -498,7 +498,7 @@ public int getPendingSPSPaths() { private long excessRedundancyTimeoutCheckLimit; // MLEC Stuff - private ZfsBlockManagement zfsBlockMgr; + public ZfsBlockManagement zfsBlockMgr; public BlockManager(final Namesystem namesystem, boolean haEnabled, final Configuration conf) throws IOException { @@ -2173,10 +2173,12 @@ int computeReconstructionWorkForBlocks( LOG.info("Zfs failure causes {}",failCause); try { - LOG.info("Block {} failed because of {} on hosts {}", - rw.getBlock().getBlockId(), - failCause.stream().map(tuple -> tuple.getDatanodeStorageInfo().getStorageID()).collect(Collectors.toList()), - failCause.stream().map(tuple -> tuple.getDatanodeStorageInfo().getDatanodeDescriptor().getHostName()).collect(Collectors.toList())); + LOG.info("Block {} failed because of {} on hosts {}:{}", + rw.getBlock().getBlockId(), + failCause.stream().map(tuple -> tuple.getDatanodeStorageInfo().getStorageID()).collect(Collectors.toList()), + failCause.stream().map(tuple -> tuple.getDatanodeStorageInfo().getDatanodeDescriptor().getHostName()).collect(Collectors.toList()), + failCause.stream().map(tuple -> tuple.getDatanodeStorageInfo().getDatanodeDescriptor().getInfoPort()).collect(Collectors.toList())); + } catch (RuntimeException e) { // Silence it } @@ -2192,9 +2194,12 @@ int computeReconstructionWorkForBlocks( // The block is ZFS, we would need to restore it back to the same datanode that it is coming from // However, we cannot directly write, because that IO pool would be suspended, the write will fail // We need to instruct the writer to call a different API - LOG.info("The reconstruction block is ZFS, writing back to origin datanode {}", + LOG.info("The reconstruction block is ZFS, writing back to origin datanode {}:{}", failCause.stream() .map(failTuple -> failTuple.getDatanodeStorageInfo().getDatanodeDescriptor().getHostName()) + .collect(Collectors.toList()), + failCause.stream() + .map(failTuple -> failTuple.getDatanodeStorageInfo().getDatanodeDescriptor().getInfoPort()) .collect(Collectors.toList())); DatanodeStorageInfo[] failCauseArr = new DatanodeStorageInfo[failCause.size()]; @@ -2236,7 +2241,8 @@ int computeReconstructionWorkForBlocks( } LOG.info("Adding reconstruction task to DN {}", Arrays.stream(rw.getTargets()) - .map(i -> i.getDatanodeDescriptor().getHostName()).collect(Collectors.toList())); + .map(i -> i.getDatanodeDescriptor().getHostName() + ":" + i.getDatanodeDescriptor().getInfoPort()) + .collect(Collectors.toList())); synchronized (neededReconstruction) { if (validateReconstructionWork(rw)) { scheduledWork++; @@ -2280,10 +2286,12 @@ boolean hasEnoughEffectiveReplicas(BlockInfo block, @VisibleForTesting BlockReconstructionWork scheduleReconstruction(BlockInfo block, int priority) { + LOG.info("ScheduleReconstruction called on block {}", block); // skip abandoned block or block reopened for append if (block.isDeleted() || !block.isCompleteOrCommitted()) { // remove from neededReconstruction neededReconstruction.remove(block, priority); + LOG.info("Block is already deleted or not completed"); return null; } @@ -2301,7 +2309,7 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block, numReplicas); if (srcNodes == null || srcNodes.length == 0) { // block can not be reconstructed from any node - LOG.debug("Block {} cannot be reconstructed from any node", block); + LOG.info("Block {} cannot be reconstructed from any node", block); NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled(); return null; } @@ -2310,7 +2318,7 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block, if (block.isStriped()) { BlockInfoStriped stripedBlock = (BlockInfoStriped) block; if (stripedBlock.getRealDataBlockNum() > srcNodes.length) { - LOG.debug("Block {} cannot be reconstructed due to shortage of source datanodes ", block); + LOG.info("Block {} cannot be reconstructed due to shortage of source datanodes ", block); NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled(); return null; } @@ -2321,9 +2329,13 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block, assert liveReplicaNodes.size() >= numReplicas.liveReplicas(); int pendingNum = pendingReconstruction.getNumReplicas(block); - if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) { + + // MLEC check: in place update + if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum) + && !this.zfsBlockMgr.isZfsFailure(block)) { neededReconstruction.remove(block, priority); - blockLog.debug("BLOCK* Removing {} from neededReconstruction as it has enough replicas", + blockLog.info("zfs failure {} {}", this.zfsBlockMgr.blockFailureSources, this.zfsBlockMgr.isZfsFailure(block)); + blockLog.info("BLOCK* Removing {} from neededReconstruction as it has enough replicas", block); NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled(); return null; @@ -2343,6 +2355,7 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block, if (block.isStriped()) { if (pendingNum > 0) { // Wait the previous reconstruction to finish. + LOG.info("Block has more than 0 pending reconstruction, waiting on them to finish first"); NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled(); return null; } @@ -2435,7 +2448,11 @@ boolean validateReconstructionWork(BlockReconstructionWork rw) { final short requiredRedundancy = getExpectedLiveRedundancyNum(block, numReplicas); final int pendingNum = pendingReconstruction.getNumReplicas(block); - if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) { + + // MLEC check, if its ZFS failure, we skip replica check, because we do not remove the block in the first place + LOG.info("Zfs failure {}", this.zfsBlockMgr.blockFailureSources); + if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum) + && !this.zfsBlockMgr.isZfsFailure(block)) { neededReconstruction.remove(block, priority); rw.resetTargets(); blockLog.debug("BLOCK* Removing {} from neededReconstruction as it has enough replicas", @@ -4778,7 +4795,7 @@ private void processIncrementalBlockReport(final DatanodeDescriptor node, } } - private List getBlocksPeerOf(long blockId) { + public List getBlocksPeerOf(long blockId) { Block block = new Block(blockId); // Get the file path @@ -4797,7 +4814,7 @@ private List getBlocksPeerOf(long blockId) { LOG.info("Block peer {}-{} on datanode {}, status {}, type {}", replicaStorage.getStorageType(), replicaBlock.getBlockId(), - replicaStorage.getDatanodeDescriptor().getHostName(), + replicaStorage.getDatanodeDescriptor().getName(), replicaStorage.getState(), replicaStorage.getStorageType()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index eaffaf05ea97c..7ba30e9c11e7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -72,6 +72,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.stream.Collectors; /** * Manage datanodes, include decommission and other activities. @@ -1837,23 +1838,44 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, // 1. We get the BlockCollection from the ZFS failure report first if (!zfsFailureReport.getFailedHdfsBlocks().isEmpty()) { LOG.info("heartbeat contains {} zfs failed hdfs blocks", zfsFailureReport.getFailedHdfsBlocks().size()); + LOG.info("heartbeat contains the storage report for {}", Arrays.stream(reports).map(r -> r.getStorage()).collect(Collectors.toList())); LOG.info("======="); } zfsFailureReport.getFailedHdfsBlocks().forEach(zfsFailTuple -> { + LOG.info("Zfs node {} reported block {} failed", nodeinfo.getName(), zfsFailTuple.getFailedBlock()); + + nodeinfo.getBlockIterator().forEachRemaining(blockInfo -> { + LOG.info("Dn {} has block {}", nodeinfo.getName(), blockInfo.getBlockId()); + }); + final BlockInfo block = blockManager.getStoredBlock(new Block(zfsFailTuple.getFailedBlock())); + // Get the block peers of the failed block + List blockEcPeers = this.blockManager.getBlocksPeerOf(zfsFailTuple.getFailedBlock()); + + // Keep track of the failure cause + List alreadyFailed = + this.blockManager.zfsBlockMgr.blockFailureSources.getOrDefault(zfsFailTuple.getFailedBlock(), new ArrayList<>()); + zfsFailTuple.setDatanodeStorageInfo(new DatanodeStorageInfo(nodeinfo, reports[0].getStorage())); + alreadyFailed.add(zfsFailTuple); + this.blockManager.zfsBlockMgr.blockFailureSources.put(zfsFailTuple.getFailedBlock(), alreadyFailed); + // 2. Check for the block redundancy short expected = blockManager.getExpectedRedundancyNum(block); - final NumberReplicas n = blockManager.countNodes(block); final int pending = blockManager.pendingReconstruction.getNumReplicas(block); + + // This means that there is failure, we need to remove from the live map final boolean hasEnoughReplica = blockManager.hasEnoughEffectiveReplicas(block, n, pending); LOG.info("Expected {}, num replica {}, pending {}, enough replica {}", expected, n, pending, hasEnoughReplica); - if (!hasEnoughReplica) { - blockManager.scheduleReconstruction(block, 0); - } +// if (!hasEnoughReplica) { + + // We schedule the reconstruction no matter what at the moment, currently we always just assume that more than m columns failed in zfs +// blockManager.scheduleReconstruction(block, 0); + blockManager.neededReconstruction.add(block, expected - 1, 0, 1, expected); +// } }); heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index d467f0c9bfe2b..a58fbf519087e 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -573,10 +573,6 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease) continue; } - // 1. We get the failed block from the file name - String[] volumePath = volumeInfo.getPath().split("/"); - String[] blockFilePath = dnode.path.split("/"); - // 2. Check whether this dnode is a hdfs block (rather than directory, metadata, etc) Optional matcher = ZfsFailureTuple.isHdfsBlock(dnode); if (!matcher.isPresent()) { @@ -585,15 +581,18 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease) // This means that this is a block file, not directory, not anything else // Get the block from the file name - // TODO: figure out why we need to -1 here - long hdfsBlockId = Long.parseLong(matcher.get().group(1)) - 1; + long hdfsBlockId = Long.parseLong(matcher.get().group(1)); // If this is already a known issue, we ignore if (this.mlecDnMgmt.knownFailures.containsKey(hdfsBlockId)) { continue; } - LOG.info("Failed hdfs block {} corresponding to zfs dn {}", hdfsBlockId, dnode); + LOG.info("Failed hdfs block on {}:{} {} corresponding to zfs dn {}", dn.getDatanodeHostname(), dn.getHttpPort(), + hdfsBlockId, dnode); + + // MLEC: testing purpose + dnode.childStatus.set(0, 1); ZfsFailureTuple failureTuple = new ZfsFailureTuple(hdfsBlockId, dnode.childStatus); zfsReport.getFailedHdfsBlocks().add(failureTuple); @@ -601,18 +600,19 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease) this.mlecDnMgmt.knownFailures.put(hdfsBlockId, failureTuple); } + HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration, - reports, - dn.getFSDataset().getCacheCapacity(), - dn.getFSDataset().getCacheUsed(), - dn.getXmitsInProgress(), - dn.getActiveTransferThreadCount(), - numFailedVolumes, - volumeFailureSummary, - requestBlockReportLease, - slowPeers, - slowDisks, - zfsReport); + reports, + dn.getFSDataset().getCacheCapacity(), + dn.getFSDataset().getCacheUsed(), + dn.getXmitsInProgress(), + dn.getActiveTransferThreadCount(), + numFailedVolumes, + volumeFailureSummary, + requestBlockReportLease, + slowPeers, + slowDisks, + zfsReport); scheduler.updateLastHeartbeatResponseTime(monotonicNow()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index d666daf3d5632..6284de85441ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -2738,7 +2738,7 @@ private void handleDiskError(String failedVolumes, int failedNumber) { //inform NameNodes for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) { - LOG.info("Try sending error report"); + LOG.info("Try sending disk error report"); bpos.trySendErrorReport(dpError, failedVolumes); }