1818package org .apache .hadoop .hdfs .server .federation .router ;
1919
2020import static org .apache .hadoop .fs .CommonConfigurationKeysPublic .HADOOP_SECURITY_AUTHORIZATION ;
21+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT ;
22+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT ;
23+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_ASYNC_RPC_ENABLE_KEY ;
24+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT ;
25+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY ;
26+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT ;
27+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY ;
28+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY ;
29+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_FEDERATION_RENAME_OPTION ;
30+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT ;
2131import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_HANDLER_COUNT_DEFAULT ;
2232import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_HANDLER_COUNT_KEY ;
2333import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT ;
2636import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_READER_COUNT_KEY ;
2737import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT ;
2838import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_READER_QUEUE_SIZE_KEY ;
29- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT ;
30- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT ;
31- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT ;
32- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT ;
33- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ENABLE_ASYNC ;
34- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT ;
3539import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DN_REPORT_CACHE_EXPIRE ;
3640import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DN_REPORT_CACHE_EXPIRE_MS_DEFAULT ;
37- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_FEDERATION_RENAME_OPTION ;
38- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT ;
3941import static org .apache .hadoop .hdfs .server .federation .router .RouterFederationRename .RouterRenameOption ;
4042import static org .apache .hadoop .hdfs .server .federation .router .async .utils .AsyncUtil .asyncApply ;
4143import static org .apache .hadoop .hdfs .server .federation .router .async .utils .AsyncUtil .asyncCatch ;
5658import java .util .ArrayList ;
5759import java .util .Collection ;
5860import java .util .EnumSet ;
61+ import java .util .HashSet ;
5962import java .util .Iterator ;
6063import java .util .LinkedHashMap ;
6164import java .util .LinkedHashSet ;
6265import java .util .List ;
6366import java .util .Map ;
6467import java .util .Map .Entry ;
6568import java .util .Set ;
69+ import java .util .concurrent .ConcurrentHashMap ;
6670import java .util .concurrent .ExecutionException ;
67- import java .util .concurrent .Executor ;
6871import java .util .concurrent .ExecutorService ;
6972import java .util .concurrent .Executors ;
7073import java .util .concurrent .ThreadFactory ;
7174import java .util .concurrent .TimeUnit ;
7275import java .util .concurrent .atomic .AtomicInteger ;
7376import java .util .stream .Collectors ;
7477
78+ import org .apache .commons .lang3 .StringUtils ;
7579import org .apache .hadoop .fs .Path ;
7680import org .apache .hadoop .hdfs .HAUtil ;
7781import org .apache .hadoop .hdfs .protocol .UnresolvedPathException ;
209213import org .apache .hadoop .tools .protocolPB .GetUserMappingsProtocolPB ;
210214import org .apache .hadoop .tools .protocolPB .GetUserMappingsProtocolServerSideTranslatorPB ;
211215import org .apache .hadoop .util .ReflectionUtils ;
216+ import org .checkerframework .checker .nullness .qual .NonNull ;
212217import org .slf4j .Logger ;
213218import org .slf4j .LoggerFactory ;
214219
@@ -228,8 +233,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
228233
229234 private static final Logger LOG =
230235 LoggerFactory .getLogger (RouterRpcServer .class );
231- private ExecutorService asyncRouterHandler ;
232- private ExecutorService asyncRouterResponder ;
236+
237+ /** Name service keyword to identify fan-out calls. */
238+ public static final String CONCURRENT_NS = "concurrent" ;
233239
234240 /** Configuration for the RPC server. */
235241 private Configuration conf ;
@@ -287,6 +293,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
287293 /** Schedule the router federation rename jobs. */
288294 private BalanceProcedureScheduler fedRenameScheduler ;
289295 private boolean enableAsync ;
296+ private Map <String , Integer > nsAsyncHandlerCount = new ConcurrentHashMap <>();
297+ private Map <String , ExecutorService > asyncRouterHandlerExecutors = new ConcurrentHashMap <>();
298+ private ExecutorService routerAsyncResponderExecutor ;
299+ private ExecutorService routerDefaultAsyncHandlerExecutor ;
290300
291301 /**
292302 * Construct a router RPC server.
@@ -318,11 +328,11 @@ public RouterRpcServer(Configuration conf, Router router,
318328 int handlerQueueSize = this .conf .getInt (DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY ,
319329 DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT );
320330
321- this .enableAsync = conf .getBoolean (DFS_ROUTER_RPC_ENABLE_ASYNC ,
322- DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT );
323- LOG .info ("Router enable async {}" , this .enableAsync );
331+ this .enableAsync = conf .getBoolean (DFS_ROUTER_ASYNC_RPC_ENABLE_KEY ,
332+ DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT );
333+ LOG .info ("Router enable async rpc: {}" , this .enableAsync );
324334 if (this .enableAsync ) {
325- initAsyncThreadPool ( );
335+ initAsyncThreadPools ( conf );
326336 }
327337 // Override Hadoop Common IPC setting
328338 int readerQueueSize = this .conf .getInt (DFS_ROUTER_READER_QUEUE_SIZE_KEY ,
@@ -446,8 +456,7 @@ public RouterRpcServer(Configuration conf, Router router,
446456 // Create the client
447457 if (this .enableAsync ) {
448458 this .rpcClient = new RouterAsyncRpcClient (this .conf , this .router ,
449- this .namenodeResolver , this .rpcMonitor ,
450- routerStateIdContext , asyncRouterHandler );
459+ this .namenodeResolver , this .rpcMonitor , routerStateIdContext );
451460 this .clientProto = new RouterAsyncClientProtocol (conf , this );
452461 this .nnProto = new RouterAsyncNamenodeProtocol (this );
453462 this .routerProto = new RouterAsyncUserProtocol (this );
@@ -491,23 +500,77 @@ public RouterRpcServer(Configuration conf, Router router,
491500
492501 /**
493502 * Init router async handlers and router async responders.
503+ * @param configuration the configuration.
494504 */
495- public void initAsyncThreadPool () {
496- int asyncHandlerCount = conf .getInt (DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT ,
497- DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT );
498- int asyncResponderCount = conf .getInt (DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT ,
499- DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT );
500- if (asyncRouterHandler == null ) {
501- LOG .info ("init router async handler count: {}" , asyncHandlerCount );
502- asyncRouterHandler = Executors .newFixedThreadPool (
503- asyncHandlerCount , new AsyncThreadFactory ("router async handler " ));
505+ public void initAsyncThreadPools (Configuration configuration ) {
506+ LOG .info ("Begin initialize asynchronous handler and responder thread pool." );
507+ initNsAsyncHandlerCount ();
508+ Set <String > allConfiguredNS = FederationUtil .getAllConfiguredNS (configuration );
509+ Set <String > unassignedNS = new HashSet <>();
510+ allConfiguredNS .add (CONCURRENT_NS );
511+
512+ for (String nsId : allConfiguredNS ) {
513+ int dedicatedHandlers = nsAsyncHandlerCount .getOrDefault (nsId , 0 );
514+ LOG .info ("Dedicated handlers {} for ns {} " , dedicatedHandlers , nsId );
515+ if (dedicatedHandlers > 0 ) {
516+ initAsyncHandlerThreadPools4Ns (nsId , dedicatedHandlers );
517+ LOG .info ("Assigned {} async handlers to nsId {} " , dedicatedHandlers , nsId );
518+ } else {
519+ unassignedNS .add (nsId );
520+ }
521+ }
522+
523+ int asyncHandlerCountDefault = configuration .getInt (DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY ,
524+ DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT );
525+
526+ if (!unassignedNS .isEmpty ()) {
527+ LOG .warn ("Async handler unassigned ns: {}" , unassignedNS );
528+ LOG .info ("Use default async handler count {} for unassigned ns." , asyncHandlerCountDefault );
529+ for (String nsId : unassignedNS ) {
530+ initAsyncHandlerThreadPools4Ns (nsId , asyncHandlerCountDefault );
531+ }
504532 }
505- if (asyncRouterResponder == null ) {
506- LOG .info ("init router async responder count: {}" , asyncResponderCount );
507- asyncRouterResponder = Executors .newFixedThreadPool (
508- asyncResponderCount , new AsyncThreadFactory ("router async responder " ));
533+
534+ int asyncResponderCount = configuration .getInt (DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY ,
535+ DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT );
536+ if (routerAsyncResponderExecutor == null ) {
537+ LOG .info ("Initialize router async responder count: {}" , asyncResponderCount );
538+ routerAsyncResponderExecutor = Executors .newFixedThreadPool (
539+ asyncResponderCount , new AsyncThreadFactory ("Router Async Responder #" ));
540+ }
541+ AsyncRpcProtocolPBUtil .setAsyncResponderExecutor (routerAsyncResponderExecutor );
542+
543+ if (routerDefaultAsyncHandlerExecutor == null ) {
544+ LOG .info ("init router async default executor handler count: {}" , asyncHandlerCountDefault );
545+ routerDefaultAsyncHandlerExecutor = Executors .newFixedThreadPool (
546+ asyncHandlerCountDefault , new AsyncThreadFactory ("Router Async Default Handler #" ));
547+ }
548+ }
549+
550+ private void initNsAsyncHandlerCount () {
551+ String configNsHandler = conf .get (DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY ,
552+ DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT );
553+ if (StringUtils .isEmpty (configNsHandler )) {
554+ LOG .error (
555+ "The value of config key: {} is empty. Will use default conf." ,
556+ DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY );
509557 }
510- AsyncRpcProtocolPBUtil .setWorker (asyncRouterResponder );
558+ String [] nsHandlers = configNsHandler .split ("," );
559+ for (String nsHandlerInfo : nsHandlers ) {
560+ String [] nsHandlerItems = nsHandlerInfo .split (":" );
561+ if (nsHandlerItems .length != 2 || StringUtils .isBlank (nsHandlerItems [0 ]) ||
562+ !StringUtils .isNumeric (nsHandlerItems [1 ])) {
563+ LOG .error ("The config key: {} is incorrect! The value is {}." ,
564+ DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY , nsHandlerInfo );
565+ continue ;
566+ }
567+ nsAsyncHandlerCount .put (nsHandlerItems [0 ], Integer .parseInt (nsHandlerItems [1 ]));
568+ }
569+ }
570+
571+ private void initAsyncHandlerThreadPools4Ns (String nsId , int dedicatedHandlers ) {
572+ asyncRouterHandlerExecutors .computeIfAbsent (nsId , id -> Executors .newFixedThreadPool (
573+ dedicatedHandlers , new AsyncThreadFactory ("Router Async Handler for " + id + " #" )));
511574 }
512575
513576 /**
@@ -2426,8 +2489,12 @@ public boolean isAsync() {
24262489 return this .enableAsync ;
24272490 }
24282491
2429- public Executor getAsyncRouterHandler () {
2430- return asyncRouterHandler ;
2492+ public Map <String , ExecutorService > getAsyncRouterHandlerExecutors () {
2493+ return asyncRouterHandlerExecutors ;
2494+ }
2495+
2496+ public ExecutorService getRouterAsyncHandlerDefaultExecutor () {
2497+ return routerDefaultAsyncHandlerExecutor ;
24312498 }
24322499
24332500 private static class AsyncThreadFactory implements ThreadFactory {
@@ -2439,8 +2506,10 @@ private static class AsyncThreadFactory implements ThreadFactory {
24392506 }
24402507
24412508 @ Override
2442- public Thread newThread (Runnable r ) {
2443- return new Thread (r , namePrefix + threadNumber .getAndIncrement ());
2509+ public Thread newThread (@ NonNull Runnable r ) {
2510+ Thread thread = new Thread (r , namePrefix + threadNumber .getAndIncrement ());
2511+ thread .setDaemon (true );
2512+ return thread ;
24442513 }
24452514 }
24462515}
0 commit comments