Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-17814. Provide fallbacks for identity/cost providers and backoff enable #3230

Merged
merged 1 commit into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,18 @@ private CostProvider parseCostProvider(String ns, Configuration conf) {
ns + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
CostProvider.class);

if (providers.size() < 1) {
String[] nsPort = ns.split("\\.");
if (nsPort.length == 2) {
// Only if ns is split with ".", we can separate namespace and port.
// In the absence of "ipc.<port>.cost-provider.impl" property,
// we look up "ipc.cost-provider.impl" property.
providers = conf.getInstances(
nsPort[0] + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
CostProvider.class);
}
}

if (providers.size() < 1) {
LOG.info("CostProvider not specified, defaulting to DefaultCostProvider");
return new DefaultCostProvider();
Expand All @@ -303,6 +315,18 @@ private IdentityProvider parseIdentityProvider(String ns,
ns + "." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
IdentityProvider.class);

if (providers.size() < 1) {
String[] nsPort = ns.split("\\.");
if (nsPort.length == 2) {
// Only if ns is split with ".", we can separate namespace and port.
// In the absence of "ipc.<port>.identity-provider.impl" property,
// we look up "ipc.identity-provider.impl" property.
providers = conf.getInstances(
nsPort[0] + "." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
IdentityProvider.class);
}
}

if (providers.size() < 1) {
LOG.info("IdentityProvider not specified, " +
"defaulting to UserIdentityProvider");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,12 +831,14 @@ public synchronized void refreshCallQueue(Configuration conf) {
getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
maxQueueSize, prefix, conf);
callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf));
callQueue.setClientBackoffEnabled(getClientBackoffEnable(
CommonConfigurationKeys.IPC_NAMESPACE, port, conf));
}

/**
* Get from config if client backoff is enabled on that port.
*/
@Deprecated
static boolean getClientBackoffEnable(
String prefix, Configuration conf) {
String name = prefix + "." +
Expand All @@ -845,6 +847,32 @@ static boolean getClientBackoffEnable(
CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
}

/**
* Return boolean value configured by property 'ipc.<port>.backoff.enable'
* if it is present. If the config is not present, default config
* (without port) is used to derive class i.e 'ipc.backoff.enable',
* and derived value is returned if configured. Otherwise, default value
* {@link CommonConfigurationKeys#IPC_BACKOFF_ENABLE_DEFAULT} is returned.
*
* @param namespace Namespace "ipc".
* @param port Server's listener port.
* @param conf Configuration properties.
* @return Value returned based on configuration.
*/
static boolean getClientBackoffEnable(
String namespace, int port, Configuration conf) {
String name = namespace + "." + port + "." +
CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
boolean valueWithPort = conf.getBoolean(name,
CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
if (valueWithPort != CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT) {
return valueWithPort;
}
return conf.getBoolean(namespace + "."
+ CommonConfigurationKeys.IPC_BACKOFF_ENABLE,
CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
}

/** A generic call queued for handling. */
public static class Call implements Schedulable,
PrivilegedExceptionAction<Void> {
Expand Down Expand Up @@ -3184,7 +3212,8 @@ protected Server(String bindAddress, int port,
this.callQueue = new CallQueueManager<>(
getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);
getClientBackoffEnable(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
maxQueueSize, prefix, conf);

this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
this.authorize =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2504,6 +2504,17 @@
</description>
</property>

<property>
<name>ipc.backoff.enable</name>
<value>false</value>
<description>
This property is used as fallback property in case
"ipc.[port_number].backoff.enable" is not defined.
It determines whether or not to enable client backoff when
a queue is full.
</description>
</property>

<property>
<name>ipc.[port_number].callqueue.impl</name>
<value>java.util.concurrent.LinkedBlockingQueue</value>
Expand Down Expand Up @@ -2586,6 +2597,17 @@
</description>
</property>

<property>
<name>ipc.identity-provider.impl</name>
<value>org.apache.hadoop.ipc.UserIdentityProvider</value>
<description>
This property is used as fallback property in case
"ipc.[port_number].identity-provider.impl" is not defined.
The identity provider mapping user requests to their identity.
This property applies to DecayRpcScheduler.
</description>
</property>

<property>
<name>ipc.[port_number].cost-provider.impl</name>
<value>org.apache.hadoop.ipc.DefaultCostProvider</value>
Expand All @@ -2596,6 +2618,19 @@
</description>
</property>

<property>
<name>ipc.cost-provider.impl</name>
<value>org.apache.hadoop.ipc.DefaultCostProvider</value>
<description>
This property is used as fallback property in case
"ipc.[port_number].cost-provider.impl" is not defined.
The cost provider mapping user requests to their cost. To
enable determination of cost based on processing time, use
org.apache.hadoop.ipc.WeightedTimeCostProvider.
This property applies to DecayRpcScheduler.
</description>
</property>

<property>
<name>ipc.[port_number].decay-scheduler.period-ms</name>
<value>5000</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public void initializeMemberVariables() {

// FairCallQueue configs that includes dynamic ports in its keys
xmlPropsToSkipCompare.add("ipc.[port_number].backoff.enable");
xmlPropsToSkipCompare.add("ipc.backoff.enable");
xmlPropsToSkipCompare.add("ipc.[port_number].callqueue.impl");
xmlPropsToSkipCompare.add("ipc.callqueue.impl");
xmlPropsToSkipCompare.add("ipc.[port_number].scheduler.impl");
Expand All @@ -164,7 +165,9 @@ public void initializeMemberVariables() {
xmlPropsToSkipCompare.add(
"ipc.[port_number].faircallqueue.multiplexer.weights");
xmlPropsToSkipCompare.add("ipc.[port_number].identity-provider.impl");
xmlPropsToSkipCompare.add("ipc.identity-provider.impl");
xmlPropsToSkipCompare.add("ipc.[port_number].cost-provider.impl");
xmlPropsToSkipCompare.add("ipc.cost-provider.impl");
xmlPropsToSkipCompare.add("ipc.[port_number].decay-scheduler.period-ms");
xmlPropsToSkipCompare.add("ipc.[port_number].decay-scheduler.decay-factor");
xmlPropsToSkipCompare.add("ipc.[port_number].decay-scheduler.thresholds");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,29 @@ private Schedulable mockCall(String id) {
return mockCall;
}

private static class TestIdentityProvider implements IdentityProvider {
public String makeIdentity(Schedulable obj) {
UserGroupInformation ugi = obj.getUserGroupInformation();
if (ugi == null) {
return null;
}
return ugi.getShortUserName();
}
}

private static class TestCostProvider implements CostProvider {

@Override
public void init(String namespace, Configuration conf) {
// No-op
}

@Override
public long getCost(ProcessingDetails details) {
return 1;
}
}

private DecayRpcScheduler scheduler;

@Test(expected=IllegalArgumentException.class)
Expand Down Expand Up @@ -83,6 +106,44 @@ public void testParsePeriod() {
assertEquals(1058L, scheduler.getDecayPeriodMillis());
}

@Test
@SuppressWarnings("deprecation")
public void testParsePeriodWithPortLessIdentityProvider() {
// By default
scheduler = new DecayRpcScheduler(1, "ipc.50", new Configuration());
assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT,
scheduler.getDecayPeriodMillis());

// Custom
Configuration conf = new Configuration();
conf.setLong("ipc.51." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
1058);
conf.unset("ipc.51." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY);
conf.set("ipc." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
"org.apache.hadoop.ipc.TestDecayRpcScheduler$TestIdentityProvider");
scheduler = new DecayRpcScheduler(1, "ipc.51", conf);
assertEquals(1058L, scheduler.getDecayPeriodMillis());
}

@Test
@SuppressWarnings("deprecation")
public void testParsePeriodWithPortLessCostProvider() {
// By default
scheduler = new DecayRpcScheduler(1, "ipc.52", new Configuration());
assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT,
scheduler.getDecayPeriodMillis());

// Custom
Configuration conf = new Configuration();
conf.setLong("ipc.52." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
1058);
conf.unset("ipc.52." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY);
conf.set("ipc." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
"org.apache.hadoop.ipc.TestDecayRpcScheduler$TestCostProvider");
scheduler = new DecayRpcScheduler(1, "ipc.52", conf);
assertEquals(1058L, scheduler.getDecayPeriodMillis());
}

@Test
@SuppressWarnings("deprecation")
public void testParseFactor() {
Expand Down