Skip to content

Commit

Permalink
HDFS-13248: Namenode needs to use the actual client IP when going thr…
Browse files Browse the repository at this point in the history
…ough the

RBF proxy. There is a new configuration knob dfs.namenode.ip-proxy-users that configures
the list of users that can set their client IP address using the caller context.

Fixes apache#4081
  • Loading branch information
omalley authored and HarshitGupta11 committed Nov 28, 2022
1 parent 4c9050e commit 54e572c
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
@InterfaceStability.Evolving
public final class CallerContext {
public static final Charset SIGNATURE_ENCODING = StandardCharsets.UTF_8;

// field names
public static final String CLIENT_IP_STR = "clientIp";
public static final String CLIENT_PORT_STR = "clientPort";

/** The caller context.
*
* It will be truncated if it exceeds the maximum allowed length in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1508,7 +1508,19 @@ public UserGroupInformation getRealUser() {
return null;
}


/**
 * Resolve a user to its real (non-proxy) identity.
 * <p>
 * When the supplied user is a proxy user, the underlying real user is
 * returned; otherwise the supplied user itself is returned unchanged.
 *
 * @param user the user to resolve; may be null
 * @return the real user when one exists, the user itself otherwise,
 *         or null when {@code user} is null
 */
public static UserGroupInformation getRealUserOrSelf(UserGroupInformation user) {
  if (user == null) {
    return null;
  }
  final UserGroupInformation realUser = user.getRealUser();
  if (realUser == null) {
    // Not a proxy user: the effective user is already the real one.
    return user;
  }
  return realUser;
}

/**
* This class is used for storing the groups for testing. It stores a local
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,6 @@ public class RouterRpcClient {
private static final Pattern STACK_TRACE_PATTERN =
Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");

private static final String CLIENT_IP_STR = "clientIp";
private static final String CLIENT_PORT_STR = "clientPort";

/** Fairness manager to control handlers assigned per NS. */
private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
private Map<String, LongAdder> rejectedPermitsPerNs = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -597,8 +594,8 @@ private void addClientIpToCallerContext() {
byte[] origSignature = ctx == null ? null : ctx.getSignature();
CallerContext.Builder builder =
new CallerContext.Builder("", contextFieldSeparator)
.append(CLIENT_IP_STR, Server.getRemoteAddress())
.append(CLIENT_PORT_STR,
.append(CallerContext.CLIENT_IP_STR, Server.getRemoteAddress())
.append(CallerContext.CLIENT_PORT_STR,
Integer.toString(Server.getRemotePort()))
.setSignature(origSignature);
// Append the original caller context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,20 @@ With this setting a user can interact with `ns-fed` as a regular namespace:
This federated namespace can also be set as the default one at **core-site.xml** using `fs.defaultFS`.


NameNode configuration
--------------------

In order for the system to support data locality, you must configure your NameNodes so that they will trust the routers to supply the user's client IP address. `dfs.namenode.ip-proxy-users` defines a comma-separated list of users that are allowed to provide the client IP address via the caller context.

```xml
<configuration>
<property>
<name>dfs.namenode.ip-proxy-users</name>
<value>hdfs</value>
</property>
</configuration>
```

Router configuration
--------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.lifeline.handler.count";
public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";
public static final int DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10;
// List of users that can override their client ip
public static final String DFS_NAMENODE_IP_PROXY_USERS = "dfs.namenode.ip-proxy-users";
public static final String DFS_HTTP_POLICY_KEY = "dfs.http.policy";
public static final String DFS_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY.name();
public static final String DFS_DATANODE_HTTPSERVER_FILTER_HANDLERS = "dfs.datanode.httpserver.filter.handlers";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
@Metric final MutableRatesWithAggregation detailedLockHoldTimeMetrics =
registry.newRatesWithAggregation("detailedLockHoldTimeMetrics");

private static final String CLIENT_PORT_STR = "clientPort";
private final String contextFieldSeparator;

boolean isAuditEnabled() {
Expand Down Expand Up @@ -467,15 +466,15 @@ private void appendClientPortToCallerContextIfAbsent() {
byte[] origSignature = ctx == null ? null : ctx.getSignature();
CallerContext.setCurrent(
new CallerContext.Builder(origContext, contextFieldSeparator)
.append(CLIENT_PORT_STR, String.valueOf(Server.getRemotePort()))
.append(CallerContext.CLIENT_PORT_STR, String.valueOf(Server.getRemotePort()))
.setSignature(origSignature)
.build());
}
}

private boolean isClientPortInfoAbsent(CallerContext ctx){
return ctx == null || ctx.getContext() == null
|| !ctx.getContext().contains(CLIENT_PORT_STR);
|| !ctx.getContext().contains(CallerContext.CLIENT_PORT_STR);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
Expand All @@ -45,6 +46,9 @@
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.ipc.CallerContext;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -267,6 +271,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {

private final String defaultECPolicyName;

// Users who can override the client ip
private final String[] ipProxyUsers;

public NameNodeRpcServer(Configuration conf, NameNode nn)
throws IOException {
this.nn = nn;
Expand All @@ -277,6 +284,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
int handlerCount =
conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS);

RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine2.class);
Expand Down Expand Up @@ -1899,7 +1907,29 @@ private void verifySoftwareVersion(DatanodeRegistration dnReg)
}
}

private static String getClientMachine() {
private String getClientMachine() {
if (ipProxyUsers != null) {
// Get the real user (or effective if it isn't a proxy user)
UserGroupInformation user =
UserGroupInformation.getRealUserOrSelf(Server.getRemoteUser());
if (user != null &&
ArrayUtils.contains(ipProxyUsers, user.getShortUserName())) {
CallerContext context = CallerContext.getCurrent();
if (context != null && context.isContextValid()) {
String cc = context.getContext();
// if the rpc has a caller context of "clientIp:1.2.3.4,CLI",
// return "1.2.3.4" as the client machine.
String key = CallerContext.CLIENT_IP_STR +
CallerContext.Builder.KEY_VALUE_SEPARATOR;
int posn = cc.indexOf(key);
if (posn != -1) {
posn += key.length();
int end = cc.indexOf(",", posn);
return end == -1 ? cc.substring(posn) : cc.substring(posn, end);
}
}
}
}
String clientMachine = Server.getRemoteAddress();
if (clientMachine == null) { //not a RPC client
clientMachine = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,17 @@
</description>
</property>

<property>
<name>dfs.namenode.ip-proxy-users</name>
<value></value>
<description>A comma-separated list of user names that are allowed by the
NameNode to specify a different client IP address in the caller context.
This is used by Router-Based Federation (RBF) to provide the actual client's
IP address to the NameNode, which is critical to preserve data locality when
using RBF. If you are using RBF, add the user that runs the routers.
</description>
</property>

<property>
<name>dfs.namenode.acls.enabled</name>
<value>true</value>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -24,14 +24,25 @@
*/
package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;

public class TestNameNodeRpcServer {
Expand Down Expand Up @@ -59,5 +70,88 @@ public void testNamenodeRpcBindAny() throws IOException {
conf.unset(DFS_NAMENODE_RPC_BIND_HOST_KEY);
}
}

/**
 * Look up which DataNode host the NameNode prefers for serving the
 * first block of a file.
 *
 * @param fs the file system to query
 * @param p the file whose block placement is inspected
 * @return the host name of the first (preferred) replica location
 */
private static String getPreferredLocation(DistributedFileSystem fs,
    Path p) throws IOException {
  // getLocatedBlocks underlies HDFS open, but unlike open it exposes
  // the host ordering the NameNode chose, which is what we assert on.
  final String filePath = p.toUri().getPath();
  final LocatedBlocks located = fs.getClient().getLocatedBlocks(filePath, 0);
  return located.get(0).getLocations()[0].getHostName();
}

// The NameNode assigns DataNodes randomly among the three test nodes, so a
// single read could coincidentally land on the expected host. Repeating 20
// times makes a false positive vanishingly unlikely: (1/3)^20 ~= 3e-10.
static final int ITERATIONS_TO_USE = 20;

/**
 * A test to make sure that if an authorized user adds "clientIp:" to their
 * caller context, it will be used to make locality decisions on the NN.
 *
 * Runs two phases against the same caller context: first as the current
 * (unauthorized) user, where the clientIp hint must be ignored, then as
 * the configured proxy user "fake_joe", where it must be honored.
 */
@Test
public void testNamenodeRpcClientIpProxy() throws IOException {
  Configuration conf = new HdfsConfiguration();

  // Only "fake_joe" is trusted to override the client IP via caller context.
  conf.set(DFS_NAMENODE_IP_PROXY_USERS, "fake_joe");
  // Make 3 nodes & racks so that we have a decent shot of detecting when
  // our change overrides the random choice of datanode.
  final String[] racks = new String[]{"/rack1", "/rack2", "/rack3"};
  final String[] hosts = new String[]{"node1", "node2", "node3"};
  MiniDFSCluster cluster = null;
  // Saved so the finally block can restore thread-local caller context state.
  final CallerContext original = CallerContext.getCurrent();

  try {
    cluster = new MiniDFSCluster.Builder(conf)
        .racks(racks).hosts(hosts).numDataNodes(hosts.length)
        .build();
    cluster.waitActive();
    DistributedFileSystem fs = cluster.getFileSystem();
    // Write a sample file
    final Path fooName = fs.makeQualified(new Path("/foo"));
    FSDataOutputStream stream = fs.create(fooName);
    stream.write("Hello world!\n".getBytes(StandardCharsets.UTF_8));
    stream.close();
    // Set the caller context to set the ip address
    // (claims the request originates from hosts[0] = "node1").
    CallerContext.setCurrent(
        new CallerContext.Builder("test", conf)
            .append(CallerContext.CLIENT_IP_STR, hosts[0])
            .build());
    // Should get a random mix of DataNodes since we aren't joe.
    for (int trial = 0; trial < ITERATIONS_TO_USE; ++trial) {
      String host = getPreferredLocation(fs, fooName);
      if (!hosts[0].equals(host)) {
        // found some other host, so things are good
        break;
      } else if (trial == ITERATIONS_TO_USE - 1) {
        // Every trial returned node1: the hint was (wrongly) honored.
        assertNotEquals("Failed to get non-node1", hosts[0], host);
      }
    }
    // Run as fake joe to authorize the test
    UserGroupInformation joe =
        UserGroupInformation.createUserForTesting("fake_joe",
            new String[]{"fake_group"});
    DistributedFileSystem joeFs =
        (DistributedFileSystem) DFSTestUtil.getFileSystemAs(joe, conf);
    // As joe, we should get all node1.
    for (int trial = 0; trial < ITERATIONS_TO_USE; ++trial) {
      String host = getPreferredLocation(joeFs, fooName);
      assertEquals("Trial " + trial + " failed", hosts[0], host);
    }
  } finally {
    CallerContext.setCurrent(original);
    if (cluster != null) {
      cluster.shutdown();
    }
    // Reset the config
    conf.unset(DFS_NAMENODE_IP_PROXY_USERS);
  }
}
}

0 comments on commit 54e572c

Please sign in to comment.