@@ -324,34 +324,10 @@ public boolean progress() {
324
324
}
325
325
326
326
/**
327
- * This function calculates how many splitters this RS should create based on expected average
328
- * tasks per RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
329
- * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound)
330
- * @param numTasks total number of split tasks available
331
- * @return number of tasks this RS can grab
332
- */
333
- private int getNumExpectedTasksPerRS (int numTasks ) {
334
- // at lease one RS(itself) available
335
- int availableRSs = 1 ;
336
- try {
337
- List <String > regionServers =
338
- ZKUtil .listChildrenNoWatch (watcher , watcher .getZNodePaths ().rsZNode );
339
- availableRSs = Math .max (availableRSs , (regionServers == null ) ? 0 : regionServers .size ());
340
- } catch (KeeperException e ) {
341
- // do nothing
342
- LOG .debug ("getAvailableRegionServers got ZooKeeper exception" , e );
343
- }
344
- int expectedTasksPerRS = (numTasks / availableRSs ) + ((numTasks % availableRSs == 0 ) ? 0 : 1 );
345
- return Math .max (1 , expectedTasksPerRS ); // at least be one
346
- }
347
-
348
- /**
349
- * @param expectedTasksPerRS Average number of tasks to be handled by each RS
350
327
* @return true if more splitters are available, otherwise false.
351
328
*/
352
- private boolean areSplittersAvailable (int expectedTasksPerRS ) {
353
- return (Math .min (expectedTasksPerRS , maxConcurrentTasks )
354
- - this .tasksInProgress .get ()) > 0 ;
329
+ private boolean areSplittersAvailable () {
330
+ return maxConcurrentTasks - tasksInProgress .get () > 0 ;
355
331
}
356
332
357
333
/**
@@ -432,22 +408,25 @@ public void taskLoop() throws InterruptedException {
432
408
}
433
409
}
434
410
int numTasks = paths .size ();
435
- int expectedTasksPerRS = getNumExpectedTasksPerRS (numTasks );
436
411
boolean taskGrabbed = false ;
437
412
for (int i = 0 ; i < numTasks ; i ++) {
438
413
while (!shouldStop ) {
439
- if (this .areSplittersAvailable (expectedTasksPerRS )) {
440
- LOG .debug ("Current region server " + server .getServerName ()
414
+ if (this .areSplittersAvailable ()) {
415
+ if (LOG .isTraceEnabled ()) {
416
+ LOG .trace ("Current region server " + server .getServerName ()
441
417
+ " is ready to take more tasks, will get task list and try grab tasks again." );
418
+ }
442
419
int idx = (i + offset ) % paths .size ();
443
420
// don't call ZKSplitLog.getNodeName() because that will lead to
444
421
// double encoding of the path name
445
422
taskGrabbed |= grabTask (ZNodePaths .joinZNode (
446
423
watcher .getZNodePaths ().splitLogZNode , paths .get (idx )));
447
424
break ;
448
425
} else {
449
- LOG .debug ("Current region server " + server .getServerName () + " has "
426
+ if (LOG .isTraceEnabled ()) {
427
+ LOG .trace ("Current region server " + server .getServerName () + " has "
450
428
+ this .tasksInProgress .get () + " tasks in progress and can't take more." );
429
+ }
451
430
Thread .sleep (100 );
452
431
}
453
432
}
0 commit comments