-
Notifications
You must be signed in to change notification settings - Fork 9.1k
HDFS-17181 WebHDFS: Route all CREATE requests to the BlockManager #6108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,7 @@ | |
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.ExecutionException; | ||
|
||
import javax.servlet.ServletContext; | ||
|
@@ -278,6 +279,9 @@ protected void queueExternalCall(ExternalCall call) | |
namenode.queueExternalCall(call); | ||
} | ||
|
||
/** | ||
* Chooses a Datanode to redirect a request to. | ||
*/ | ||
@VisibleForTesting | ||
static DatanodeInfo chooseDatanode(final NameNode namenode, | ||
final String path, final HttpOpParam.Op op, final long openOffset, | ||
|
@@ -288,18 +292,18 @@ static DatanodeInfo chooseDatanode(final NameNode namenode, | |
throw new IOException("Namesystem has not been initialized yet."); | ||
} | ||
final BlockManager bm = fsn.getBlockManager(); | ||
HashSet<Node> excludes = new HashSet<Node>(); | ||
|
||
Set<Node> excludes = new HashSet<>(); | ||
if (excludeDatanodes != null) { | ||
for (String host : StringUtils | ||
.getTrimmedStringCollection(excludeDatanodes)) { | ||
int idx = host.indexOf(":"); | ||
int idx = host.indexOf(':'); | ||
Node excludeNode = null; | ||
if (idx != -1) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This makes the code easier to read by just switching the |
||
excludeNode = bm.getDatanodeManager().getDatanodeByXferAddr( | ||
host.substring(0, idx), Integer.parseInt(host.substring(idx + 1))); | ||
} else { | ||
if (idx == -1) { | ||
excludeNode = bm.getDatanodeManager().getDatanodeByHost(host); | ||
} else { | ||
excludeNode = bm.getDatanodeManager().getDatanodeByXferAddr( | ||
host.substring(0, idx), Integer.parseInt(host.substring(idx + 1))); | ||
} | ||
|
||
if (excludeNode != null) { | ||
|
@@ -311,25 +315,15 @@ static DatanodeInfo chooseDatanode(final NameNode namenode, | |
} | ||
} | ||
|
||
if (op == PutOpParam.Op.CREATE) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the actual change. It used to treat |
||
//choose a datanode near to client | ||
final DatanodeDescriptor clientNode = bm.getDatanodeManager( | ||
).getDatanodeByHost(remoteAddr); | ||
if (clientNode != null) { | ||
final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS( | ||
path, clientNode, excludes, blocksize); | ||
if (storages.length > 0) { | ||
return storages[0].getDatanodeDescriptor(); | ||
} | ||
} | ||
} else if (op == GetOpParam.Op.OPEN | ||
// For these operations choose a datanode containing a replica | ||
if (op == GetOpParam.Op.OPEN | ||
|| op == GetOpParam.Op.GETFILECHECKSUM | ||
|| op == PostOpParam.Op.APPEND) { | ||
//choose a datanode containing a replica | ||
final NamenodeProtocols np = getRPCServer(namenode); | ||
if (status == null) { | ||
throw new FileNotFoundException("File " + path + " not found."); | ||
} | ||
|
||
final long len = status.getLen(); | ||
if (op == GetOpParam.Op.OPEN) { | ||
if (openOffset < 0L || (openOffset >= len && len > 0)) { | ||
|
@@ -344,10 +338,22 @@ static DatanodeInfo chooseDatanode(final NameNode namenode, | |
final int count = locations.locatedBlockCount(); | ||
if (count > 0) { | ||
return bestNode(locations.get(0).getLocations(), excludes); | ||
} else { | ||
throw new IOException("Block could not be located. Path=" + path + ", offset=" + offset); | ||
} | ||
} | ||
} | ||
|
||
// All other operations don't affect a specific node so let the BlockManager pick a target | ||
DatanodeDescriptor clientNode = bm.getDatanodeManager( | ||
).getDatanodeByHost(remoteAddr); | ||
|
||
DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS( | ||
path, clientNode, excludes, blocksize); | ||
if (storages.length > 0) { | ||
return storages[0].getDatanodeDescriptor(); | ||
} | ||
|
||
return (DatanodeDescriptor)bm.getDatanodeManager().getNetworkTopology( | ||
).chooseRandom(NodeBase.ROOT, excludes); | ||
} | ||
|
@@ -358,13 +364,13 @@ static DatanodeInfo chooseDatanode(final NameNode namenode, | |
* to return the first element of the node here. | ||
*/ | ||
protected static DatanodeInfo bestNode(DatanodeInfo[] nodes, | ||
HashSet<Node> excludes) throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unrelated: Just removes warnings again and makes code clearer |
||
Set<Node> excludes) throws IOException { | ||
for (DatanodeInfo dn: nodes) { | ||
if (false == dn.isDecommissioned() && false == excludes.contains(dn)) { | ||
if (!dn.isDecommissioned() && !excludes.contains(dn)) { | ||
return dn; | ||
} | ||
} | ||
throw new IOException("No active nodes contain this block"); | ||
throw new IOException("No active and not excluded nodes contain this block"); | ||
} | ||
|
||
public long renewDelegationToken(Token<DelegationTokenIdentifier> token) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This just removes a warning