diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java index c8b4135d088ed..dbd9184a2b91e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java @@ -43,6 +43,11 @@ @InterfaceStability.Evolving public final class CallerContext { public static final Charset SIGNATURE_ENCODING = StandardCharsets.UTF_8; + + // field names + public static final String CLIENT_IP_STR = "clientIp"; + public static final String CLIENT_PORT_STR = "clientPort"; + /** The caller context. * * It will be truncated if it exceeds the maximum allowed length in diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java index 67d151862be4d..973e6a20d4929 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java @@ -1509,7 +1509,19 @@ public UserGroupInformation getRealUser() { return null; } - + /** + * If this is a proxy user, get the real user. Otherwise, return + * this user. + * @param user the user to check + * @return the real user or self + */ + public static UserGroupInformation getRealUserOrSelf(UserGroupInformation user) { + if (user == null) { + return null; + } + UserGroupInformation real = user.getRealUser(); + return real != null ? real : user; + } /** * This class is used for storing the groups for testing. It stores a local diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 16cd7a52cb4e8..ef84f301a90e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -545,8 +545,8 @@ private void addClientIpToCallerContext() { byte[] origSignature = ctx == null ? null : ctx.getSignature(); CallerContext.Builder builder = new CallerContext.Builder("", contextFieldSeparator) - .append(CLIENT_IP_STR, Server.getRemoteAddress()) - .append(CLIENT_PORT_STR, + .append(CallerContext.CLIENT_IP_STR, Server.getRemoteAddress()) + .append(CallerContext.CLIENT_PORT_STR, Integer.toString(Server.getRemotePort())) .setSignature(origSignature); // Append the original caller context diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md index de45645db093c..dce2c654669cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md @@ -362,6 +362,20 @@ With this setting a user can interact with `ns-fed` as a regular namespace: This federated namespace can also be set as the default one at **core-site.xml** using `fs.defaultFS`. +NameNode configuration +-------------------- + +In order for the system to support data-locality, you must configure your NameNodes so that they will trust the routers to supply the user's client IP address. `dfs.namenode.ip-proxy-users` defines a comma separated list of users that are allowed to provide the client ip address via the caller context. + +```xml + + + dfs.namenode.ip-proxy-users + hdfs + + +``` + Router configuration -------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 452da7f7134b1..6b00a708c5f4d 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -918,6 +918,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.namenode.lifeline.handler.count"; public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count"; public static final int DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10; + // List of users that can override their client ip + public static final String DFS_NAMENODE_IP_PROXY_USERS = "dfs.namenode.ip-proxy-users"; public static final String DFS_HTTP_POLICY_KEY = "dfs.http.policy"; public static final String DFS_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY.name(); public static final String DFS_DATANODE_HTTPSERVER_FILTER_HANDLERS = "dfs.datanode.httpserver.filter.handlers"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 318442d2799d4..3ce8a80be6517 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -387,7 +387,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, @Metric final MutableRatesWithAggregation detailedLockHoldTimeMetrics = registry.newRatesWithAggregation("detailedLockHoldTimeMetrics"); - private static final String CLIENT_PORT_STR = "clientPort"; private final String contextFieldSeparator; boolean isAuditEnabled() { @@ -453,7 +452,7 @@ private void appendClientPortToCallerContextIfAbsent() { byte[] origSignature = ctx == null ? null : ctx.getSignature(); CallerContext.setCurrent( new CallerContext.Builder(origContext, contextFieldSeparator) - .append(CLIENT_PORT_STR, String.valueOf(Server.getRemotePort())) + .append(CallerContext.CLIENT_PORT_STR, String.valueOf(Server.getRemotePort())) .setSignature(origSignature) .build()); } @@ -461,7 +460,7 @@ private void appendClientPortToCallerContextIfAbsent() { private boolean isClientPortInfoAbsent(CallerContext ctx){ return ctx == null || ctx.getContext() == null - || !ctx.getContext().contains(CLIENT_PORT_STR); + || !ctx.getContext().contains(CallerContext.CLIENT_PORT_STR); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index e5d8aa9a40dbe..e6610e0e505f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY; @@ -45,7 +46,9 @@ import java.util.Map; import java.util.Set; +import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; +import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; @@ -267,6 +270,9 @@ public class NameNodeRpcServer implements NamenodeProtocols { private final String defaultECPolicyName; + // Users who can override the client ip + private final String[] ipProxyUsers; + public NameNodeRpcServer(Configuration conf, NameNode nn) throws IOException { this.nn = nn; @@ -277,6 +283,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) int handlerCount = conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY, DFS_NAMENODE_HANDLER_COUNT_DEFAULT); + ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS); RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine2.class); @@ -1890,7 +1897,29 @@ private void verifySoftwareVersion(DatanodeRegistration dnReg) } } - private static String getClientMachine() { + private String getClientMachine() { + if (ipProxyUsers != null) { + // Get the real user (or effective if it isn't a proxy user) + UserGroupInformation user = + UserGroupInformation.getRealUserOrSelf(Server.getRemoteUser()); + if (user != null && + ArrayUtils.contains(ipProxyUsers, user.getShortUserName())) { + CallerContext context = CallerContext.getCurrent(); + if (context != null && context.isContextValid()) { + String cc = context.getContext(); + // if the rpc has a caller context of "clientIp:1.2.3.4,CLI", + // return "1.2.3.4" as the client machine. + String key = CallerContext.CLIENT_IP_STR + + CallerContext.Builder.KEY_VALUE_SEPARATOR; + int posn = cc.indexOf(key); + if (posn != -1) { + posn += key.length(); + int end = cc.indexOf(",", posn); + return end == -1 ? cc.substring(posn) : cc.substring(posn, end); + } + } + } + } String clientMachine = Server.getRemoteAddress(); if (clientMachine == null) { //not a RPC client clientMachine = ""; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index a0c47e8cfcff9..917e655fdb62e 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -544,6 +544,17 @@ + + dfs.namenode.ip-proxy-users + + A comma separated list of user names that are allowed by the + NameNode to specify a different client IP address in the caller context. + This is used by Router-Based Federation (RBF) to provide the actual client's + IP address to the NameNode, which is critical to preserve data locality when + using RBF. If you are using RBF, add the user that runs the routers. + + + dfs.namenode.acls.enabled true diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java index ada93e84f0ead..74d85bc637efa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -24,14 +24,25 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import java.io.IOException; +import java.nio.charset.StandardCharsets; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.ipc.CallerContext; +import org.apache.hadoop.security.UserGroupInformation; import org.junit.Test; public class TestNameNodeRpcServer { @@ -59,5 +70,88 @@ public void testNamenodeRpcBindAny() throws IOException { conf.unset(DFS_NAMENODE_RPC_BIND_HOST_KEY); } } + + /** + * Get the preferred DataNode location for the first block of the + * given file. + * @param fs The file system to use + * @param p The path to use + * @return the preferred host to get the data + */ + private static String getPreferredLocation(DistributedFileSystem fs, + Path p) throws IOException{ + // Use getLocatedBlocks because it is the basis for HDFS open, + // but provides visibility into which host will be used. + LocatedBlocks blocks = fs.getClient() + .getLocatedBlocks(p.toUri().getPath(), 0); + return blocks.get(0).getLocations()[0].getHostName(); + } + + // Because of the randomness of the NN assigning DN, we run multiple + // trials. 1/3^20=3e-10, so that should be good enough. + static final int ITERATIONS_TO_USE = 20; + + /** + * A test to make sure that if an authorized user adds "clientIp:" to their + * caller context, it will be used to make locality decisions on the NN. + */ + @Test + public void testNamenodeRpcClientIpProxy() throws IOException { + Configuration conf = new HdfsConfiguration(); + + conf.set(DFS_NAMENODE_IP_PROXY_USERS, "fake_joe"); + // Make 3 nodes & racks so that we have a decent shot of detecting when + // our change overrides the random choice of datanode. + final String[] racks = new String[]{"/rack1", "/rack2", "/rack3"}; + final String[] hosts = new String[]{"node1", "node2", "node3"}; + MiniDFSCluster cluster = null; + final CallerContext original = CallerContext.getCurrent(); + + try { + cluster = new MiniDFSCluster.Builder(conf) + .racks(racks).hosts(hosts).numDataNodes(hosts.length) + .build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + // Write a sample file + final Path fooName = fs.makeQualified(new Path("/foo")); + FSDataOutputStream stream = fs.create(fooName); + stream.write("Hello world!\n".getBytes(StandardCharsets.UTF_8)); + stream.close(); + // Set the caller context to set the ip address + CallerContext.setCurrent( + new CallerContext.Builder("test", conf) + .append(CallerContext.CLIENT_IP_STR, hosts[0]) + .build()); + // Should get a random mix of DataNodes since we aren't joe. + for (int trial = 0; trial < ITERATIONS_TO_USE; ++trial) { + String host = getPreferredLocation(fs, fooName); + if (!hosts[0].equals(host)) { + // found some other host, so things are good + break; + } else if (trial == ITERATIONS_TO_USE - 1) { + assertNotEquals("Failed to get non-node1", hosts[0], host); + } + } + // Run as fake joe to authorize the test + UserGroupInformation joe = + UserGroupInformation.createUserForTesting("fake_joe", + new String[]{"fake_group"}); + DistributedFileSystem joeFs = + (DistributedFileSystem) DFSTestUtil.getFileSystemAs(joe, conf); + // As joe, we should get all node1. + for (int trial = 0; trial < ITERATIONS_TO_USE; ++trial) { + String host = getPreferredLocation(joeFs, fooName); + assertEquals("Trial " + trial + " failed", hosts[0], host); + } + } finally { + CallerContext.setCurrent(original); + if (cluster != null) { + cluster.shutdown(); + } + // Reset the config + conf.unset(DFS_NAMENODE_IP_PROXY_USERS); + } + } }