18
18
package org .apache .hadoop .hdfs .server .federation .router ;
19
19
20
20
import static org .apache .hadoop .fs .CommonConfigurationKeysPublic .HADOOP_SECURITY_AUTHORIZATION ;
21
+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT ;
22
+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT ;
23
+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_ASYNC_RPC_ENABLE_KEY ;
24
+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT ;
25
+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY ;
26
+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT ;
27
+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY ;
28
+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY ;
29
+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_FEDERATION_RENAME_OPTION ;
30
+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT ;
21
31
import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_HANDLER_COUNT_DEFAULT ;
22
32
import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_HANDLER_COUNT_KEY ;
23
33
import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT ;
26
36
import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_READER_COUNT_KEY ;
27
37
import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT ;
28
38
import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_READER_QUEUE_SIZE_KEY ;
29
- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT ;
30
- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT ;
31
- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT ;
32
- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT ;
33
- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ENABLE_ASYNC ;
34
- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT ;
35
39
import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DN_REPORT_CACHE_EXPIRE ;
36
40
import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DN_REPORT_CACHE_EXPIRE_MS_DEFAULT ;
37
- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_FEDERATION_RENAME_OPTION ;
38
- import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT ;
39
41
import static org .apache .hadoop .hdfs .server .federation .router .RouterFederationRename .RouterRenameOption ;
40
42
import static org .apache .hadoop .hdfs .server .federation .router .async .utils .AsyncUtil .asyncApply ;
41
43
import static org .apache .hadoop .hdfs .server .federation .router .async .utils .AsyncUtil .asyncCatch ;
56
58
import java .util .ArrayList ;
57
59
import java .util .Collection ;
58
60
import java .util .EnumSet ;
61
+ import java .util .HashSet ;
59
62
import java .util .Iterator ;
60
63
import java .util .LinkedHashMap ;
61
64
import java .util .LinkedHashSet ;
62
65
import java .util .List ;
63
66
import java .util .Map ;
64
67
import java .util .Map .Entry ;
65
68
import java .util .Set ;
69
+ import java .util .concurrent .ConcurrentHashMap ;
66
70
import java .util .concurrent .ExecutionException ;
67
- import java .util .concurrent .Executor ;
68
71
import java .util .concurrent .ExecutorService ;
69
72
import java .util .concurrent .Executors ;
70
73
import java .util .concurrent .ThreadFactory ;
71
74
import java .util .concurrent .TimeUnit ;
72
75
import java .util .concurrent .atomic .AtomicInteger ;
73
76
import java .util .stream .Collectors ;
74
77
78
+ import org .apache .commons .lang3 .StringUtils ;
75
79
import org .apache .hadoop .fs .Path ;
76
80
import org .apache .hadoop .hdfs .HAUtil ;
77
81
import org .apache .hadoop .hdfs .protocol .UnresolvedPathException ;
209
213
import org .apache .hadoop .tools .protocolPB .GetUserMappingsProtocolPB ;
210
214
import org .apache .hadoop .tools .protocolPB .GetUserMappingsProtocolServerSideTranslatorPB ;
211
215
import org .apache .hadoop .util .ReflectionUtils ;
216
+ import org .checkerframework .checker .nullness .qual .NonNull ;
212
217
import org .slf4j .Logger ;
213
218
import org .slf4j .LoggerFactory ;
214
219
@@ -228,8 +233,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
228
233
229
234
private static final Logger LOG =
230
235
LoggerFactory .getLogger (RouterRpcServer .class );
231
- private ExecutorService asyncRouterHandler ;
232
- private ExecutorService asyncRouterResponder ;
236
+
237
+ /** Name service keyword to identify fan-out calls. */
238
+ public static final String CONCURRENT_NS = "concurrent" ;
233
239
234
240
/** Configuration for the RPC server. */
235
241
private Configuration conf ;
@@ -287,6 +293,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
287
293
/** Schedule the router federation rename jobs. */
288
294
private BalanceProcedureScheduler fedRenameScheduler ;
289
295
private boolean enableAsync ;
296
+ private Map <String , Integer > nsAsyncHandlerCount = new ConcurrentHashMap <>();
297
+ private Map <String , ExecutorService > asyncRouterHandlerExecutors = new ConcurrentHashMap <>();
298
+ private ExecutorService routerAsyncResponderExecutor ;
299
+ private ExecutorService routerDefaultAsyncHandlerExecutor ;
290
300
291
301
/**
292
302
* Construct a router RPC server.
@@ -318,11 +328,11 @@ public RouterRpcServer(Configuration conf, Router router,
318
328
int handlerQueueSize = this .conf .getInt (DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY ,
319
329
DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT );
320
330
321
- this .enableAsync = conf .getBoolean (DFS_ROUTER_RPC_ENABLE_ASYNC ,
322
- DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT );
323
- LOG .info ("Router enable async {}" , this .enableAsync );
331
+ this .enableAsync = conf .getBoolean (DFS_ROUTER_ASYNC_RPC_ENABLE_KEY ,
332
+ DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT );
333
+ LOG .info ("Router enable async rpc: {}" , this .enableAsync );
324
334
if (this .enableAsync ) {
325
- initAsyncThreadPool ( );
335
+ initAsyncThreadPools ( conf );
326
336
}
327
337
// Override Hadoop Common IPC setting
328
338
int readerQueueSize = this .conf .getInt (DFS_ROUTER_READER_QUEUE_SIZE_KEY ,
@@ -446,8 +456,7 @@ public RouterRpcServer(Configuration conf, Router router,
446
456
// Create the client
447
457
if (this .enableAsync ) {
448
458
this .rpcClient = new RouterAsyncRpcClient (this .conf , this .router ,
449
- this .namenodeResolver , this .rpcMonitor ,
450
- routerStateIdContext , asyncRouterHandler );
459
+ this .namenodeResolver , this .rpcMonitor , routerStateIdContext );
451
460
this .clientProto = new RouterAsyncClientProtocol (conf , this );
452
461
this .nnProto = new RouterAsyncNamenodeProtocol (this );
453
462
this .routerProto = new RouterAsyncUserProtocol (this );
@@ -491,23 +500,77 @@ public RouterRpcServer(Configuration conf, Router router,
491
500
492
501
/**
493
502
* Init router async handlers and router async responders.
503
+ * @param configuration the configuration.
494
504
*/
495
- public void initAsyncThreadPool () {
496
- int asyncHandlerCount = conf .getInt (DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT ,
497
- DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT );
498
- int asyncResponderCount = conf .getInt (DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT ,
499
- DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT );
500
- if (asyncRouterHandler == null ) {
501
- LOG .info ("init router async handler count: {}" , asyncHandlerCount );
502
- asyncRouterHandler = Executors .newFixedThreadPool (
503
- asyncHandlerCount , new AsyncThreadFactory ("router async handler " ));
505
+ public void initAsyncThreadPools (Configuration configuration ) {
506
+ LOG .info ("Begin initialize asynchronous handler and responder thread pool." );
507
+ initNsAsyncHandlerCount ();
508
+ Set <String > allConfiguredNS = FederationUtil .getAllConfiguredNS (configuration );
509
+ Set <String > unassignedNS = new HashSet <>();
510
+ allConfiguredNS .add (CONCURRENT_NS );
511
+
512
+ for (String nsId : allConfiguredNS ) {
513
+ int dedicatedHandlers = nsAsyncHandlerCount .getOrDefault (nsId , 0 );
514
+ LOG .info ("Dedicated handlers {} for ns {} " , dedicatedHandlers , nsId );
515
+ if (dedicatedHandlers > 0 ) {
516
+ initAsyncHandlerThreadPools4Ns (nsId , dedicatedHandlers );
517
+ LOG .info ("Assigned {} async handlers to nsId {} " , dedicatedHandlers , nsId );
518
+ } else {
519
+ unassignedNS .add (nsId );
520
+ }
521
+ }
522
+
523
+ int asyncHandlerCountDefault = configuration .getInt (DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY ,
524
+ DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT );
525
+
526
+ if (!unassignedNS .isEmpty ()) {
527
+ LOG .warn ("Async handler unassigned ns: {}" , unassignedNS );
528
+ LOG .info ("Use default async handler count {} for unassigned ns." , asyncHandlerCountDefault );
529
+ for (String nsId : unassignedNS ) {
530
+ initAsyncHandlerThreadPools4Ns (nsId , asyncHandlerCountDefault );
531
+ }
504
532
}
505
- if (asyncRouterResponder == null ) {
506
- LOG .info ("init router async responder count: {}" , asyncResponderCount );
507
- asyncRouterResponder = Executors .newFixedThreadPool (
508
- asyncResponderCount , new AsyncThreadFactory ("router async responder " ));
533
+
534
+ int asyncResponderCount = configuration .getInt (DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY ,
535
+ DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT );
536
+ if (routerAsyncResponderExecutor == null ) {
537
+ LOG .info ("Initialize router async responder count: {}" , asyncResponderCount );
538
+ routerAsyncResponderExecutor = Executors .newFixedThreadPool (
539
+ asyncResponderCount , new AsyncThreadFactory ("Router Async Responder #" ));
540
+ }
541
+ AsyncRpcProtocolPBUtil .setAsyncResponderExecutor (routerAsyncResponderExecutor );
542
+
543
+ if (routerDefaultAsyncHandlerExecutor == null ) {
544
+ LOG .info ("init router async default executor handler count: {}" , asyncHandlerCountDefault );
545
+ routerDefaultAsyncHandlerExecutor = Executors .newFixedThreadPool (
546
+ asyncHandlerCountDefault , new AsyncThreadFactory ("Router Async Default Handler #" ));
547
+ }
548
+ }
549
+
550
+ private void initNsAsyncHandlerCount () {
551
+ String configNsHandler = conf .get (DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY ,
552
+ DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT );
553
+ if (StringUtils .isEmpty (configNsHandler )) {
554
+ LOG .error (
555
+ "The value of config key: {} is empty. Will use default conf." ,
556
+ DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY );
509
557
}
510
- AsyncRpcProtocolPBUtil .setWorker (asyncRouterResponder );
558
+ String [] nsHandlers = configNsHandler .split ("," );
559
+ for (String nsHandlerInfo : nsHandlers ) {
560
+ String [] nsHandlerItems = nsHandlerInfo .split (":" );
561
+ if (nsHandlerItems .length != 2 || StringUtils .isBlank (nsHandlerItems [0 ]) ||
562
+ !StringUtils .isNumeric (nsHandlerItems [1 ])) {
563
+ LOG .error ("The config key: {} is incorrect! The value is {}." ,
564
+ DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY , nsHandlerInfo );
565
+ continue ;
566
+ }
567
+ nsAsyncHandlerCount .put (nsHandlerItems [0 ], Integer .parseInt (nsHandlerItems [1 ]));
568
+ }
569
+ }
570
+
571
+ private void initAsyncHandlerThreadPools4Ns (String nsId , int dedicatedHandlers ) {
572
+ asyncRouterHandlerExecutors .computeIfAbsent (nsId , id -> Executors .newFixedThreadPool (
573
+ dedicatedHandlers , new AsyncThreadFactory ("Router Async Handler for " + id + " #" )));
511
574
}
512
575
513
576
/**
@@ -2426,8 +2489,12 @@ public boolean isAsync() {
2426
2489
return this .enableAsync ;
2427
2490
}
2428
2491
2429
- public Executor getAsyncRouterHandler () {
2430
- return asyncRouterHandler ;
2492
+ public Map <String , ExecutorService > getAsyncRouterHandlerExecutors () {
2493
+ return asyncRouterHandlerExecutors ;
2494
+ }
2495
+
2496
+ public ExecutorService getRouterAsyncHandlerDefaultExecutor () {
2497
+ return routerDefaultAsyncHandlerExecutor ;
2431
2498
}
2432
2499
2433
2500
private static class AsyncThreadFactory implements ThreadFactory {
@@ -2439,8 +2506,10 @@ private static class AsyncThreadFactory implements ThreadFactory {
2439
2506
}
2440
2507
2441
2508
@ Override
2442
- public Thread newThread (Runnable r ) {
2443
- return new Thread (r , namePrefix + threadNumber .getAndIncrement ());
2509
+ public Thread newThread (@ NonNull Runnable r ) {
2510
+ Thread thread = new Thread (r , namePrefix + threadNumber .getAndIncrement ());
2511
+ thread .setDaemon (true );
2512
+ return thread ;
2444
2513
}
2445
2514
}
2446
2515
}
0 commit comments