@@ -179,8 +179,14 @@ public Object invokeMethod(
179179 namenodes .toString (), params );
180180 }
181181 threadLocalContext .transfer ();
182- invokeMethodAsync (nsid , ugi , (List <FederationNamenodeContext >) namenodes ,
182+ RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController ();
183+ acquirePermit (nsid , ugi , method .getName (), controller );
184+ invokeMethodAsync (ugi , (List <FederationNamenodeContext >) namenodes ,
183185 useObserver , protocol , method , params );
186+ asyncFinally (object -> {
187+ releasePermit (nsid , ugi , method , controller );
188+ return object ;
189+ });
184190 }, router .getRpcServer ().getAsyncRouterHandlerExecutors ().getOrDefault (nsid ,
185191 router .getRpcServer ().getRouterAsyncHandlerDefaultExecutor ()));
186192 return null ;
@@ -203,13 +209,11 @@ public Object invokeMethod(
203209 * @param params The parameters for the method invocation.
204210 */
205211 private void invokeMethodAsync (
206- String nsid ,
207212 final UserGroupInformation ugi ,
208213 final List <FederationNamenodeContext > namenodes ,
209214 boolean useObserver ,
210215 final Class <?> protocol , final Method method , final Object ... params ) {
211216
212- RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController ();
213217 addClientInfoToCallerContext (ugi );
214218 if (rpcMonitor != null ) {
215219 rpcMonitor .proxyOp ();
@@ -218,7 +222,6 @@ private void invokeMethodAsync(
218222 Map <FederationNamenodeContext , IOException > ioes = new LinkedHashMap <>();
219223 final ConnectionContext [] connection = new ConnectionContext [1 ];
220224 asyncTry (() -> {
221- acquirePermit (nsid , ugi , method , controller );
222225 asyncForEach (namenodes .iterator (),
223226 (foreach , namenode ) -> {
224227 if (!status .isShouldUseObserver ()
@@ -260,12 +263,6 @@ private void invokeMethodAsync(
260263 return handlerAllNamenodeFail (namenodes , method , ioes , params );
261264 });
262265 });
263-
264- asyncFinally (res -> {
265- releasePermit (nsid , ugi , method , controller );
266- return res ;
267- });
268-
269266 }
270267
271268 /**
@@ -429,9 +426,6 @@ public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
429426 }
430427 return ret ;
431428 }, Exception .class );
432- asyncFinally (ret -> {
433- return ret ;
434- });
435429 });
436430 asyncApply (result -> {
437431 if (status .isComplete ()) {
@@ -488,76 +482,6 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
488482 return asyncReturn (Map .class );
489483 }
490484
491- @ SuppressWarnings ("unchecked" )
492- public <T extends RemoteLocationContext , R > List <RemoteResult <T , R >> invokeConcurrent (
493- final Collection <T > locations , final RemoteMethod method ,
494- boolean standby , long timeOutMs ,
495- Class <R > clazz ) throws IOException {
496-
497- final UserGroupInformation ugi = RouterRpcServer .getRemoteUser ();
498- final Method m = method .getMethod ();
499-
500- if (locations .isEmpty ()) {
501- throw new IOException ("No remote locations available" );
502- } else if (locations .size () == 1 && timeOutMs <= 0 ) {
503- // Shortcut, just one call
504- return invokeSingle (locations .iterator ().next (), method );
505- }
506- // Don't acquire CONCURRENT_NS permit here.
507- RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController ();
508-
509- List <T > orderedLocations = new ArrayList <>();
510- List <Callable <Object >> callables = new ArrayList <>();
511- // transfer originCall & callerContext to worker threads of executor.
512- final Server .Call originCall = Server .getCurCall ().get ();
513- final CallerContext originContext = CallerContext .getCurrent ();
514- for (final T location : locations ) {
515- String nsId = location .getNameserviceId ();
516- boolean isObserverRead = isObserverReadEligible (nsId , m );
517- final List <? extends FederationNamenodeContext > namenodes =
518- getOrderedNamenodes (nsId , isObserverRead );
519- final Class <?> proto = method .getProtocol ();
520- final Object [] paramList = method .getParams (location );
521- if (standby ) {
522- // Call the objectGetter to all NNs (including standby)
523- for (final FederationNamenodeContext nn : namenodes ) {
524- String nnId = nn .getNamenodeId ();
525- final List <FederationNamenodeContext > nnList =
526- Collections .singletonList (nn );
527- T nnLocation = location ;
528- if (location instanceof RemoteLocation ) {
529- nnLocation = (T )new RemoteLocation (nsId , nnId , location .getDest ());
530- }
531- orderedLocations .add (nnLocation );
532- callables .add (
533- () -> {
534- transferThreadLocalContext (originCall , originContext );
535- return invokeMethod (
536- ugi , nnList , isObserverRead , proto , m , paramList );
537- });
538- }
539- } else {
540- // Call the objectGetter in order of nameservices in the NS list
541- orderedLocations .add (location );
542- callables .add (
543- () -> {
544- transferThreadLocalContext (originCall , originContext );
545- return invokeMethod (
546- ugi , namenodes , isObserverRead , proto , m , paramList );
547- });
548- }
549- }
550-
551- if (rpcMonitor != null ) {
552- rpcMonitor .proxyOp ();
553- }
554- if (this .router .getRouterClientMetrics () != null ) {
555- this .router .getRouterClientMetrics ().incInvokedConcurrent (m );
556- }
557-
558- return getRemoteResults (method , timeOutMs , controller , orderedLocations , callables );
559- }
560-
561485 /**
562486 * Invokes multiple concurrent proxy calls to different clients. Returns an
563487 * array of results.
@@ -641,9 +565,6 @@ public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> invokeSingl
641565 asyncCatch ((o , ioe ) -> {
642566 throw processException (ioe , location );
643567 }, IOException .class );
644- asyncFinally (o -> {
645- return o ;
646- });
647568 return asyncReturn (List .class );
648569 }
649570
@@ -662,18 +583,13 @@ public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>> invokeSingl
662583 public Object invokeSingle (final String nsId , RemoteMethod method )
663584 throws IOException {
664585 UserGroupInformation ugi = RouterRpcServer .getRemoteUser ();
665- asyncTry (() -> {
666- boolean isObserverRead = isObserverReadEligible (nsId , method .getMethod ());
667- List <? extends FederationNamenodeContext > nns = getOrderedNamenodes (nsId , isObserverRead );
668- RemoteLocationContext loc = new RemoteLocation (nsId , "/" , "/" );
669- Class <?> proto = method .getProtocol ();
670- Method m = method .getMethod ();
671- Object [] params = method .getParams (loc );
672- invokeMethod (ugi , nns , isObserverRead , proto , m , params );
673- });
674- asyncFinally (o -> {
675- return o ;
676- });
586+ boolean isObserverRead = isObserverReadEligible (nsId , method .getMethod ());
587+ List <? extends FederationNamenodeContext > nns = getOrderedNamenodes (nsId , isObserverRead );
588+ RemoteLocationContext loc = new RemoteLocation (nsId , "/" , "/" );
589+ Class <?> proto = method .getProtocol ();
590+ Method m = method .getMethod ();
591+ Object [] params = method .getParams (loc );
592+ invokeMethod (ugi , nns , isObserverRead , proto , m , params );
677593 return null ;
678594 }
679595
@@ -698,31 +614,6 @@ public <T> T invokeSingle(
698614 return asyncReturn (clazz );
699615 }
700616
701- protected void acquirePermit (final String nsId , final UserGroupInformation ugi ,
702- final Method m , RouterRpcFairnessPolicyController controller )
703- throws IOException {
704- if (controller != null ) {
705- if (!controller .acquirePermit (nsId )) {
706- // Throw StandByException,
707- // Clients could fail over and try another router.
708- if (rpcMonitor != null ) {
709- rpcMonitor .proxyOpPermitRejected (nsId );
710- }
711- incrRejectedPermitForNs (nsId );
712- LOG .debug ("Permit denied for ugi: {} for method: {}" ,
713- ugi , m .getName ());
714- String msg =
715- "Router " + router .getRouterId () +
716- " is overloaded for NS: " + nsId ;
717- throw new StandbyException (msg );
718- }
719- if (rpcMonitor != null ) {
720- rpcMonitor .proxyOpPermitAccepted (nsId );
721- }
722- incrAcceptedPermitForNs (nsId );
723- }
724- }
725-
726617 /**
727618 * Release permit for specific nsId after processing against downstream
728619 * nsId is completed.
0 commit comments