Workaround for DN heartbeats with provided storage
cdouglas committed Dec 17, 2015
1 parent 44a6082 commit 33c6fcb
Showing 4 changed files with 24 additions and 30 deletions.
@@ -1880,15 +1880,18 @@ public boolean processReport(final DatanodeID nodeID,
       }
     }
 
-    if (storageInfo.getBlockReportCount() == 0) {
-      // The first block report can be processed a lot more efficiently than
-      // ordinary block reports. This shortens restart times.
-      LOG.info("Processing first storage report for " +
-          storageInfo.getStorageID() + " from datanode " +
-          nodeID.getDatanodeUuid());
-      processFirstBlockReport(storageInfo, newReport);
-    } else {
-      invalidatedBlocks = processReport(storageInfo, newReport);
+    // Block reports for provided storage are not maintained by DN heartbeats
+    if (StorageType.PROVIDED.equals(storageInfo.getStorageType())) {
+      if (storageInfo.getBlockReportCount() == 0) {
+        // The first block report can be processed a lot more efficiently than
+        // ordinary block reports. This shortens restart times.
+        LOG.info("Processing first storage report for " +
+            storageInfo.getStorageID() + " from datanode " +
+            nodeID.getDatanodeUuid());
+        processFirstBlockReport(storageInfo, newReport);
+      } else {
+        invalidatedBlocks = processReport(storageInfo, newReport);
+      }
     }
 
     storageInfo.receivedBlockReport();
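The guard above is the heart of the workaround: the report-processing branch here now runs only for PROVIDED storages, whose block reports are not refreshed by ordinary datanode heartbeats, while receivedBlockReport() still fires for every storage. A minimal standalone model of that control flow, with invented stand-in types (the real method lives on BlockManager and operates on a DatanodeStorageInfo):

public class ProvidedReportGuardSketch {
  enum StorageType { DISK, SSD, PROVIDED }

  // Stand-in for DatanodeStorageInfo; only the fields the guard touches.
  static final class StorageInfo {
    final StorageType type;
    int blockReportCount;
    StorageInfo(StorageType type) { this.type = type; }
  }

  static void processReport(StorageInfo storageInfo) {
    // Block reports for provided storage are not maintained by DN heartbeats
    if (StorageType.PROVIDED.equals(storageInfo.type)) {
      if (storageInfo.blockReportCount == 0) {
        System.out.println("processing first (fast-path) storage report");
      } else {
        System.out.println("processing ordinary storage report");
      }
    }
    storageInfo.blockReportCount++; // storageInfo.receivedBlockReport()
  }

  public static void main(String[] args) {
    StorageInfo provided = new StorageInfo(StorageType.PROVIDED);
    processReport(provided);                          // first report
    processReport(provided);                          // ordinary report
    processReport(new StorageInfo(StorageType.DISK)); // guard skips processing
  }
}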
@@ -53,8 +53,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class extends the DatanodeInfo class with ephemeral information (eg
  * health, capacity, what blocks are associated with the Datanode) that is
@@ -336,6 +334,12 @@ List<DatanodeStorageInfo> removeZombieStorages() {
     while (iter.hasNext()) {
       Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
       DatanodeStorageInfo storageInfo = entry.getValue();
+      if (StorageType.PROVIDED.equals(storageInfo.getStorageType())) {
+        // to verify provided storage participated in this hb, requires
+        // check to pass DNDesc.
+        // e.g., storageInfo.verifyBlockReportId(this, curBlockReportId)
+        continue;
+      }
       if (storageInfo.getLastBlockReportId() != curBlockReportId) {
         LOG.info("{} had lastBlockReportId 0x{} but curBlockReportId = 0x{}",
             storageInfo.getStorageID(),
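removeZombieStorages prunes storages that missed the current block-report round; a provided storage is shared across datanodes and is not refreshed per heartbeat, so the workaround simply skips it, and the TODO sketches the eventual per-datanode check (storageInfo.verifyBlockReportId(this, curBlockReportId)). A hypothetical shape for that check, with names beyond the TODO's own suggestion invented for illustration:

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only: tracks the last report id per reporting datanode,
// since one shared provided storage cannot keep a single lastBlockReportId.
class ProvidedStorageReportTracker {
  private final Map<String, Long> lastReportIdByNode = new HashMap<>();

  void recordReport(String datanodeUuid, long blockReportId) {
    lastReportIdByNode.put(datanodeUuid, blockReportId);
  }

  // True iff this datanode participated in the current heartbeat round,
  // so the provided storage should not be treated as a zombie for it.
  boolean verifyBlockReportId(String datanodeUuid, long curBlockReportId) {
    Long last = lastReportIdByNode.get(datanodeUuid);
    return last != null && last == curBlockReportId;
  }
}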
@@ -1,32 +1,19 @@
package org.apache.hadoop.hdfs.server.blockmanagement;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -35,8 +22,6 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,7 +76,7 @@ DatanodeStorageInfo getStorage(DatanodeDescriptor dn, DatanodeStorage s)
         // poll service, initiate
         p.start();
         dn.injectStorage(storage);
-        return dns.getStorage(dn, s);
+        return dns.getProvidedStorage(dn, s);
       }
       LOG.warn("Reserved storage {} reported as non-provided from {}", s, dn);
     }
@@ -200,8 +185,10 @@ static class ProvidedDescriptor extends DatanodeDescriptor {
 
     // TODO: examine BM::getBlocksWithLocations (used by rebalancer?)
 
-    DatanodeStorageInfo getStorage(DatanodeDescriptor dn, DatanodeStorage s) {
+    DatanodeStorageInfo getProvidedStorage(
+        DatanodeDescriptor dn, DatanodeStorage s) {
       dns.put(dn.getDatanodeUuid(), dn);
+      // TODO: maintain separate RPC ident per dn
       return storageMap.get(s.getStorageID());
     }
 
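The rename to getProvidedStorage makes the sharing explicit: every datanode that reports the reserved provided storage is recorded in dns, but all of them are handed the single record in storageMap for that storage ID. A simplified model of that bookkeeping, with invented stand-in types:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ProvidedDescriptorSketch {
  // Stand-in for DatanodeStorageInfo.
  static final class StorageRecord {
    final String storageId;
    StorageRecord(String storageId) { this.storageId = storageId; }
  }

  private final ConcurrentMap<String, String> dns = new ConcurrentHashMap<>();
  private final ConcurrentMap<String, StorageRecord> storageMap =
      new ConcurrentHashMap<>();

  ProvidedDescriptorSketch(String storageId) {
    storageMap.put(storageId, new StorageRecord(storageId));
  }

  StorageRecord getProvidedStorage(String datanodeUuid, String storageId) {
    dns.put(datanodeUuid, datanodeUuid); // track every reporting datanode
    // TODO in the commit: maintain separate RPC ident per dn
    return storageMap.get(storageId);    // one record shared by all of them
  }
}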
@@ -131,7 +131,7 @@ public void testBlockLoad() throws Exception {
     startCluster(NAMEPATH, 1);
   }
 
-  static Path removePrefix(Path base, Path walk) throws Exception {
+  static Path removePrefix(Path base, Path walk) {
     Path wpath = new Path(walk.toUri().getPath());
     Path bpath = new Path(base.toUri().getPath());
     Path ret = new Path("/");
@@ -143,7 +143,7 @@ static Path removePrefix(Path base, Path walk) throws Exception {
       wpath = wpath.getParent();
     }
     if (!bpath.equals(wpath)) {
-      throw new Exception(base + " not a prefix of " + walk);
+      throw new IllegalArgumentException(base + " not a prefix of " + walk);
     }
     return ret;
   }
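The middle of removePrefix is elided in the diff; from the visible lines it evidently climbs wpath toward base, prepending each path component to ret, and the change swaps the checked Exception for an unchecked IllegalArgumentException so the helper no longer needs a throws clause. A plausible reconstruction, where the loop body is an assumption and only the surrounding lines come from the commit:

import org.apache.hadoop.fs.Path;

public class RemovePrefixSketch {
  // Strip base from walk, returning the remainder rooted at "/".
  static Path removePrefix(Path base, Path walk) {
    Path wpath = new Path(walk.toUri().getPath());
    Path bpath = new Path(base.toUri().getPath());
    Path ret = new Path("/");
    // Assumed loop: climb from walk toward base, prepending each component.
    while (wpath != null && !bpath.equals(wpath)) {
      ret = "/".equals(ret.toString())
          ? new Path("/", wpath.getName())
          : new Path(new Path("/", wpath.getName()),
                     ret.toString().substring(1));
      wpath = wpath.getParent();
    }
    if (!bpath.equals(wpath)) {
      throw new IllegalArgumentException(base + " not a prefix of " + walk);
    }
    return ret;
  }

  public static void main(String[] args) {
    // removePrefix(/a/b, /a/b/c/d) -> /c/d
    System.out.println(removePrefix(new Path("/a/b"), new Path("/a/b/c/d")));
  }
}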
