4949import static org .apache .hadoop .fs .azurebfs .constants .FileSystemConfigurations .BYTES_PER_GIGABYTE ;
5050import static org .apache .hadoop .fs .azurebfs .constants .FileSystemConfigurations .HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR ;
5151import static org .apache .hadoop .fs .azurebfs .constants .FileSystemConfigurations .HIGH_CPU_REDUCTION_FACTOR ;
52- import static org .apache .hadoop .fs .azurebfs .constants .FileSystemConfigurations .HUNDRED ;
5352import static org .apache .hadoop .fs .azurebfs .constants .FileSystemConfigurations .HUNDRED_D ;
5453import static org .apache .hadoop .fs .azurebfs .constants .FileSystemConfigurations .LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR ;
5554import static org .apache .hadoop .fs .azurebfs .constants .FileSystemConfigurations .LOW_CPU_POOL_SIZE_INCREASE_FACTOR ;
@@ -88,17 +87,15 @@ public final class WriteThreadPoolSizeManager implements Closeable {
8887 private final AbfsConfiguration abfsConfiguration ;
8988 /* Metrics collector for monitoring the performance of the ABFS write thread pool. */
9089 private final AbfsWriteThreadPoolMetrics writeThreadPoolMetrics ;
91- /* Last recorded CPU time used for computing CPU utilization deltas. */
92- private static long lastCpuTime = 0 ;
93- /* Last recorded system time used for utilization calculations. */
94- private static long lastTime = 0 ;
9590 /* Flag indicating if CPU monitoring has started. */
9691 private volatile boolean isMonitoringStarted = false ;
9792 /* Tracks the last scale direction applied, or empty if none. */
9893 private volatile String lastScaleDirection = EMPTY_STRING ;
9994 /* Maximum CPU utilization observed during the monitoring interval. */
10095 private volatile double maxCpuUtilization = 0.0 ;
96+ /** High memory usage threshold used to trigger thread pool downscaling. */
10197 private final double highMemoryThreshold ;
98+ /** Low memory usage threshold used to allow thread pool upscaling. */
10299 private final double lowMemoryThreshold ;
103100
104101 /**
@@ -351,14 +348,15 @@ public double getJvmCpuLoad() {
351348 double getMemoryLoad () {
352349 MemoryMXBean osBean = ManagementFactory .getMemoryMXBean ();
353350 MemoryUsage memoryUsage = osBean .getHeapMemoryUsage ();
354- return (double ) memoryUsage .getUsed () / memoryUsage .getCommitted ();
351+ return (double ) memoryUsage .getUsed () / memoryUsage .getMax ();
355352 }
356353
357354 /**
358355 * Dynamically adjusts the thread pool size based on current CPU utilization
359356 * and available heap memory relative to the initially available heap.
360357 *
361358 * @param cpuUtilization Current system CPU utilization (0.0 to 1.0)
359+ * @throws InterruptedException if the resizing operation is interrupted while acquiring the lock
362360 */
363361 public void adjustThreadPoolSizeBasedOnCPU (double cpuUtilization ) throws InterruptedException {
364362 lock .lock ();
@@ -367,7 +365,6 @@ public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws Interru
367365 int currentPoolSize = executor .getMaximumPoolSize ();
368366 double memoryLoad = getMemoryLoad ();
369367 LOG .debug ("Current CPU Utilization: {}" , cpuUtilization );
370-
371368 if (cpuUtilization > (abfsConfiguration .getWriteHighCpuThreshold ()/HUNDRED_D )) {
372369 newMaxPoolSize = calculateReducedPoolSizeHighCPU (currentPoolSize , memoryLoad );
373370 } else if (cpuUtilization > (abfsConfiguration .getWriteMediumCpuThreshold ()/HUNDRED_D )) {
@@ -737,7 +734,7 @@ synchronized WriteThreadPoolStats getCurrentStats(
737734 getSystemCpuUtilization (), // System CPU usage (ratio)
738735 getAvailableHeapMemory (), // Free heap (GB)
739736 getCommittedHeapMemory (), // Committed heap (GB)
740- memoryLoad , // used/committed
737+ memoryLoad , // used/max
741738 currentScaleDirection , // "I", "D", or ""
742739 maxCpuUtilization // Peak JVM CPU usage so far
743740 );
0 commit comments