Skip to content

HDFS-16671. RBF: RouterRpcFairnessPolicyController supports configurable permit acquire timeout #4597

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT;

/**
* Base fairness policy that implements @RouterRpcFairnessPolicyController.
* Internally a map of nameservice to Semaphore is used to control permits.
Expand All @@ -42,15 +45,26 @@ public class AbstractRouterRpcFairnessPolicyController
/** Hash table to hold semaphore for each configured name service. */
private Map<String, Semaphore> permits;

private long acquireTimeoutMs = DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT;

public void init(Configuration conf) {
this.permits = new HashMap<>();
long timeoutMs = conf.getTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT,
DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
if (timeoutMs >= 0) {
acquireTimeoutMs = timeoutMs;
Comment on lines +54 to +55
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there is an invalid entry configured and we are moving to using the default value. We should atleast have a warn log. Kind of Invalid value -1 configured for dfs.... should be greater than or equal to 0, Using default value of : 1s instead
something like this

} else {
LOG.warn("Invalid value {} configured for {} should be greater than or equal to 0. " +
"Using default value of : {}ms instead.", timeoutMs,
DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT);
}
}

@Override
public boolean acquirePermit(String nsId) {
try {
LOG.debug("Taking lock for nameservice {}", nsId);
return this.permits.get(nsId).tryAcquire(1, TimeUnit.SECONDS);
return this.permits.get(nsId).tryAcquire(acquireTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.debug("Cannot get a permit for nameservice {}", nsId);
}
Expand Down Expand Up @@ -82,15 +96,13 @@ protected int getAvailablePermits(String nsId) {
@Override
public String getAvailableHandlerOnPerNs() {
JSONObject json = new JSONObject();
for (Map.Entry<String, Semaphore> entry : permits.entrySet()) {
permits.forEach((k, v) -> {
try {
String nsId = entry.getKey();
int availableHandler = entry.getValue().availablePermits();
json.put(nsId, availableHandler);
json.put(k, v.availablePermits());
} catch (JSONException e) {
LOG.warn("Cannot put {} into JSONObject", entry.getKey(), e);
LOG.warn("Cannot put {} into JSONObject", k, e);
}
}
});
return json.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,10 @@ public StaticRouterRpcFairnessPolicyController(Configuration conf) {
init(conf);
}

public void init(Configuration conf)
throws IllegalArgumentException {
public void init(Configuration conf) throws IllegalArgumentException {
super.init(conf);
// Total handlers configured to process all incoming Rpc.
int handlerCount = conf.getInt(
DFS_ROUTER_HANDLER_COUNT_KEY,
DFS_ROUTER_HANDLER_COUNT_DEFAULT);
int handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT);

LOG.info("Handlers available for fairness assignment {} ", handlerCount);

Expand All @@ -71,8 +68,7 @@ public void init(Configuration conf)
allConfiguredNS.add(CONCURRENT_NS);
validateHandlersCount(conf, handlerCount, allConfiguredNS);
for (String nsId : allConfiguredNS) {
int dedicatedHandlers =
conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
int dedicatedHandlers = conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
if (dedicatedHandlers > 0) {
handlerCount -= dedicatedHandlers;
Expand All @@ -86,7 +82,7 @@ public void init(Configuration conf)
// Assign remaining handlers equally to remaining name services and
// general pool if applicable.
if (!unassignedNS.isEmpty()) {
LOG.info("Unassigned ns {}", unassignedNS.toString());
LOG.info("Unassigned ns {}", unassignedNS);
int handlersPerNS = handlerCount / unassignedNS.size();
LOG.info("Handlers available per ns {}", handlersPerNS);
for (String nsId : unassignedNS) {
Expand All @@ -101,24 +97,20 @@ public void init(Configuration conf)
int existingPermits = getAvailablePermits(CONCURRENT_NS);
if (leftOverHandlers > 0) {
LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);
insertNameServiceWithPermits(CONCURRENT_NS,
existingPermits + leftOverHandlers);
insertNameServiceWithPermits(CONCURRENT_NS, existingPermits + leftOverHandlers);
}
LOG.info("Final permit allocation for concurrent ns: {}",
getAvailablePermits(CONCURRENT_NS));
LOG.info("Final permit allocation for concurrent ns: {}", getAvailablePermits(CONCURRENT_NS));
}

private static void logAssignment(String nsId, int count) {
LOG.info("Assigned {} handlers to nsId {} ",
count, nsId);
LOG.info("Assigned {} handlers to nsId {} ", count, nsId);
}

private void validateHandlersCount(Configuration conf, int handlerCount,
Set<String> allConfiguredNS) {
private void validateHandlersCount(Configuration conf,
int handlerCount, Set<String> allConfiguredNS) {
int totalDedicatedHandlers = 0;
for (String nsId : allConfiguredNS) {
int dedicatedHandlers =
conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
int dedicatedHandlers = conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
if (dedicatedHandlers > 0) {
// Total handlers should not be less than sum of dedicated handlers.
totalDedicatedHandlers += dedicatedHandlers;
Expand All @@ -128,8 +120,7 @@ private void validateHandlersCount(Configuration conf, int handlerCount,
}
}
if (totalDedicatedHandlers > handlerCount) {
String msg = String.format(ERROR_MSG, handlerCount,
totalDedicatedHandlers);
String msg = String.format(ERROR_MSG, handlerCount, totalDedicatedHandlers);
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
NoRouterRpcFairnessPolicyController.class;
public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
public static final String DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT =
FEDERATION_ROUTER_FAIRNESS_PREFIX + "acquire.timeout";
public static final long DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT =
TimeUnit.SECONDS.toMillis(1);

// HDFS Router Federation Rename.
public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,14 @@
</description>
</property>

<property>
<name>dfs.federation.router.fairness.acquire.timeout</name>
<value>1s</value>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this configuration confirmed to be like this?

I feel the following is correct

<value>1</value>

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, your configuration is accurate.

<description>
The maximum time to wait for a permit.
</description>
</property>

<property>
<name>dfs.federation.router.federation.rename.bandwidth</name>
<value>10</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.Test;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
Expand Down Expand Up @@ -83,6 +87,26 @@ public void testHandlerAllocationPreconfigured() {
assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
}

@Test
public void testAcquireTimeout() {
Configuration conf = createConf(40);
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 30);
conf.setTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, 100, TimeUnit.MILLISECONDS);
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController =
FederationUtil.newFairnessPolicyController(conf);

// ns1 should have 30 permits allocated
for (int i = 0; i < 30; i++) {
assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
}
long acquireBeginTimeMs = Time.monotonicNow();
assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
long acquireTimeMs = Time.monotonicNow() - acquireBeginTimeMs;

// There are some other operations, so acquireTimeMs >= 100ms.
assertTrue(acquireTimeMs >= 100);
}

@Test
public void testAllocationErrorWithZeroHandlers() {
Configuration conf = createConf(0);
Expand Down