@@ -516,6 +516,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// Whether to enable datanode's stale state detection and usage for reads
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;
public static final String DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY =
"dfs.namenode.avoid.read.slow.datanode";
public static final boolean
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT = false;
// Whether to enable datanode's stale state detection and usage for writes
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
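A minimal sketch of enabling the new key programmatically (for example from a test). HdfsConfiguration and the DFSConfigKeys constant come from the code above; treating dfs.datanode.peer.stats.enabled as a prerequisite is an assumption based on the slow-peer collector wiring further down in this diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class EnableAvoidSlowReadSketch {
  /** Build a Configuration that asks the NameNode to avoid slow DNs for reads. */
  public static Configuration newConf() {
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(
        DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, true);
    // Assumed prerequisite: without peer-stats collection the slow-node set
    // stays empty, so the new switch would have no effect.
    conf.setBoolean("dfs.datanode.peer.stats.enabled", true);
    return conf;
  }
}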
@@ -156,23 +156,36 @@ public int compare(DatanodeInfo a, DatanodeInfo b) {

/**
* Comparator for sorting DataNodeInfo[] based on
* stale, decommissioned and entering_maintenance states.
* Order: live {@literal ->} stale {@literal ->} entering_maintenance
* {@literal ->} decommissioned
* slow, stale, entering_maintenance and decommissioned states.
* Order: live {@literal ->} slow {@literal ->} stale {@literal ->}
* entering_maintenance {@literal ->} decommissioned
*/
@InterfaceAudience.Private
public static class ServiceAndStaleComparator extends ServiceComparator {
public static class StaleAndSlowComparator extends ServiceComparator {
private final boolean avoidStaleDataNodesForRead;
private final long staleInterval;
private final boolean avoidSlowDataNodesForRead;
private final Set<String> slowNodesUuidSet;

/**
* Constructor of StaleAndSlowComparator.
*
* @param avoidStaleDataNodesForRead
* Whether or not to avoid using stale DataNodes for reading.
* @param interval
* The time interval for marking datanodes as stale is passed from
* outside, since the interval may be changed dynamically
* outside, since the interval may be changed dynamically.
* @param avoidSlowDataNodesForRead
* Whether or not to avoid using slow DataNodes for reading.
* @param slowNodesUuidSet
* Slow DataNodes UUID set.
*/
public ServiceAndStaleComparator(long interval) {
public StaleAndSlowComparator(
boolean avoidStaleDataNodesForRead, long interval,
boolean avoidSlowDataNodesForRead, Set<String> slowNodesUuidSet) {
this.avoidStaleDataNodesForRead = avoidStaleDataNodesForRead;
this.staleInterval = interval;
this.avoidSlowDataNodesForRead = avoidSlowDataNodesForRead;
this.slowNodesUuidSet = slowNodesUuidSet;
}

@Override
@@ -183,9 +196,22 @@ public int compare(DatanodeInfo a, DatanodeInfo b) {
}

// Stale nodes will be moved behind the normal nodes
boolean aStale = a.isStale(staleInterval);
boolean bStale = b.isStale(staleInterval);
return aStale == bStale ? 0 : (aStale ? 1 : -1);
if (avoidStaleDataNodesForRead) {
boolean aStale = a.isStale(staleInterval);
boolean bStale = b.isStale(staleInterval);
ret = aStale == bStale ? 0 : (aStale ? 1 : -1);
if (ret != 0) {
return ret;
}
}

// Slow nodes will be moved behind the normal nodes
if (avoidSlowDataNodesForRead) {
boolean aSlow = slowNodesUuidSet.contains(a.getDatanodeUuid());
boolean bSlow = slowNodesUuidSet.contains(b.getDatanodeUuid());
ret = aSlow == bSlow ? 0 : (aSlow ? 1 : -1);
}
return ret;
}
}
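A standalone sketch of the ordering contract documented above (live -> slow -> stale -> entering_maintenance -> decommissioned). The enum is a hypothetical stand-in for the DatanodeInfo states the comparator inspects, used only to make the tie-breaking visible.

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class StaleAndSlowOrderSketch {
  // Hypothetical stand-in for the node states; lower ordinal sorts first.
  enum State { LIVE, SLOW, STALE, ENTERING_MAINTENANCE, DECOMMISSIONED }

  public static void main(String[] args) {
    List<State> nodes = Arrays.asList(
        State.DECOMMISSIONED, State.LIVE, State.STALE, State.SLOW);
    // Mirrors the documented order: live -> slow -> stale ->
    // entering_maintenance -> decommissioned.
    nodes.sort(Comparator.comparingInt(State::ordinal));
    System.out.println(nodes); // [LIVE, SLOW, STALE, DECOMMISSIONED]
  }
}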

@@ -1104,8 +1104,8 @@ boolean isGoodDatanode(DatanodeDescriptor node,

// check if the target is a slow node
if (dataNodePeerStatsEnabled && excludeSlowNodesEnabled) {
Set<Node> nodes = DatanodeManager.getSlowNodes();
if (nodes.contains(node)) {
Set<String> slowNodesUuidSet = DatanodeManager.getSlowNodesUuidSet();
if (slowNodesUuidSet.contains(node.getDatanodeUuid())) {
logNodeIsNotChosen(node, NodeNotChosenReason.NODE_SLOW);
return false;
}
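The check above only applies when both dataNodePeerStatsEnabled and excludeSlowNodesEnabled are on; a hedged sketch of the corresponding configuration follows, where both key names are assumptions not shown in this diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class ExcludeSlowNodesForWriteSketch {
  /** Assumed keys backing dataNodePeerStatsEnabled / excludeSlowNodesEnabled. */
  public static Configuration newConf() {
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean("dfs.datanode.peer.stats.enabled", true);
    conf.setBoolean(
        "dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled", true);
    return conf;
  }
}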
@@ -140,6 +140,9 @@ public class DatanodeManager {
/** Whether or not to avoid using stale DataNodes for reading */
private final boolean avoidStaleDataNodesForRead;

/** Whether or not to avoid using slow DataNodes for reading. */
private final boolean avoidSlowDataNodesForRead;

/** Whether or not to consider load for reading. */
private final boolean readConsiderLoad;

@@ -210,7 +213,7 @@ public class DatanodeManager {

@Nullable
private final SlowPeerTracker slowPeerTracker;
private static Set<Node> slowNodesSet = Sets.newConcurrentHashSet();
private static Set<String> slowNodesUuidSet = Sets.newConcurrentHashSet();
private Daemon slowPeerCollectorDaemon;
private final long slowPeerCollectionInterval;
private final int maxSlowPeerReportNodes;
@@ -242,7 +245,6 @@ public class DatanodeManager {
} else {
networktopology = NetworkTopology.getInstance(conf);
}

this.heartbeatManager = new HeartbeatManager(namesystem,
blockManager, conf);
this.datanodeAdminManager = new DatanodeAdminManager(namesystem,
@@ -273,7 +275,6 @@ public class DatanodeManager {
}
this.slowDiskTracker = dataNodeDiskStatsEnabled ?
new SlowDiskTracker(conf, timer) : null;

this.defaultXferPort = NetUtils.createSocketAddr(
conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
@@ -294,11 +295,9 @@ public class DatanodeManager {
} catch (IOException e) {
LOG.error("error reading hosts files: ", e);
}

this.dnsToSwitchMapping = ReflectionUtils.newInstance(
conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);

this.rejectUnresolvedTopologyDN = conf.getBoolean(
DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY,
DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT);
@@ -313,7 +312,6 @@ public class DatanodeManager {
}
dnsToSwitchMapping.resolve(locations);
}

heartbeatIntervalSeconds = conf.getTimeDuration(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
@@ -322,7 +320,6 @@ public class DatanodeManager {
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
+ 10 * 1000 * heartbeatIntervalSeconds;

// Effected block invalidate limit is the bigger value between
// value configured in hdfs-site.xml, and 20 * HB interval.
final int configuredBlockInvalidateLimit = conf.getInt(
@@ -335,16 +332,17 @@ public class DatanodeManager {
+ ": configured=" + configuredBlockInvalidateLimit
+ ", counted=" + countedBlockInvalidateLimit
+ ", effected=" + blockInvalidateLimit);

this.checkIpHostnameInRegistration = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY,
DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT);
LOG.info(DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY
+ "=" + checkIpHostnameInRegistration);

this.avoidStaleDataNodesForRead = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY,
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT);
this.avoidSlowDataNodesForRead = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY,
DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT);
this.readConsiderLoad = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY,
DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT);
@@ -389,7 +387,7 @@ private void startSlowPeerCollector() {
public void run() {
while (true) {
try {
slowNodesSet = getSlowPeers();
slowNodesUuidSet = getSlowPeersUuidSet();
} catch (Exception e) {
LOG.error("Failed to collect slow peers", e);
}
@@ -509,12 +507,16 @@ private boolean isInactive(DatanodeInfo datanode) {
return datanode.isDecommissioned() || datanode.isEnteringMaintenance() ||
(avoidStaleDataNodesForRead && datanode.isStale(staleInterval));
}

private boolean isSlowNode(String dnUuid) {
return avoidSlowDataNodesForRead && slowNodesUuidSet.contains(dnUuid);
}

/**
* Sort the non-striped located blocks by the distance to the target host.
*
* For striped blocks, it will only move decommissioned/stale nodes to the
* bottom. For example, assume we have storage list:
* For striped blocks, it will only move decommissioned/stale/slow
* nodes to the bottom. For example, assume we have storage list:
* d0, d1, d2, d3, d4, d5, d6, d7, d8, d9
* mapping to block indices:
* 0, 1, 2, 3, 4, 5, 6, 7, 8, 2
@@ -526,8 +528,11 @@ private boolean isInactive(DatanodeInfo datanode) {
*/
public void sortLocatedBlocks(final String targetHost,
final List<LocatedBlock> locatedBlocks) {
Comparator<DatanodeInfo> comparator = avoidStaleDataNodesForRead ?
new DFSUtil.ServiceAndStaleComparator(staleInterval) :
Comparator<DatanodeInfo> comparator =
avoidStaleDataNodesForRead || avoidSlowDataNodesForRead ?
new DFSUtil.StaleAndSlowComparator(
avoidStaleDataNodesForRead, staleInterval,
avoidSlowDataNodesForRead, slowNodesUuidSet) :
new DFSUtil.ServiceComparator();
// sort located block
for (LocatedBlock lb : locatedBlocks) {
@@ -540,7 +545,8 @@ public void sortLocatedBlocks(final String targetHost,
}

/**
* Move decommissioned/stale datanodes to the bottom. After sorting it will
* Move decommissioned/entering_maintenance/stale/slow
* datanodes to the bottom. After sorting it will
* update block indices and block tokens respectively.
*
* @param lb located striped block
@@ -571,8 +577,9 @@ private void sortLocatedStripedBlock(final LocatedBlock lb,
}

/**
* Move decommissioned/entering_maintenance/stale datanodes to the bottom.
* Also, sort nodes by network distance.
* Move decommissioned/entering_maintenance/stale/slow
* datanodes to the bottom. Also, sort nodes by network
* distance.
*
* @param lb located block
* @param targetHost target host
@@ -602,12 +609,15 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
}

DatanodeInfoWithStorage[] di = lb.getLocations();
// Move decommissioned/entering_maintenance/stale datanodes to the bottom
// Move decommissioned/entering_maintenance/stale/slow
// datanodes to the bottom
Arrays.sort(di, comparator);

// Sort nodes by network distance only for located blocks
int lastActiveIndex = di.length - 1;
while (lastActiveIndex > 0 && isInactive(di[lastActiveIndex])) {
while (lastActiveIndex > 0 && (
isSlowNode(di[lastActiveIndex].getDatanodeUuid()) ||
isInactive(di[lastActiveIndex]))) {
--lastActiveIndex;
}
int activeLen = lastActiveIndex + 1;
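A self-contained sketch of the tail-trimming idiom used here: nodes the comparator pushed to the end (slow or inactive) stay in the array but are excluded from the network-distance sort by shrinking the active prefix. Plain booleans stand in for isSlowNode()/isInactive().

public class ActivePrefixSketch {
  public static void main(String[] args) {
    // true = node was pushed to the tail (slow or inactive) by the comparator.
    boolean[] excluded = {false, false, true, true};
    int lastActiveIndex = excluded.length - 1;
    while (lastActiveIndex > 0 && excluded[lastActiveIndex]) {
      --lastActiveIndex;
    }
    int activeLen = lastActiveIndex + 1;
    System.out.println(activeLen); // 2: only the leading nodes are re-sorted
  }
}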
@@ -2085,10 +2095,10 @@ public String getSlowPeersReport() {
* Returns the UUIDs of all tracked slow peers.
* @return
*/
public Set<Node> getSlowPeers() {
Set<Node> slowPeersSet = Sets.newConcurrentHashSet();
public Set<String> getSlowPeersUuidSet() {
Set<String> slowPeersUuidSet = Sets.newConcurrentHashSet();
if (slowPeerTracker == null) {
return slowPeersSet;
return slowPeersUuidSet;
}
ArrayList<String> slowNodes =
slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
@@ -2101,18 +2111,18 @@ public Set<Node> getSlowPeers() {
DatanodeDescriptor datanodeByHost =
host2DatanodeMap.getDatanodeByHost(ipAddr);
if (datanodeByHost != null) {
slowPeersSet.add(datanodeByHost);
slowPeersUuidSet.add(datanodeByHost.getDatanodeUuid());
}
}
return slowPeersSet;
return slowPeersUuidSet;
}

/**
* Returns all tracking slow peers.
* Returns the UUIDs of all tracked slow DataNodes.
* @return
*/
public static Set<Node> getSlowNodes() {
return slowNodesSet;
public static Set<String> getSlowNodesUuidSet() {
return slowNodesUuidSet;
}

/**
@@ -2130,6 +2140,12 @@ public SlowPeerTracker getSlowPeerTracker() {
public SlowDiskTracker getSlowDiskTracker() {
return slowDiskTracker;
}

@VisibleForTesting
public void addSlowPeers(String dnUuid) {
slowNodesUuidSet.add(dnUuid);
}
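A hedged sketch of how a test might seed the slow-node set through this hook before exercising read sorting; the DatanodeManager instance and the UUID are assumed to come from an existing fixture (for example a MiniDFSCluster setup).

import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;

public class SlowNodeSeedingSketch {
  /** Seed the static slow-node set and verify it is visible to readers. */
  static void seedSlowNode(DatanodeManager dnManager, String dnUuid) {
    dnManager.addSlowPeers(dnUuid);
    if (!DatanodeManager.getSlowNodesUuidSet().contains(dnUuid)) {
      throw new AssertionError("slow node was not registered: " + dnUuid);
    }
  }
}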

/**
* Retrieve information about slow disks as a JSON.
* Returns null if we are not tracking slow disks.
@@ -2110,6 +2110,16 @@
</description>
</property>

<property>
<name>dfs.namenode.avoid.read.slow.datanode</name>
<value>false</value>
<description>
Indicate whether or not to avoid reading from &quot;slow&quot; datanodes.
Slow datanodes will be moved to the end of the node list returned
for reading.
</description>
</property>
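A hedged hdfs-site.xml sketch pairing the new property with slow-peer tracking; whether dfs.datanode.peer.stats.enabled is strictly required for the read path is an assumption based on the collector wiring, not something this file states.

<!-- Sketch for hdfs-site.xml; the peer-stats prerequisite is an assumption. -->
<property>
  <name>dfs.datanode.peer.stats.enabled</name>
  <value>true</value>
</property>
<property>
  <name>dfs.namenode.avoid.read.slow.datanode</name>
  <value>true</value>
</property>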

<property>
<name>dfs.namenode.avoid.write.stale.datanode</name>
<value>false</value>
@@ -22,7 +22,6 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.Node;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -100,12 +99,12 @@ public void testChooseTargetExcludeSlowNodes() throws Exception {
Thread.sleep(3000);

// fetch slow nodes
Set<Node> slowPeers = dnManager.getSlowPeers();
Set<String> slowPeers = dnManager.getSlowPeersUuidSet();

// assert slow nodes
assertEquals(3, slowPeers.size());
for (int i = 0; i < slowPeers.size(); i++) {
assertTrue(slowPeers.contains(dataNodes[i]));
assertTrue(slowPeers.contains(dataNodes[i].getDatanodeUuid()));
}

// mock writer
@@ -120,7 +119,8 @@ public void testChooseTargetExcludeSlowNodes() throws Exception {
// assert targets
assertEquals(3, targets.length);
for (int i = 0; i < targets.length; i++) {
assertTrue(!slowPeers.contains(targets[i].getDatanodeDescriptor()));
assertTrue(!slowPeers.contains(targets[i].getDatanodeDescriptor()
.getDatanodeUuid()));
}
} finally {
namenode.getNamesystem().writeUnlock();