DeadNodeDetector basic model
Signed-off-by: sunlisheng <sunlisheng@xiaomi.com>
sunlisheng committed Aug 31, 2019
1 parent 7d998cb commit 0711432
Showing 10 changed files with 551 additions and 48 deletions.
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ReflectionUtils;

import com.google.common.annotations.VisibleForTesting;
@@ -118,6 +119,19 @@ public class ClientContext {
private NodeBase clientNode;
private boolean topologyResolutionEnabled;

private Daemon deadNodeDetectorThr = null;

/**
* The switch to enable or disable the DeadNodeDetector.
*/
private boolean sharedDeadNodesEnabled = false;

/**
* Detect the dead datanodes in advance, and share this information among all
* the DFSInputStreams in the same client.
*/
private DeadNodeDetector deadNodeDetector = null;

private ClientContext(String name, DfsClientConf conf,
Configuration config) {
final ShortCircuitConf scConf = conf.getShortCircuitConf();
@@ -134,6 +148,12 @@ private ClientContext(String name, DfsClientConf conf,

this.byteArrayManager = ByteArrayManager.newInstance(
conf.getWriteByteArrayManagerConf());
this.sharedDeadNodesEnabled = conf.isSharedDeadNodesEnabled();
if (sharedDeadNodesEnabled && deadNodeDetector == null) {
deadNodeDetector = new DeadNodeDetector(name);
deadNodeDetectorThr = new Daemon(deadNodeDetector);
deadNodeDetectorThr.start();
}
initTopologyResolution(config);
}

@@ -251,4 +271,12 @@ public int getNetworkDistance(DatanodeInfo datanodeInfo) throws IOException {
datanodeInfo.getNetworkLocation());
return NetworkTopology.getDistanceByPath(clientNode, node);
}

public boolean isSharedDeadNodesEnabled() {
return sharedDeadNodesEnabled;
}

public DeadNodeDetector getDeadNodeDetector() {
return deadNodeDetector;
}
}
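The DeadNodeDetector class itself is one of the changed files that did not render on this page. Below is a minimal sketch of the shape implied by the call sites above and in DFSClient (a Runnable wrapped in a Daemon, exposing isDeadNode, getDeadNodesToDetect, addNodeToDetect, and removeNodeFromDetectByDFSInputStream). The field names, the probing interval, and the immediate marking of reported nodes as dead are assumptions for illustration, not the committed implementation.

package org.apache.hadoop.hdfs;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

/** Sketch only; the real DeadNodeDetector of this commit is not shown here. */
public class DeadNodeDetector implements Runnable {
  private final String name;

  /** Dead nodes shared by all DFSInputStreams of the client (assumed field). */
  private final Map<String, DatanodeInfo> deadNodes = new ConcurrentHashMap<>();

  /** Suspect nodes reported per open stream (assumed field). */
  private final Map<DFSInputStream, Set<DatanodeInfo>> dfsInputStreamNodes =
      new ConcurrentHashMap<>();

  public DeadNodeDetector(String name) {
    this.name = name;
  }

  @Override
  public void run() {
    // Basic model: the daemon would periodically re-probe suspect nodes and
    // move confirmed-dead ones into deadNodes; the probing is omitted here.
    while (!Thread.currentThread().isInterrupted()) {
      try {
        Thread.sleep(60_000L);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  public boolean isDeadNode(DatanodeInfo dn) {
    return deadNodes.containsKey(dn.getDatanodeUuid());
  }

  public Set<DatanodeInfo> getDeadNodesToDetect() {
    return new HashSet<>(deadNodes.values());
  }

  public void addNodeToDetect(DFSInputStream stream, DatanodeInfo dn) {
    dfsInputStreamNodes
        .computeIfAbsent(stream, k -> ConcurrentHashMap.newKeySet())
        .add(dn);
    // Simplification: mark the node dead immediately; the real detector
    // presumably verifies liveness before sharing it client-wide.
    deadNodes.put(dn.getDatanodeUuid(), dn);
  }

  public void removeNodeFromDetectByDFSInputStream(DFSInputStream stream,
      DatanodeInfo dn) {
    Set<DatanodeInfo> nodes = dfsInputStreamNodes.get(stream);
    if (nodes != null) {
      nodes.remove(dn);
    }
  }
}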
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -44,6 +44,8 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
@@ -3226,4 +3228,95 @@ public HAServiceProtocol.HAServiceState getHAServiceState()
throws IOException {
return namenode.getHAServiceState();
}

/**
* If sharedDeadNodesEnabled is true, return the dead nodes detected by all
* the DFSInputStreams in the same client. Otherwise return the dead nodes
* detected by this DFSInputStream only.
*/
public ConcurrentHashMap<DatanodeInfo, DatanodeInfo> getDeadNodes(
DFSInputStream dfsInputStream) {
if (clientContext.isSharedDeadNodesEnabled()) {
ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
if (dfsInputStream != null) {
deadNodes.putAll(dfsInputStream.getLocalDeadNodes());
}

Set<DatanodeInfo> detectDeadNodes =
clientContext.getDeadNodeDetector().getDeadNodesToDetect();
for (DatanodeInfo detectDeadNode : detectDeadNodes) {
deadNodes.put(detectDeadNode, detectDeadNode);
}
return deadNodes;
} else {
return dfsInputStream.getLocalDeadNodes();
}
}

/**
* If sharedDeadNodesEnabled is true, the judgment is based on whether this
* datanode is included in DeadNodeDetector#deadnodes. Otherwise it is based
* on whether it is included in DFSInputStream#deadnodes.
*/
public boolean isDeadNode(DFSInputStream dfsInputStream,
DatanodeInfo datanodeInfo) {
if (isSharedDeadNodesEnabled()) {
boolean isDeadNode =
clientContext.getDeadNodeDetector().isDeadNode(datanodeInfo);
if (dfsInputStream != null) {
isDeadNode = isDeadNode
|| dfsInputStream.getLocalDeadNodes().contains(datanodeInfo);
}
return isDeadNode;
} else {
return dfsInputStream.getLocalDeadNodes().contains(datanodeInfo);
}
}

/**
* If sharedDeadNodesEnabled is true, add the datanode to
* DeadNodeDetector#deadnodes and DeadNodeDetector#dfsInputStreamNodes.
*/
public void addNodeToDetect(DFSInputStream dfsInputStream,
DatanodeInfo datanodeInfo) {
if (!isSharedDeadNodesEnabled()) {
return;
}
clientContext.getDeadNodeDetector().addNodeToDetect(dfsInputStream,
datanodeInfo);
}

/**
* If sharedDeadNodesEnabled is true, remove the datanode from
* DeadNodeDetector#dfsInputStreamNodes.
*/
public void removeNodeFromDetectByDFSInputStream(
DFSInputStream dfsInputStream, DatanodeInfo datanodeInfo) {
if (!isSharedDeadNodesEnabled()) {
return;
}
clientContext.getDeadNodeDetector()
.removeNodeFromDetectByDFSInputStream(dfsInputStream, datanodeInfo);
}

/**
* If sharedDeadNodesEnabled is true and locatedBlocks is not null, remove
* locatedBlocks#datanodeInfos from DeadNodeDetector#dfsInputStreamNodes.
*/
public void removeNodeFromDetectByDFSInputStream(
DFSInputStream dfsInputStream, LocatedBlocks locatedBlocks) {
if (!isSharedDeadNodesEnabled() || locatedBlocks == null) {
return;
}
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
for (DatanodeInfo datanodeInfo : locatedBlock.getLocations()) {
removeNodeFromDetectByDFSInputStream(dfsInputStream, datanodeInfo);
}
}
}

private boolean isSharedDeadNodesEnabled() {
return clientContext.isSharedDeadNodesEnabled();
}
}
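With the methods above, a reader always consults the union of the stream-local dead nodes and, when the switch is on, the shared detector's view. A hedged illustration of that filtering, in the spirit of getBestNodeDNAddrPair in the DFSInputStream diff below; the helper class and method names here are invented for the example:

package org.apache.hadoop.hdfs;

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

/** Example only; not part of this commit. */
final class DeadNodeFilterExample {
  private DeadNodeFilterExample() {
  }

  /**
   * Returns the first replica of the block that is not known to be dead,
   * or null if every location is blacklisted.
   */
  static DatanodeInfo firstLiveReplica(DFSClient client, DFSInputStream in,
      LocatedBlock block) {
    for (DatanodeInfo node : block.getLocations()) {
      // getDeadNodes(in) merges in's local dead nodes with the shared
      // DeadNodeDetector view when sharedDeadNodesEnabled is true.
      if (!client.getDeadNodes(in).containsKey(node)) {
        return node;
      }
    }
    return null;
  }
}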
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -170,10 +170,26 @@ public class DFSInputStream extends FSInputStream

private byte[] oneByteBuf; // used for 'int read()'

void addToDeadNodes(DatanodeInfo dnInfo) {
void addToLocalDeadNodes(DatanodeInfo dnInfo) {
deadNodes.put(dnInfo, dnInfo);
}

public void removeFromLocalDeadNodes(DatanodeInfo dnInfo) {
deadNodes.remove(dnInfo);
}

public ConcurrentHashMap<DatanodeInfo, DatanodeInfo> getLocalDeadNodes() {
return deadNodes;
}

public void clearLocalDeadNodes() {
deadNodes.clear();
}

public DFSClient getDfsClient() {
return dfsClient;
}

DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
LocatedBlocks locatedBlocks) throws IOException {
this.dfsClient = dfsClient;
@@ -599,7 +615,8 @@ private synchronized DatanodeInfo blockSeekTo(long target)
+ "{}, add to deadNodes and continue. ", targetAddr, src,
targetBlock.getBlock(), ex);
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode);
addToLocalDeadNodes(chosenNode);
dfsClient.addNodeToDetect(this, chosenNode);
}
}
}
@@ -650,28 +667,40 @@ protected BlockReader getBlockReader(LocatedBlock targetBlock,
*/
@Override
public synchronized void close() throws IOException {
if (!closed.compareAndSet(false, true)) {
DFSClient.LOG.debug("DFSInputStream has been closed already");
return;
}
dfsClient.checkOpen();

if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) {
final StringBuilder builder = new StringBuilder();
extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
private String prefix = "";
@Override
public void accept(ByteBuffer k, Object v) {
builder.append(prefix).append(k);
prefix = ", ";
}
});
DFSClient.LOG.warn("closing file " + src + ", but there are still " +
"unreleased ByteBuffers allocated by read(). " +
"Please release " + builder.toString() + ".");
try {
if (!closed.compareAndSet(false, true)) {
DFSClient.LOG.debug("DFSInputStream has been closed already");
return;
}
dfsClient.checkOpen();

if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) {
final StringBuilder builder = new StringBuilder();
extendedReadBuffers
.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
private String prefix = "";

@Override
public void accept(ByteBuffer k, Object v) {
builder.append(prefix).append(k);
prefix = ", ";
}
});
DFSClient.LOG.warn("closing file " + src + ", but there are still "
+ "unreleased ByteBuffers allocated by read(). "
+ "Please release " + builder.toString() + ".");
}
closeCurrentBlockReaders();
super.close();
} finally {
/**
* If this DFSInputStream is closed and the datanode is in
* DeadNodeDetector#dfsInputStreamNodes, we need to remove the datanode
* from DeadNodeDetector#dfsInputStreamNodes, since the user should not
* use this DFSInputStream anymore.
*/
dfsClient.removeNodeFromDetectByDFSInputStream(this, locatedBlocks);
}
closeCurrentBlockReaders();
super.close();
}

@Override
@@ -728,7 +757,8 @@ private synchronized int readBuffer(ReaderStrategy reader, int len,
*/
sourceFound = seekToBlockSource(pos);
} else {
addToDeadNodes(currentNode);
addToLocalDeadNodes(currentNode);
dfsClient.addNodeToDetect(this, currentNode);
sourceFound = seekToNewSource(pos);
}
if (!sourceFound) {
@@ -788,7 +818,8 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
}
blockEnd = -1;
if (currentNode != null) {
addToDeadNodes(currentNode);
addToLocalDeadNodes(currentNode);
dfsClient.addNodeToDetect(this, currentNode);
}
if (--retries == 0) {
throw e;
@@ -870,7 +901,7 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
private LocatedBlock refetchLocations(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
deadNodes, ignoredNodes);
dfsClient.getDeadNodes(this), ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src;
if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
String description = "Could not obtain block: " + blockInfo;
@@ -911,7 +942,7 @@ private LocatedBlock refetchLocations(LocatedBlock block,
throw new InterruptedIOException(
"Interrupted while choosing DataNode for read.");
}
deadNodes.clear(); //2nd option is to remove only nodes[blockId]
clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId]
openInfo(true);
block = refreshLocatedBlock(block);
failures++;
@@ -932,7 +963,7 @@ protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
StorageType storageType = null;
if (nodes != null) {
for (int i = 0; i < nodes.length; i++) {
if (!deadNodes.containsKey(nodes[i])
if (!dfsClient.getDeadNodes(this).containsKey(nodes[i])
&& (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
chosenNode = nodes[i];
// Storage types are ordered to correspond with nodes, so use the same
@@ -1084,7 +1115,7 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
DFSClient.LOG.warn(msg);
// we want to remember what we have tried
corruptedBlocks.addCorruptedBlock(block.getBlock(), datanode.info);
addToDeadNodes(datanode.info);
addToLocalDeadNodes(datanode.info);
throw new IOException(msg);
} catch (IOException e) {
checkInterrupted(e);
@@ -1106,7 +1137,8 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
String msg = "Failed to connect to " + datanode.addr + " for file "
+ src + " for block " + block.getBlock() + ":" + e;
DFSClient.LOG.warn("Connection failure: " + msg, e);
addToDeadNodes(datanode.info);
addToLocalDeadNodes(datanode.info);
dfsClient.addNodeToDetect(this, datanode.info);
throw new IOException(msg);
}
// Refresh the block for updated tokens in case of token failures or
@@ -1509,14 +1541,14 @@ public synchronized boolean seekToNewSource(long targetPos)
if (currentNode == null) {
return seekToBlockSource(targetPos);
}
boolean markedDead = deadNodes.containsKey(currentNode);
addToDeadNodes(currentNode);
boolean markedDead = dfsClient.isDeadNode(this, currentNode);
addToLocalDeadNodes(currentNode);
DatanodeInfo oldNode = currentNode;
DatanodeInfo newNode = blockSeekTo(targetPos);
if (!markedDead) {
/* remove it from deadNodes. blockSeekTo could have cleared
* deadNodes and added currentNode again. Thats ok. */
deadNodes.remove(oldNode);
removeFromLocalDeadNodes(oldNode);
}
if (!oldNode.getDatanodeUuid().equals(newNode.getDatanodeUuid())) {
currentNode = newNode;
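The try/finally added to close() above guarantees that the stream deregisters from the shared detector even if releasing the block readers throws. From the caller's side nothing changes; a sketch of an ordinary read whose close() triggers that cleanup (the file path is hypothetical):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SharedDeadNodesReadExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in = fs.open(new Path("/tmp/example.txt"))) {
      byte[] buf = new byte[4096];
      while (in.read(buf) > 0) {
        // Replicas that fail during these reads land in the stream's local
        // dead list and, if sharedDeadNodesEnabled, in the client-wide
        // DeadNodeDetector as well.
      }
    }
    // close() has run: the finally block removed this stream's entries from
    // DeadNodeDetector#dfsInputStreamNodes.
  }
}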
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -146,10 +146,6 @@ protected String getSrc() {
return src;
}

protected DFSClient getDFSClient() {
return dfsClient;
}

protected LocatedBlocks getLocatedBlocks() {
return locatedBlocks;
}
@@ -282,7 +278,7 @@ boolean createBlockReader(LocatedBlock block, long offsetInBlock,
"block" + block.getBlock(), e);
// re-fetch the block in case the block has been moved
fetchBlockAt(block.getStartOffset());
addToDeadNodes(dnInfo.info);
addToLocalDeadNodes(dnInfo.info);
}
}
if (reader != null) {
(The remaining 6 changed files are not shown here.)
