Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_MAX_BACKOFF_INTERVAL)
private int maxBackoffInterval;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS,
DefaultValue = BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT)
private int blockOutputActiveBlocks;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BACKOFF_INTERVAL,
DefaultValue = DEFAULT_BACKOFF_INTERVAL)
private int backoffInterval;
Expand Down Expand Up @@ -819,6 +823,10 @@ public int getNumLeaseThreads() {
return this.numLeaseThreads;
}

public int getBlockOutputActiveBlocks() {
return blockOutputActiveBlocks;
}

public boolean getCreateRemoteFileSystemDuringInitialization() {
// we do not support creating the filesystem when AuthType is SAS
return this.createRemoteFileSystemDuringInitialization
Expand Down Expand Up @@ -1136,7 +1144,7 @@ public ExponentialRetryPolicy getOauthTokenFetchRetryPolicy() {

public int getWriteMaxConcurrentRequestCount() {
if (this.writeMaxConcurrentRequestCount < 1) {
return 4 * Runtime.getRuntime().availableProcessors();
return 6 * Runtime.getRuntime().availableProcessors();
}
return this.writeMaxConcurrentRequestCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.http.client.utils.URIBuilder;
Expand All @@ -169,8 +168,6 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_FAILED;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_SUCCESS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
Expand Down Expand Up @@ -230,6 +227,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private int blockOutputActiveBlocks;
/** Bounded ThreadPool for this instance. */
private ExecutorService boundedThreadPool;
WriteThreadPoolSizeManager poolSizeManager;

/**
* FileSystem Store for {@link AzureBlobFileSystem} for Abfs operations.
Expand Down Expand Up @@ -304,11 +302,9 @@ public AzureBlobFileSystemStore(
}
this.blockFactory = abfsStoreBuilder.blockFactory;
this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
abfsConfiguration.getWriteMaxConcurrentRequestCount(),
abfsConfiguration.getMaxWriteRequestsToQueue(),
10L, TimeUnit.SECONDS,
"abfs-bounded");
this.poolSizeManager = WriteThreadPoolSizeManager.getInstance();
poolSizeManager.startCPUMonitoring();
this.boundedThreadPool = poolSizeManager.getExecutorService();
}

/**
Expand Down Expand Up @@ -362,6 +358,11 @@ public void close() throws IOException {
} catch (ExecutionException e) {
LOG.error("Error freeing leases", e);
} finally {
try {
poolSizeManager.shutdown();
} catch (InterruptedException e) {
LOG.error("Interrupted freeing boundedThreadPool", e);
}
IOUtils.cleanupWithLogger(LOG, client);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package org.apache.hadoop.fs.azurebfs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Manages a thread pool for writing operations, adjusting the pool size based on CPU utilization.
*/
public class WriteThreadPoolSizeManager {
private static WriteThreadPoolSizeManager instance;
private final int maxThreadPoolSize;
private final ScheduledExecutorService cpuMonitorExecutor;
private final ExecutorService boundedThreadPool;
private final Lock lock = new ReentrantLock();
private volatile int newMaxPoolSize;
private static final Logger LOG = LoggerFactory.getLogger(WriteThreadPoolSizeManager.class);

/**
* Private constructor to initialize the thread pool and CPU monitor executor.
*/
private WriteThreadPoolSizeManager() {
int maxPoolSize = 4 * Runtime.getRuntime().availableProcessors();
maxThreadPoolSize = 50 * Runtime.getRuntime().availableProcessors();
boundedThreadPool = Executors.newFixedThreadPool(maxPoolSize);
((ThreadPoolExecutor) boundedThreadPool).setKeepAliveTime(60, TimeUnit.SECONDS);
((ThreadPoolExecutor) boundedThreadPool).allowCoreThreadTimeOut(true);
cpuMonitorExecutor = Executors.newScheduledThreadPool(1);
}

/**
* Returns the singleton instance of WriteThreadPoolSizeManager.
*
* @return the singleton instance.
*/
public static synchronized WriteThreadPoolSizeManager getInstance() {
if (instance == null) {
instance = new WriteThreadPoolSizeManager();
}
return instance;
}

/**
* Adjusts the thread pool size to the specified maximum pool size.
*
* @param newMaxPoolSize the new maximum pool size.
*/
public void adjustThreadPoolSize(int newMaxPoolSize) {
synchronized (this) {
ThreadPoolExecutor threadPoolExecutor = ((ThreadPoolExecutor) boundedThreadPool);
int currentCorePoolSize = threadPoolExecutor.getCorePoolSize();
if (newMaxPoolSize >= currentCorePoolSize) {
threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize);
threadPoolExecutor.setCorePoolSize(newMaxPoolSize);
} else {
threadPoolExecutor.setCorePoolSize(newMaxPoolSize);
threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize);
}
LOG.debug("The thread pool size is: {} ", newMaxPoolSize);
LOG.debug("The pool size is: {} ", threadPoolExecutor.getPoolSize());
LOG.debug("The active thread count is: {}", threadPoolExecutor.getActiveCount());
}
}

/**
* Starts monitoring the CPU utilization and adjusts the thread pool size accordingly.
*/
public synchronized void startCPUMonitoring() {
cpuMonitorExecutor.scheduleAtFixedRate(() -> {
double cpuUtilization = getCpuUtilization();
LOG.debug("Current CPU Utilization is this: {}", cpuUtilization);
try {
adjustThreadPoolSizeBasedOnCPU(cpuUtilization);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 0, 60, TimeUnit.SECONDS);
}

/**
* Gets the current CPU utilization.
*
* @return the CPU utilization as a percentage (0.0 to 1.0).
*/
private double getCpuUtilization() {
OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
if (osBean instanceof com.sun.management.OperatingSystemMXBean) {
com.sun.management.OperatingSystemMXBean sunOsBean = (com.sun.management.OperatingSystemMXBean) osBean;
double cpuLoad = sunOsBean.getSystemCpuLoad();
if (cpuLoad >= 0) {
return cpuLoad;
}
}
return 0.0;
}

/**
* Adjusts the thread pool size based on the current CPU utilization.
*
* @param cpuUtilization the current CPU utilization.
* @throws InterruptedException if the thread pool adjustment is interrupted.
*/
public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws InterruptedException {
lock.lock();
int currentPoolSize = ((ThreadPoolExecutor) boundedThreadPool).getMaximumPoolSize();
try {
if (cpuUtilization > 0.75) {
newMaxPoolSize = Math.max(1, currentPoolSize - currentPoolSize / 3);
} else if (cpuUtilization > 0.50) {
newMaxPoolSize = Math.max(1, currentPoolSize - currentPoolSize / 5);
} else if (cpuUtilization < 0.25) {
newMaxPoolSize = Math.min(maxThreadPoolSize, (int) (currentPoolSize * 1.5));
} else {
newMaxPoolSize = currentPoolSize;
}
LOG.debug("Adjusting pool size from " + currentPoolSize + " to " + newMaxPoolSize);
} finally {
lock.unlock();
}
if (newMaxPoolSize != currentPoolSize) {
this.adjustThreadPoolSize(newMaxPoolSize);
}
}

/**
* Shuts down the thread pool and CPU monitor executor.
*
* @throws InterruptedException if the shutdown process is interrupted.
*/
public void shutdown() throws InterruptedException {
instance = null;
cpuMonitorExecutor.shutdown();
boundedThreadPool.shutdown();
if (!boundedThreadPool.awaitTermination(30, TimeUnit.SECONDS)) {
boundedThreadPool.shutdownNow();
}
}

/**
* Returns the executor service for the thread pool.
*
* @return the executor service.
*/
public ExecutorService getExecutorService() {
return boundedThreadPool;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.UUID;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.fs.azurebfs.WriteThreadPoolSizeManager;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -162,6 +165,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
/** Retry fallback for append on DFS */
private static boolean fallbackDFSAppend = false;

private final WriteThreadPoolSizeManager poolSizeManager;
private int currentPoolSize;

public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
throws IOException {
this.client = abfsOutputStreamContext.getClient();
Expand All @@ -184,7 +190,7 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
this.writeOperations = new ConcurrentLinkedDeque<>();
this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
this.eTag = abfsOutputStreamContext.getETag();

this.poolSizeManager = WriteThreadPoolSizeManager.getInstance();
if (this.isAppendBlob) {
this.maxConcurrentRequestCount = 1;
} else {
Expand Down Expand Up @@ -216,6 +222,7 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
this.tracingContext.setStreamID(outputStreamId);
this.tracingContext.setOperation(FSOperationType.WRITE);
this.ioStatistics = outputStreamStatistics.getIOStatistics();
//poolSizeManager.registerAbfsOutputStream(this);
}

private final Lock lock = new ReentrantLock();
Expand Down Expand Up @@ -255,6 +262,36 @@ public String getETag() {
}
}

// public synchronized void setCurrentPoolSize(int poolSize) throws InterruptedException {
// this.currentPoolSize = poolSize;
// System.out.println("Pool size changed: " + currentPoolSize + " " + getStreamID());
// adjustConcurrentWrites();
// }
//
// public synchronized void adjustConcurrentWrites() throws InterruptedException {
// int initialPoolSize = poolSizeManager.getMaxPoolSize();
// int currentPoolSize = ((ThreadPoolExecutor)poolSizeManager.getExecutorService()).getMaximumPoolSize();
// if (currentPoolSize <= initialPoolSize / 4) {
// System.out.println("Here " + getStreamID());
// int totalOutputStreams = poolSizeManager.getTotalOutputStreams();
// if (totalOutputStreams <= 0) {
// System.out.println("Error: Total output streams should be greater than 0.");
// return;
// }
// int fixedConcurrentWrites = ((CustomSemaphoredExecutor) underlyingExecutor).getPermitCount();
// int optimalConcurrentWrites = Math.max(1, currentPoolSize / (fixedConcurrentWrites * totalOutputStreams));
// System.out.println("The fixedcount is :" + fixedConcurrentWrites);
// System.out.println("The optimal is: " + optimalConcurrentWrites);
// System.out.println("Adjusted Concurrent Writes per OutputStream: " + optimalConcurrentWrites);
// customExecutor.adjustMaxConcurrentRequests(optimalConcurrentWrites);
// }
// }
//
// public synchronized void poolSizeChanged(int newPoolSize) throws InterruptedException {
// setCurrentPoolSize(newPoolSize);
// }


public DataBlocks.BlockFactory getBlockFactory() {
return blockFactory;
}
Expand Down Expand Up @@ -666,7 +703,6 @@ public synchronized void close() throws IOException {
if (closed) {
return;
}

try {
flushInternal(true);
} catch (IOException e) {
Expand All @@ -688,6 +724,7 @@ public synchronized void close() throws IOException {
if (hasActiveBlock()) {
clearActiveBlock();
}
//poolSizeManager.deRegisterAbfsOutputStream(this);
}
LOG.debug("Closing AbfsOutputStream : {}", this);
}
Expand Down