Skip to content

SPDI-166362. The AsyncResponderExecutor in AsyncRpcProtocolPBUtil should be distinguished per namespace #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,22 @@
* @see CompletableFuture
*/
public final class AsyncRpcProtocolPBUtil {
public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);
private static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);
private static final ThreadLocal<Executor> asyncNSResponderExecutor = new ThreadLocal<>();

/** The executor used for handling responses asynchronously. */
private static Executor asyncResponderExecutor;

private AsyncRpcProtocolPBUtil() {}

public static Executor getAsyncNSResponderExecutor() {
return asyncNSResponderExecutor.get();
}

public static void setAsyncNSResponderExecutor(Executor nsExecutor) {
asyncNSResponderExecutor.set(nsExecutor);
}

/**
* Asynchronously invokes an RPC call and applies a response transformation function
* to the result. This method is generic and can be used to handle any type of
Expand Down Expand Up @@ -86,8 +96,11 @@ public static <T, R> R asyncIpcClient(
CompletableFuture<Writable> responseFuture = Client.getResponseFuture();
// transfer thread local context to worker threads of executor.
ThreadLocalContext threadLocalContext = new ThreadLocalContext();
Executor responseExecutor = getAsyncResponderExecutor();
asyncCompleteWith(responseFuture.handleAsync((result, e) -> {
threadLocalContext.transfer();
// No need to clean it up since the executor is always used by the current namespace
AsyncRpcProtocolPBUtil.setAsyncNSResponderExecutor(responseExecutor);
if (e != null) {
throw warpCompletionException(e);
}
Expand All @@ -97,10 +110,19 @@ public static <T, R> R asyncIpcClient(
} catch (Exception ex) {
throw warpCompletionException(ex);
}
}, asyncResponderExecutor));
}, responseExecutor));
return asyncReturn(clazz);
}

private static Executor getAsyncResponderExecutor() {
Executor responseExecutor = getAsyncNSResponderExecutor();
if (responseExecutor == null) {
LOG.warn("No dedicated response executor found for the namespace.");
responseExecutor = asyncResponderExecutor;
}
return responseExecutor;
}

/**
* Asynchronously invokes an RPC call and applies a response transformation function
* to the result on server-side.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,22 @@ public void routerFailureReadOnly() {
}
}

@Override
public void recordAsyncHandlerQueueSize(String nsId, int queueSize) {
if (nameserviceRPCMetricsMap != null &&
nameserviceRPCMetricsMap.containsKey(nsId)) {
nameserviceRPCMetricsMap.get(nsId).setAsyncHandlerQueueSize(queueSize);
}
}

@Override
public void recordAsyncResponderQueueSize(String nsId, int queueSize) {
if (nameserviceRPCMetricsMap != null &&
nameserviceRPCMetricsMap.containsKey(nsId)) {
nameserviceRPCMetricsMap.get(nsId).setAsyncResponderQueueSize(queueSize);
}
}

@Override
public void routerFailureLocked() {
if (metrics != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableRate;

import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -55,6 +56,10 @@ public class NameserviceRPCMetrics implements NameserviceRPCMBean {
private MutableCounterLong proxyOpPermitRejected;
@Metric("Number of operations accepted to hit a namenode")
private MutableCounterLong proxyOpPermitAccepted;
@Metric("Async Handler Queue Size")
private MutableGaugeInt asyncHandlerQueueSize;
@Metric("Async Responder Queue Size")
private MutableGaugeInt asyncResponderQueueSize;

public NameserviceRPCMetrics(Configuration conf, String nsId) {
this.nsId = NAMESERVICE_RPC_METRICS_PREFIX + nsId;
Expand Down Expand Up @@ -116,6 +121,14 @@ public long getProxyOpPermitAccepted() {
return proxyOpPermitAccepted.value();
}

public void setAsyncHandlerQueueSize(int size) {
asyncHandlerQueueSize.set(size);
}

public void setAsyncResponderQueueSize(int size) {
asyncResponderQueueSize.set(size);
}

/**
* Add the time to proxy an operation from the moment the Router sends it to
* the Namenode until it replied.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final String DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "handler.count";
public static final int DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "queue.size";
public static final int DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT = 1000;
public static final String DFS_ROUTER_ASYNC_RPC_NS_RESPONDER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "ns.responder.count";
public static final String DFS_ROUTER_ASYNC_RPC_NS_RESPONDER_COUNT_DEFAULT = "";
public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count";
public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,20 @@ void init(
* If a path is in a read only mount point.
*/
void routerFailureReadOnly();

/**
* Records the size of the async handler queue for the given namespace.
*
* @param nsId the namespace identifier
* @param queueSize the current size of the async queue
*/
void recordAsyncHandlerQueueSize(String nsId, int queueSize);

/**
* Records the size of the async responder queue for the given namespace.
*
* @param nsId the namespace identifier
* @param queueSize the current size of the async queue
*/
void recordAsyncResponderQueueSize(String nsId, int queueSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_RESPONDER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_RESPONDER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
Expand Down Expand Up @@ -58,7 +62,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
Expand All @@ -70,7 +73,9 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -292,11 +297,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
private RouterRenameOption routerRenameOption;
/** Schedule the router federation rename jobs. */
private BalanceProcedureScheduler fedRenameScheduler;
private boolean enableAsync;
private Map<String, Integer> nsAsyncHandlerCount = new ConcurrentHashMap<>();
private Map<String, ExecutorService> asyncRouterHandlerExecutors = new ConcurrentHashMap<>();
private ExecutorService routerAsyncResponderExecutor;
private ExecutorService routerDefaultAsyncHandlerExecutor;
private final boolean enableAsync;
private final Map<String, ThreadPoolExecutor> asyncRouterHandlerExecutors = new ConcurrentHashMap<>();
private ThreadPoolExecutor routerDefaultAsyncHandlerExecutor;
private final Map<String, ThreadPoolExecutor> asyncResponderExecutors = new ConcurrentHashMap<>();
private ThreadPoolExecutor defaultAsyncResponderExecutor;

/**
* Construct a router RPC server.
Expand Down Expand Up @@ -503,74 +508,139 @@ public RouterRpcServer(Configuration conf, Router router,
* @param configuration the configuration.
*/
public void initAsyncThreadPools(Configuration configuration) {
LOG.info("Begin initialize asynchronous handler and responder thread pool.");
initNsAsyncHandlerCount();
Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(configuration);
Set<String> unassignedNS = new HashSet<>();
allConfiguredNS.add(CONCURRENT_NS);

initAsyncHandlerThreadPools(configuration, allConfiguredNS);
initAsyncResponderThreadPools(configuration, allConfiguredNS);
}

private void initAsyncHandlerThreadPools(Configuration configuration, Set<String> allConfiguredNS) {
LOG.info("Begin initialize asynchronous handler thread pool.");
Map<String, Integer> nsAsyncHandlerCount = parseNsAsyncHandlerCount(configuration,
DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);

int asyncQueueSize = configuration.getInt(DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE,
DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT);
if (asyncQueueSize <= 1) {
throw new IllegalArgumentException("Async queue size must be greater than 1");
}
int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
for (String nsId : allConfiguredNS) {
int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
if (dedicatedHandlers <= 0) {
dedicatedHandlers = asyncHandlerCountDefault;
LOG.info("Use default async handler count {} for ns {}.", asyncHandlerCountDefault, nsId);
} else {
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
}

if (dedicatedHandlers > 0) {
initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
int finalDedicatedHandlers = dedicatedHandlers;
asyncRouterHandlerExecutors.computeIfAbsent(nsId,
id -> initAsyncHandlerThreadPools4Ns(id, asyncQueueSize, finalDedicatedHandlers));
LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, nsId);
} else {
unassignedNS.add(nsId);
}
}

int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);

if (!unassignedNS.isEmpty()) {
LOG.warn("Async handler unassigned ns: {}", unassignedNS);
LOG.info("Use default async handler count {} for unassigned ns.", asyncHandlerCountDefault);
for (String nsId : unassignedNS) {
initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
}
if (routerDefaultAsyncHandlerExecutor == null) {
LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault);
routerDefaultAsyncHandlerExecutor = initAsyncHandlerThreadPools4Ns(
"default", asyncQueueSize, asyncHandlerCountDefault);
}
}

private void initAsyncResponderThreadPools(Configuration configuration, Set<String> allConfiguredNS) {
LOG.info("Begin initialize asynchronous responder thread pool.");
Map<String, Integer> nsAsyncResponderCount = parseNsAsyncHandlerCount(configuration,
DFS_ROUTER_ASYNC_RPC_NS_RESPONDER_COUNT_KEY, DFS_ROUTER_ASYNC_RPC_NS_RESPONDER_COUNT_DEFAULT);
int asyncResponderCount = configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
if (routerAsyncResponderExecutor == null) {
for (String nsId : allConfiguredNS) {
int dedicatedResponder = nsAsyncResponderCount.getOrDefault(nsId, 0);
if (dedicatedResponder <= 0) {
dedicatedResponder = asyncResponderCount;
LOG.info("Use default async responder count {} for ns {}.", dedicatedResponder, nsId);
} else {
LOG.info("Dedicated responder {} for ns {} ", dedicatedResponder, nsId);
}

if (dedicatedResponder > 0) {
int finalDedicatedResponder = dedicatedResponder;
asyncResponderExecutors.computeIfAbsent(nsId,
id -> new ThreadPoolExecutor(finalDedicatedResponder, finalDedicatedResponder,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
new AsyncThreadFactory("Router Async Handler for " + nsId + " #")));
LOG.info("Assigned {} async responders to nsId {} ", finalDedicatedResponder, nsId);
}
}

if (defaultAsyncResponderExecutor == null) {
LOG.info("Initialize router async responder count: {}", asyncResponderCount);
routerAsyncResponderExecutor = Executors.newFixedThreadPool(
asyncResponderCount, new AsyncThreadFactory("Router Async Responder #"));
defaultAsyncResponderExecutor = new ThreadPoolExecutor(asyncResponderCount, asyncResponderCount,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
new AsyncThreadFactory("Router Async Responder #"));
}
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(defaultAsyncResponderExecutor);
}

if (routerDefaultAsyncHandlerExecutor == null) {
LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault);
routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
asyncHandlerCountDefault, new AsyncThreadFactory("Router Async Default Handler #"));
private ThreadPoolExecutor initAsyncHandlerThreadPools4Ns(String ns, int asyncQueueSize, int handlerCount) {
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(asyncQueueSize);
return new ThreadPoolExecutor(handlerCount, handlerCount,
0L, TimeUnit.MILLISECONDS, queue,
new AsyncThreadFactory("Router Async Handler for " + ns + " #"));
}

/**
* Returns the asynchronous executor for the specified namespace.
* If no executor is configured for the given namespace ID, returns the default executor.
*
* @param nsId the namespace identifier
* @return the corresponding ExecutorService
*/
public ThreadPoolExecutor getAsyncExecutorForNamespace(String nsId) {
ThreadPoolExecutor executorService = asyncRouterHandlerExecutors.getOrDefault(
nsId, routerDefaultAsyncHandlerExecutor);
if (rpcMonitor != null) {
rpcMonitor.recordAsyncHandlerQueueSize(nsId, executorService.getQueue().size());
}
return executorService;
}

private void initNsAsyncHandlerCount() {
String configNsHandler = conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
/**
* Returns the asynchronous executor for the specified namespace.
* If no executor is configured for the given namespace ID, returns the default executor.
*
* @param nsId the namespace identifier
* @return the corresponding ExecutorService
*/
public ThreadPoolExecutor getAsyncResponderForNamespace(String nsId) {
ThreadPoolExecutor executorService = asyncResponderExecutors.getOrDefault(
nsId, defaultAsyncResponderExecutor);
if (rpcMonitor != null) {
rpcMonitor.recordAsyncResponderQueueSize(nsId, executorService.getQueue().size());
}
return executorService;
}

private Map<String, Integer> parseNsAsyncHandlerCount(Configuration conf, String key, String defaultValue) {
String configNsHandler = conf.get(key, defaultValue);
Map<String, Integer> nsAsyncHandlerCount = new ConcurrentHashMap<>();
if (StringUtils.isEmpty(configNsHandler)) {
LOG.error(
"The value of config key: {} is empty. Will use default conf.",
DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY);
LOG.error("The value of config key: {} is empty. Will use default conf.", key);
}
String[] nsHandlers = configNsHandler.split(",");
for (String nsHandlerInfo : nsHandlers) {
String[] nsHandlerItems = nsHandlerInfo.split(":");
if (nsHandlerItems.length != 2 || StringUtils.isBlank(nsHandlerItems[0]) ||
!StringUtils.isNumeric(nsHandlerItems[1])) {
LOG.error("The config key: {} is incorrect! The value is {}.",
DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, nsHandlerInfo);
LOG.error("The config key: {} is incorrect! The value is {}.", key, nsHandlerInfo);
continue;
}
nsAsyncHandlerCount.put(nsHandlerItems[0], Integer.parseInt(nsHandlerItems[1]));
}
}

private void initAsyncHandlerThreadPools4Ns(String nsId, int dedicatedHandlers) {
asyncRouterHandlerExecutors.computeIfAbsent(nsId, id -> Executors.newFixedThreadPool(
dedicatedHandlers, new AsyncThreadFactory("Router Async Handler for " + id + " #")));
return nsAsyncHandlerCount;
}

/**
Expand Down Expand Up @@ -2489,14 +2559,6 @@ public boolean isAsync() {
return this.enableAsync;
}

public Map<String, ExecutorService> getAsyncRouterHandlerExecutors() {
return asyncRouterHandlerExecutors;
}

public ExecutorService getRouterAsyncHandlerDefaultExecutor() {
return routerDefaultAsyncHandlerExecutor;
}

private static class AsyncThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger threadNumber = new AtomicInteger(1);
Expand Down
Loading
Loading