
HDFS-13248: Namenode needs to use the actual client IP when going through RBF proxy. #4081

Merged: 1 commit, Mar 21, 2022

CallerContext.java
@@ -43,6 +43,11 @@
@InterfaceStability.Evolving
public final class CallerContext {
public static final Charset SIGNATURE_ENCODING = StandardCharsets.UTF_8;

// field names
public static final String CLIENT_IP_STR = "clientIp";
public static final String CLIENT_PORT_STR = "clientPort";

/** The caller context.
*
* It will be truncated if it exceeds the maximum allowed length in

UserGroupInformation.java
@@ -1508,7 +1508,19 @@ public UserGroupInformation getRealUser() {
return null;
}


/**
* If this is a proxy user, get the real user. Otherwise, return
* this user.
* @param user the user to check
* @return the real user or self
*/
public static UserGroupInformation getRealUserOrSelf(UserGroupInformation user) {
if (user == null) {
return null;
}
UserGroupInformation real = user.getRealUser();
return real != null ? real : user;
}

/**
* This class is used for storing the groups for testing. It stores a local

RouterRpcClient.java
@@ -131,9 +131,6 @@ public class RouterRpcClient {
private static final Pattern STACK_TRACE_PATTERN =
Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");

private static final String CLIENT_IP_STR = "clientIp";
private static final String CLIENT_PORT_STR = "clientPort";

/** Fairness manager to control handlers assigned per NS. */
private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
private Map<String, LongAdder> rejectedPermitsPerNs = new ConcurrentHashMap<>();
@@ -597,8 +594,8 @@ private void addClientIpToCallerContext() {
byte[] origSignature = ctx == null ? null : ctx.getSignature();
CallerContext.Builder builder =
new CallerContext.Builder("", contextFieldSeparator)
.append(CLIENT_IP_STR, Server.getRemoteAddress())
.append(CLIENT_PORT_STR,
.append(CallerContext.CLIENT_IP_STR, Server.getRemoteAddress())
.append(CallerContext.CLIENT_PORT_STR,
Integer.toString(Server.getRemotePort()))
.setSignature(origSignature);
// Append the original caller context

HDFSRouterFederation.md
@@ -375,6 +375,20 @@ With this setting a user can interact with `ns-fed` as a regular namespace:
This federated namespace can also be set as the default one at **core-site.xml** using `fs.defaultFS`.


NameNode configuration
--------------------

In order for the system to support data locality, you must configure your NameNodes so that they trust the routers to supply the user's client IP address. `dfs.namenode.ip-proxy-users` defines a comma-separated list of users that are allowed to provide the client IP address via the caller context.

```xml
<configuration>
<property>
<name>dfs.namenode.ip-proxy-users</name>
<value>hdfs</value>
</property>
</configuration>
```
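
For illustration only (not part of this patch), the sketch below shows how a trusted caller such as the router process builds a caller context carrying the real client address, using the `CallerContext.Builder` API and the `CLIENT_IP_STR`/`CLIENT_PORT_STR` field names introduced in this change. The class name, the `","` separator, and the sample address and port are assumptions for the example.

```java
import org.apache.hadoop.ipc.CallerContext;

public class ClientIpCallerContextExample {
  public static void main(String[] args) {
    // Build a caller context of the form "CLI,clientIp:10.1.2.3,clientPort:43210".
    CallerContext ctx = new CallerContext.Builder("CLI", ",")
        .append(CallerContext.CLIENT_IP_STR, "10.1.2.3")   // real client IP
        .append(CallerContext.CLIENT_PORT_STR, "43210")    // real client port
        .build();
    CallerContext.setCurrent(ctx);

    // A NameNode that lists the calling user in dfs.namenode.ip-proxy-users
    // extracts the clientIp value from this string and uses it for
    // data-locality decisions instead of the RPC peer address.
    System.out.println(ctx.getContext());
  }
}
```

A request from any user not listed in `dfs.namenode.ip-proxy-users` still has its locality computed from the RPC connection's address, so the caller context cannot be abused by ordinary clients.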

Router configuration
--------------------

DFSConfigKeys.java
@@ -991,6 +991,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.lifeline.handler.count";
public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";
public static final int DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10;
// List of users that can override their client ip
public static final String DFS_NAMENODE_IP_PROXY_USERS = "dfs.namenode.ip-proxy-users";
public static final String DFS_HTTP_POLICY_KEY = "dfs.http.policy";
public static final String DFS_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY.name();
public static final String DFS_DATANODE_HTTPSERVER_FILTER_HANDLERS = "dfs.datanode.httpserver.filter.handlers";

FSNamesystem.java
@@ -401,7 +401,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
@Metric final MutableRatesWithAggregation detailedLockHoldTimeMetrics =
registry.newRatesWithAggregation("detailedLockHoldTimeMetrics");

private static final String CLIENT_PORT_STR = "clientPort";
private final String contextFieldSeparator;

boolean isAuditEnabled() {
@@ -467,15 +466,15 @@ private void appendClientPortToCallerContextIfAbsent() {
byte[] origSignature = ctx == null ? null : ctx.getSignature();
CallerContext.setCurrent(
new CallerContext.Builder(origContext, contextFieldSeparator)
.append(CLIENT_PORT_STR, String.valueOf(Server.getRemotePort()))
.append(CallerContext.CLIENT_PORT_STR, String.valueOf(Server.getRemotePort()))
.setSignature(origSignature)
.build());
}
}

private boolean isClientPortInfoAbsent(CallerContext ctx){
return ctx == null || ctx.getContext() == null
|| !ctx.getContext().contains(CLIENT_PORT_STR);
|| !ctx.getContext().contains(CallerContext.CLIENT_PORT_STR);
}

/**

NameNodeRpcServer.java
@@ -21,6 +21,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
@@ -45,6 +46,9 @@
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.ipc.CallerContext;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -267,6 +271,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {

private final String defaultECPolicyName;

// Users who can override the client ip
private final String[] ipProxyUsers;

public NameNodeRpcServer(Configuration conf, NameNode nn)
throws IOException {
this.nn = nn;
@@ -277,6 +284,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
int handlerCount =
conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS);

RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
ProtobufRpcEngine2.class);
@@ -1899,7 +1907,29 @@ private void verifySoftwareVersion(DatanodeRegistration dnReg)
}
}

private static String getClientMachine() {
private String getClientMachine() {
if (ipProxyUsers != null) {
// Get the real user (or effective if it isn't a proxy user)
UserGroupInformation user =
UserGroupInformation.getRealUserOrSelf(Server.getRemoteUser());
if (user != null &&
ArrayUtils.contains(ipProxyUsers, user.getShortUserName())) {
CallerContext context = CallerContext.getCurrent();
if (context != null && context.isContextValid()) {
String cc = context.getContext();
// if the rpc has a caller context of "clientIp:1.2.3.4,CLI",
// return "1.2.3.4" as the client machine.
String key = CallerContext.CLIENT_IP_STR +
CallerContext.Builder.KEY_VALUE_SEPARATOR;
int posn = cc.indexOf(key);
if (posn != -1) {
posn += key.length();
int end = cc.indexOf(",", posn);
return end == -1 ? cc.substring(posn) : cc.substring(posn, end);
}
}
}
}
String clientMachine = Server.getRemoteAddress();
if (clientMachine == null) { //not a RPC client
clientMachine = "";

hdfs-default.xml
@@ -565,6 +565,17 @@
</description>
</property>

<property>
<name>dfs.namenode.ip-proxy-users</name>
<value></value>
<description>A comma-separated list of user names that are allowed by the
NameNode to specify a different client IP address in the caller context.
This is used by Router-Based Federation (RBF) to provide the actual client's
IP address to the NameNode, which is critical to preserve data locality when
using RBF. If you are using RBF, add the user that runs the routers.
</description>
</property>

<property>
<name>dfs.namenode.acls.enabled</name>
<value>true</value>

TestNameNodeRpcServer.java
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,14 +24,25 @@
*/
package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;

public class TestNameNodeRpcServer {
@@ -59,5 +70,88 @@ public void testNamenodeRpcBindAny() throws IOException {
conf.unset(DFS_NAMENODE_RPC_BIND_HOST_KEY);
}
}

/**
* Get the preferred DataNode location for the first block of the
* given file.
* @param fs The file system to use
* @param p The path to use
* @return the preferred host to get the data
*/
private static String getPreferredLocation(DistributedFileSystem fs,
Path p) throws IOException{
// Use getLocatedBlocks because it is the basis for HDFS open,
// but provides visibility into which host will be used.
LocatedBlocks blocks = fs.getClient()
.getLocatedBlocks(p.toUri().getPath(), 0);
return blocks.get(0).getLocations()[0].getHostName();
}

// Because of the randomness of the NN assigning DN, we run multiple
// trials. 1/3^20=3e-10, so that should be good enough.
static final int ITERATIONS_TO_USE = 20;

/**
* A test to make sure that if an authorized user adds "clientIp:" to their
* caller context, it will be used to make locality decisions on the NN.
*/
@Test
public void testNamenodeRpcClientIpProxy() throws IOException {
Configuration conf = new HdfsConfiguration();

conf.set(DFS_NAMENODE_IP_PROXY_USERS, "fake_joe");
// Make 3 nodes & racks so that we have a decent shot of detecting when
// our change overrides the random choice of datanode.
final String[] racks = new String[]{"/rack1", "/rack2", "/rack3"};
final String[] hosts = new String[]{"node1", "node2", "node3"};
MiniDFSCluster cluster = null;
final CallerContext original = CallerContext.getCurrent();

try {
cluster = new MiniDFSCluster.Builder(conf)
.racks(racks).hosts(hosts).numDataNodes(hosts.length)
.build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
// Write a sample file
final Path fooName = fs.makeQualified(new Path("/foo"));
FSDataOutputStream stream = fs.create(fooName);
stream.write("Hello world!\n".getBytes(StandardCharsets.UTF_8));
stream.close();
// Set the caller context to set the ip address
CallerContext.setCurrent(
new CallerContext.Builder("test", conf)
.append(CallerContext.CLIENT_IP_STR, hosts[0])
.build());
// Should get a random mix of DataNodes since we aren't joe.
for (int trial = 0; trial < ITERATIONS_TO_USE; ++trial) {
String host = getPreferredLocation(fs, fooName);
if (!hosts[0].equals(host)) {
// found some other host, so things are good
break;
} else if (trial == ITERATIONS_TO_USE - 1) {
assertNotEquals("Failed to get non-node1", hosts[0], host);
}
}
// Run as fake joe to authorize the test
UserGroupInformation joe =
UserGroupInformation.createUserForTesting("fake_joe",
new String[]{"fake_group"});
DistributedFileSystem joeFs =
(DistributedFileSystem) DFSTestUtil.getFileSystemAs(joe, conf);
// As joe, we should get all node1.
for (int trial = 0; trial < ITERATIONS_TO_USE; ++trial) {
String host = getPreferredLocation(joeFs, fooName);
assertEquals("Trial " + trial + " failed", hosts[0], host);
}
} finally {
CallerContext.setCurrent(original);
if (cluster != null) {
cluster.shutdown();
}
// Reset the config
conf.unset(DFS_NAMENODE_IP_PROXY_USERS);
}
}
}