MLEC repair queueing
jiajunmao committed Aug 10, 2024
1 parent 80731a6 commit 7295cde
Showing 5 changed files with 77 additions and 38 deletions.
@@ -403,7 +403,7 @@ public final boolean isCompleteOrCommitted() {
public void convertToBlockUnderConstruction(BlockUCState s,
DatanodeStorageInfo[] targets) {
LOG.info("convertToBlockUnderConstruction to state {} for block {}", s, this.getBlockId());
Thread.dumpStack();
// Thread.dumpStack();
if (isComplete()) {
uc = new BlockUnderConstructionFeature(this, s, targets,
this.getBlockType());
@@ -498,7 +498,7 @@ public int getPendingSPSPaths() {
private long excessRedundancyTimeoutCheckLimit;

// MLEC Stuff
private ZfsBlockManagement zfsBlockMgr;
public ZfsBlockManagement zfsBlockMgr;

public BlockManager(final Namesystem namesystem, boolean haEnabled,
final Configuration conf) throws IOException {
@@ -2173,10 +2173,12 @@ int computeReconstructionWorkForBlocks(
LOG.info("Zfs failure causes {}",failCause);

try {
LOG.info("Block {} failed because of {} on hosts {}",
rw.getBlock().getBlockId(),
failCause.stream().map(tuple -> tuple.getDatanodeStorageInfo().getStorageID()).collect(Collectors.toList()),
failCause.stream().map(tuple -> tuple.getDatanodeStorageInfo().getDatanodeDescriptor().getHostName()).collect(Collectors.toList()));
LOG.info("Block {} failed because of {} on hosts {}:{}",
rw.getBlock().getBlockId(),
failCause.stream().map(tuple -> tuple.getDatanodeStorageInfo().getStorageID()).collect(Collectors.toList()),
failCause.stream().map(tuple -> tuple.getDatanodeStorageInfo().getDatanodeDescriptor().getHostName()).collect(Collectors.toList()),
failCause.stream().map(tuple -> tuple.getDatanodeStorageInfo().getDatanodeDescriptor().getInfoPort()).collect(Collectors.toList()));

} catch (RuntimeException e) {
// Silence it
}
@@ -2192,9 +2194,12 @@ int computeReconstructionWorkForBlocks(
// The block is on ZFS, so we need to restore it to the same datanode it came from.
// However, we cannot write to it directly: that IO pool is suspended and the write would fail.
// We need to instruct the writer to call a different API.
LOG.info("The reconstruction block is ZFS, writing back to origin datanode {}",
LOG.info("The reconstruction block is ZFS, writing back to origin datanode {}:{}",
failCause.stream()
.map(failTuple -> failTuple.getDatanodeStorageInfo().getDatanodeDescriptor().getHostName())
.collect(Collectors.toList()),
failCause.stream()
.map(failTuple -> failTuple.getDatanodeStorageInfo().getDatanodeDescriptor().getInfoPort())
.collect(Collectors.toList()));
DatanodeStorageInfo[] failCauseArr =
new DatanodeStorageInfo[failCause.size()];
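The comment above explains why the repaired data has to be written back to the datanode that reported the ZFS failure, through a write path that bypasses the suspended IO pool. The hunk is truncated here, so the following is only a sketch of how the failure tuples could populate the target array, using the getDatanodeStorageInfo() accessor already shown in the log statements above; treating these storages as the reconstruction targets is an assumption.

```java
// Sketch only: point the reconstruction targets back at the storages that
// reported the ZFS failure, so the block is repaired in place.
DatanodeStorageInfo[] failCauseArr = new DatanodeStorageInfo[failCause.size()];
for (int i = 0; i < failCause.size(); i++) {
  failCauseArr[i] = failCause.get(i).getDatanodeStorageInfo();
}
// The writer must then use the alternate (non-suspended) write API mentioned
// in the comment above; that API is not part of this diff.
```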
@@ -2236,7 +2241,8 @@ int computeReconstructionWorkForBlocks(
}

LOG.info("Adding reconstruction task to DN {}", Arrays.stream(rw.getTargets())
.map(i -> i.getDatanodeDescriptor().getHostName()).collect(Collectors.toList()));
.map(i -> i.getDatanodeDescriptor().getHostName() + ":" + i.getDatanodeDescriptor().getInfoPort())
.collect(Collectors.toList()));
synchronized (neededReconstruction) {
if (validateReconstructionWork(rw)) {
scheduledWork++;
@@ -2280,10 +2286,12 @@ boolean hasEnoughEffectiveReplicas(BlockInfo block,
@VisibleForTesting
BlockReconstructionWork scheduleReconstruction(BlockInfo block,
int priority) {
LOG.info("ScheduleReconstruction called on block {}", block);
// skip abandoned block or block reopened for append
if (block.isDeleted() || !block.isCompleteOrCommitted()) {
// remove from neededReconstruction
neededReconstruction.remove(block, priority);
LOG.info("Block is already deleted or not completed");
return null;
}

@@ -2301,7 +2309,7 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
numReplicas);
if (srcNodes == null || srcNodes.length == 0) {
// block can not be reconstructed from any node
LOG.debug("Block {} cannot be reconstructed from any node", block);
LOG.info("Block {} cannot be reconstructed from any node", block);
NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
return null;
}
@@ -2310,7 +2318,7 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
if (block.isStriped()) {
BlockInfoStriped stripedBlock = (BlockInfoStriped) block;
if (stripedBlock.getRealDataBlockNum() > srcNodes.length) {
LOG.debug("Block {} cannot be reconstructed due to shortage of source datanodes ", block);
LOG.info("Block {} cannot be reconstructed due to shortage of source datanodes ", block);
NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
return null;
}
@@ -2321,9 +2329,13 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
assert liveReplicaNodes.size() >= numReplicas.liveReplicas();

int pendingNum = pendingReconstruction.getNumReplicas(block);
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) {

// MLEC check: in place update
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)
&& !this.zfsBlockMgr.isZfsFailure(block)) {
neededReconstruction.remove(block, priority);
blockLog.debug("BLOCK* Removing {} from neededReconstruction as it has enough replicas",
blockLog.info("zfs failure {} {}", this.zfsBlockMgr.blockFailureSources, this.zfsBlockMgr.isZfsFailure(block));
blockLog.info("BLOCK* Removing {} from neededReconstruction as it has enough replicas",
block);
NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
return null;
@@ -2343,6 +2355,7 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
if (block.isStriped()) {
if (pendingNum > 0) {
// Wait the previous reconstruction to finish.
LOG.info("Block has more than 0 pending reconstruction, waiting on them to finish first");
NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
return null;
}
@@ -2435,7 +2448,11 @@ boolean validateReconstructionWork(BlockReconstructionWork rw) {
final short requiredRedundancy =
getExpectedLiveRedundancyNum(block, numReplicas);
final int pendingNum = pendingReconstruction.getNumReplicas(block);
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) {

// MLEC check: if this is a ZFS failure, skip the replica check because we never removed the block in the first place
LOG.info("Zfs failure {}", this.zfsBlockMgr.blockFailureSources);
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)
&& !this.zfsBlockMgr.isZfsFailure(block)) {
neededReconstruction.remove(block, priority);
rw.resetTargets();
blockLog.debug("BLOCK* Removing {} from neededReconstruction as it has enough replicas",
@@ -4778,7 +4795,7 @@ private void processIncrementalBlockReport(final DatanodeDescriptor node,
}
}

private List<Block> getBlocksPeerOf(long blockId) {
public List<Block> getBlocksPeerOf(long blockId) {
Block block = new Block(blockId);

// Get the file path
@@ -4797,7 +4814,7 @@ private List<Block> getBlocksPeerOf(long blockId) {
LOG.info("Block peer {}-{} on datanode {}, status {}, type {}",
replicaStorage.getStorageType(),
replicaBlock.getBlockId(),
replicaStorage.getDatanodeDescriptor().getHostName(),
replicaStorage.getDatanodeDescriptor().getName(),
replicaStorage.getState(),
replicaStorage.getStorageType());

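The new gating in scheduleReconstruction and validateReconstructionWork above calls zfsBlockMgr.isZfsFailure(block) and logs zfsBlockMgr.blockFailureSources, but ZfsBlockManagement itself is not part of this diff. A minimal sketch of what it might look like, assuming the map is keyed by block id as the getOrDefault/put calls in the heartbeat handler below suggest:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch of ZfsBlockManagement; BlockInfo and ZfsFailureTuple are
 * the HDFS block-management types used elsewhere in this commit.
 */
public class ZfsBlockManagement {
  /** Failed HDFS block id -> failure tuples reported for that block. */
  public final Map<Long, List<ZfsFailureTuple>> blockFailureSources =
      new ConcurrentHashMap<>();

  /** A block counts as a ZFS (MLEC) failure if any datanode has reported it. */
  public boolean isZfsFailure(BlockInfo block) {
    return blockFailureSources.containsKey(block.getBlockId());
  }
}
```

With this shape, the hasEnoughEffectiveReplicas short-circuit is skipped for any block whose id appears in the map, which is what keeps a ZFS-failed block in neededReconstruction even though HDFS still counts enough live replicas.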
@@ -72,6 +72,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
* Manage datanodes, include decommission and other activities.
@@ -1837,23 +1838,44 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
// 1. We get the BlockCollection from the ZFS failure report first
if (!zfsFailureReport.getFailedHdfsBlocks().isEmpty()) {
LOG.info("heartbeat contains {} zfs failed hdfs blocks", zfsFailureReport.getFailedHdfsBlocks().size());
LOG.info("heartbeat contains the storage report for {}", Arrays.stream(reports).map(r -> r.getStorage()).collect(Collectors.toList()));
LOG.info("=======");
}

zfsFailureReport.getFailedHdfsBlocks().forEach(zfsFailTuple -> {
LOG.info("Zfs node {} reported block {} failed", nodeinfo.getName(), zfsFailTuple.getFailedBlock());

nodeinfo.getBlockIterator().forEachRemaining(blockInfo -> {
LOG.info("Dn {} has block {}", nodeinfo.getName(), blockInfo.getBlockId());
});

final BlockInfo block = blockManager.getStoredBlock(new Block(zfsFailTuple.getFailedBlock()));

// Get the block peers of the failed block
List<Block> blockEcPeers = this.blockManager.getBlocksPeerOf(zfsFailTuple.getFailedBlock());

// Keep track of the failure cause
List<ZfsFailureTuple> alreadyFailed =
this.blockManager.zfsBlockMgr.blockFailureSources.getOrDefault(zfsFailTuple.getFailedBlock(), new ArrayList<>());
zfsFailTuple.setDatanodeStorageInfo(new DatanodeStorageInfo(nodeinfo, reports[0].getStorage()));
alreadyFailed.add(zfsFailTuple);
this.blockManager.zfsBlockMgr.blockFailureSources.put(zfsFailTuple.getFailedBlock(), alreadyFailed);

// 2. Check for the block redundancy
short expected = blockManager.getExpectedRedundancyNum(block);

final NumberReplicas n = blockManager.countNodes(block);
final int pending = blockManager.pendingReconstruction.getNumReplicas(block);

// This means there is a failure; we need to remove it from the live map
final boolean hasEnoughReplica = blockManager.hasEnoughEffectiveReplicas(block, n, pending);
LOG.info("Expected {}, num replica {}, pending {}, enough replica {}",
expected, n, pending, hasEnoughReplica);
if (!hasEnoughReplica) {
blockManager.scheduleReconstruction(block, 0);
}
// if (!hasEnoughReplica) {

// For now we always schedule the reconstruction; we currently assume that more than m columns failed in ZFS
// blockManager.scheduleReconstruction(block, 0);
blockManager.neededReconstruction.add(block, expected - 1, 0, 1, expected);
// }
});

heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
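The heartbeat handler above now records each reported failure in blockFailureSources and then queues a repair unconditionally, instead of going through scheduleReconstruction. A sketch of that queueing step follows; the parameter meanings of neededReconstruction.add(...) are taken from upstream LowRedundancyBlocks#add(block, curReplicas, readOnlyReplicas, outOfServiceReplicas, expectedReplicas) and should be treated as an assumption for this fork:

```java
/**
 * Sketch of the MLEC repair queueing done per failed block above. Assumes the
 * caller lives in the same package as BlockManager, as the handler here does.
 */
void queueMlecRepair(BlockManager blockManager, BlockInfo block) {
  short expected = blockManager.getExpectedRedundancyNum(block);
  // Report one replica as missing and one source as out of service so the
  // block enters the low-redundancy queue even though HDFS still sees enough
  // live replicas; the isZfsFailure(...) checks keep it from being dequeued.
  blockManager.neededReconstruction.add(block, expected - 1, 0, 1, expected);
}
```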
@@ -573,10 +573,6 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
continue;
}

// 1. We get the failed block from the file name
String[] volumePath = volumeInfo.getPath().split("/");
String[] blockFilePath = dnode.path.split("/");

// 2. Check whether this dnode is an HDFS block (rather than a directory, metadata, etc.)
Optional<Matcher> matcher = ZfsFailureTuple.isHdfsBlock(dnode);
if (!matcher.isPresent()) {
@@ -585,34 +581,38 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)

// This means this is a block file, not a directory, metadata, or anything else
// Get the block from the file name
// TODO: figure out why we need to -1 here
long hdfsBlockId = Long.parseLong(matcher.get().group(1)) - 1;
long hdfsBlockId = Long.parseLong(matcher.get().group(1));

// If this is already a known failure, ignore it
if (this.mlecDnMgmt.knownFailures.containsKey(hdfsBlockId)) {
continue;
}

LOG.info("Failed hdfs block {} corresponding to zfs dn {}", hdfsBlockId, dnode);
LOG.info("Failed hdfs block on {}:{} {} corresponding to zfs dn {}", dn.getDatanodeHostname(), dn.getHttpPort(),
hdfsBlockId, dnode);

// MLEC: for testing purposes
dnode.childStatus.set(0, 1);

ZfsFailureTuple failureTuple = new ZfsFailureTuple(hdfsBlockId, dnode.childStatus);
zfsReport.getFailedHdfsBlocks().add(failureTuple);

this.mlecDnMgmt.knownFailures.put(hdfsBlockId, failureTuple);
}


HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
dn.getActiveTransferThreadCount(),
numFailedVolumes,
volumeFailureSummary,
requestBlockReportLease,
slowPeers,
slowDisks,
zfsReport);
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
dn.getActiveTransferThreadCount(),
numFailedVolumes,
volumeFailureSummary,
requestBlockReportLease,
slowPeers,
slowDisks,
zfsReport);

scheduler.updateLastHeartbeatResponseTime(monotonicNow());

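The datanode-side scan above maps a failed ZFS child onto an HDFS block id through ZfsFailureTuple.isHdfsBlock(dnode) and group(1) of the returned matcher. The real method takes the dnode object, but a simplified, hypothetical version of the name check it implies (HDFS block data files are named blk_<blockId>) could look like this:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Simplified stand-in for the block-file check used above. */
final class BlockFileNames {
  // Match block data files ("blk_<id>") but not .meta files or directories,
  // so the caller can parse group(1) directly as the block id.
  private static final Pattern BLOCK_FILE = Pattern.compile("^blk_(\\d+)$");

  static Optional<Matcher> isHdfsBlock(String fileName) {
    Matcher m = BLOCK_FILE.matcher(fileName);
    return m.matches() ? Optional.of(m) : Optional.empty();
  }
}
```

For example, isHdfsBlock("blk_1073741825") yields a matcher whose group(1) parses to block id 1073741825, matching the Long.parseLong call above.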
@@ -2738,7 +2738,7 @@ private void handleDiskError(String failedVolumes, int failedNumber) {

//inform NameNodes
for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
LOG.info("Try sending error report");
LOG.info("Try sending disk error report");
bpos.trySendErrorReport(dpError, failedVolumes);
}

