|
32 | 32 | import java.util.Map; |
33 | 33 | import java.util.Optional; |
34 | 34 | import java.util.Set; |
35 | | -import java.util.concurrent.CompletableFuture; |
36 | 35 | import java.util.concurrent.ConcurrentHashMap; |
37 | 36 | import java.util.concurrent.ScheduledExecutorService; |
38 | 37 | import java.util.concurrent.TimeUnit; |
@@ -404,31 +403,22 @@ private void processBatchesInParallel(List<List<Runnable>> batches, int maxConcu |
404 | 403 | if (batches.isEmpty()) { |
405 | 404 | return; |
406 | 405 | } |
407 | | - List<CompletableFuture<Void>> inFlight = new ArrayList<>(); |
408 | | - for (List<Runnable> batch : batches) { |
409 | | - CompletableFuture<Void> fut = |
410 | | - CompletableFuture.runAsync( |
411 | | - () -> |
412 | | - batch.parallelStream() |
413 | | - .forEach( |
414 | | - r -> { |
415 | | - try { |
416 | | - r.run(); |
417 | | - } catch (Throwable t) { |
418 | | - LOG.log( |
419 | | - getDebugLogLevel(), "Health check execution failed in batch", t); |
420 | | - } |
421 | | - }), |
422 | | - nodeHealthCheckService); |
423 | | - inFlight.add(fut); |
424 | | - if (inFlight.size() >= maxConcurrentBatches) { |
425 | | - CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join(); |
426 | | - inFlight.clear(); |
427 | | - } |
428 | | - } |
429 | | - if (!inFlight.isEmpty()) { |
430 | | - CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join(); |
431 | | - } |
| 406 | + |
| 407 | + // Process batches with controlled parallelism |
| 408 | + batches.parallelStream() |
| 409 | + .limit(maxConcurrentBatches) |
| 410 | + .forEach( |
| 411 | + batch -> |
| 412 | + batch.parallelStream() |
| 413 | + .forEach( |
| 414 | + r -> { |
| 415 | + try { |
| 416 | + r.run(); |
| 417 | + } catch (Throwable t) { |
| 418 | + LOG.log( |
| 419 | + getDebugLogLevel(), "Health check execution failed in batch", t); |
| 420 | + } |
| 421 | + })); |
432 | 422 | } |
433 | 423 |
|
434 | 424 | private static List<List<Runnable>> partition(List<Runnable> list, int size) { |
|
0 commit comments