Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,21 +130,20 @@ public static BlockingThreadPoolExecutorService newInstance(
slower than enqueueing. */
final BlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<>(waitingTasks + activeTasks);
final InnerExecutorRejection rejection = new InnerExecutorRejection();
Copy link

Copilot AI May 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The InnerExecutorRejection handler now shuts down the service upon rejection. Consider enhancing the error handling logic or adding more detailed documentation to explain the shutdown behavior in case of task rejection.

Copilot uses AI. Check for mistakes.

ThreadPoolExecutor eventProcessingExecutor =
new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
workQueue, newDaemonThreadFactory(prefixName),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r,
ThreadPoolExecutor executor) {
// This is not expected to happen.
LOG.error("Could not submit task to executor {}",
executor.toString());
}
});
rejection);
eventProcessingExecutor.allowCoreThreadTimeOut(true);
return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
eventProcessingExecutor);
final BlockingThreadPoolExecutorService service =
new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
eventProcessingExecutor);
rejection.setDelegate((r, executor) -> {
service.shutdown();
});

return service;
}

/**
Expand All @@ -164,4 +163,28 @@ public String toString() {
.append('}');
return sb.toString();
}

private static class InnerExecutorRejection implements RejectedExecutionHandler {

private RejectedExecutionHandler delegate;

private RejectedExecutionHandler getDelegate() {
return delegate;
}

private void setDelegate(final RejectedExecutionHandler delegate) {
this.delegate = delegate;
}

@Override
public void rejectedExecution(Runnable r,
ThreadPoolExecutor executor) {
// This is not expected to happen.
LOG.error("Could not submit task to executor {}",
executor.toString());
if (getDelegate() != null) {
delegate.rejectedExecution(r, executor);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -127,6 +128,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,

@Override
public <T> Future<T> submit(Callable<T> task) {
rejectWhenShutdown();
try (DurationTracker ignored =
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
queueingPermits.acquire();
Expand All @@ -139,6 +141,7 @@ public <T> Future<T> submit(Callable<T> task) {

@Override
public <T> Future<T> submit(Runnable task, T result) {
rejectWhenShutdown();
try (DurationTracker ignored =
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
queueingPermits.acquire();
Expand All @@ -151,6 +154,7 @@ public <T> Future<T> submit(Runnable task, T result) {

@Override
public Future<?> submit(Runnable task) {
rejectWhenShutdown();
try (DurationTracker ignored =
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
queueingPermits.acquire();
Expand All @@ -163,6 +167,7 @@ public Future<?> submit(Runnable task) {

@Override
public void execute(Runnable command) {
rejectWhenShutdown();
try (DurationTracker ignored =
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
queueingPermits.acquire();
Expand Down Expand Up @@ -208,6 +213,16 @@ public String toString() {
return sb.toString();
}

/**
* Raise an exception if invoked when the executor is shut down.
* @throws RejectedExecutionException if the executor is shut down.
*/
private void rejectWhenShutdown() throws RejectedExecutionException{
if (isShutdown()) {
throw new RejectedExecutionException("ExecutorService is shutdown");
}
}

/**
* Releases a permit after the task is executed.
*/
Expand All @@ -222,6 +237,7 @@ class RunnableWithPermitRelease implements Runnable {
@Override
public void run() {
try {
rejectWhenShutdown();
delegatee.run();
} finally {
queueingPermits.release();
Expand All @@ -244,6 +260,7 @@ class CallableWithPermitRelease<T> implements Callable<T> {
@Override
public T call() throws Exception {
try {
rejectWhenShutdown();
return delegatee.call();
} finally {
queueingPermits.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,46 @@
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;
package org.apache.hadoop.util;

import org.apache.hadoop.test.AbstractHadoopTestBase;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.StopWatch;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.test.AbstractHadoopTestBase;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
* Basic test for S3A's blocking executor service.
* Test for the blocking executor service.
*/
@Timeout(60)
public class ITestBlockingThreadPoolExecutorService extends AbstractHadoopTestBase {
public class TestBlockingThreadPoolExecutorService extends AbstractHadoopTestBase {

private static final Logger LOG = LoggerFactory.getLogger(
ITestBlockingThreadPoolExecutorService.class);
TestBlockingThreadPoolExecutorService.class);

private static final int NUM_ACTIVE_TASKS = 4;

private static final int NUM_WAITING_TASKS = 2;

private static final int TASK_SLEEP_MSEC = 100;

private static final int SHUTDOWN_WAIT_MSEC = 200;

private static final int SHUTDOWN_WAIT_TRIES = 5;

private static final int BLOCKING_THRESHOLD_MSEC = 50;

private static final Integer SOME_VALUE = 1337;
Expand All @@ -62,15 +67,15 @@ public static void afterClass() throws Exception {
ensureDestroyed();
}


/**
* Basic test of running one trivial task.
*/
@Test
public void testSubmitCallable() throws Exception {
ensureCreated();
Future<Integer> f = tpe.submit(callableSleeper);
Integer v = f.get();
assertEquals(SOME_VALUE, v);
Assertions.assertThat(f.get()).isEqualTo(SOME_VALUE);
}

/**
Expand All @@ -91,9 +96,9 @@ public void testSubmitRunnable() throws Exception {
protected void verifyQueueSize(ExecutorService executorService,
int expectedQueueSize) {
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < expectedQueueSize; i++) {
executorService.submit(new LatchedSleeper(latch));
}
IntStream.range(0, expectedQueueSize)
.mapToObj(i -> new LatchedSleeper(latch))
.forEach(executorService::submit);
StopWatch stopWatch = new StopWatch().start();
latch.countDown();
executorService.submit(sleeper);
Expand Down Expand Up @@ -121,6 +126,27 @@ public void testChainedQueue() throws Throwable {
verifyQueueSize(wrapper, size);
}

@Test
public void testShutdownQueueRejectsOperations() throws Throwable {
ensureCreated();
tpe.shutdown();
try {
Assertions.assertThat(tpe.isShutdown())
.describedAs("%s should be shutdown", tpe)
.isTrue();
// runnable
intercept(RejectedExecutionException.class, () ->
tpe.submit(failToRun));
// callable
intercept(RejectedExecutionException.class, () ->
tpe.submit(() -> 0));
intercept(RejectedExecutionException.class, () ->
tpe.execute(failToRun));
} finally {
tpe = null;
}
}

// Helper functions, etc.

private void assertDidBlock(StopWatch sw) {
Expand All @@ -133,28 +159,28 @@ private void assertDidBlock(StopWatch sw) {
}
}

private Runnable sleeper = new Runnable() {
@Override
public void run() {
String name = Thread.currentThread().getName();
try {
Thread.sleep(TASK_SLEEP_MSEC);
} catch (InterruptedException e) {
LOG.info("Thread {} interrupted.", name);
Thread.currentThread().interrupt();
}
}
private Runnable failToRun = () -> {
throw new RuntimeException("Failed to Run");
};

private Callable<Integer> callableSleeper = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
sleeper.run();
return SOME_VALUE;
private Runnable sleeper = () -> {
String name = Thread.currentThread().getName();
try {
Thread.sleep(TASK_SLEEP_MSEC);
} catch (InterruptedException e) {
LOG.info("Thread {} interrupted.", name);
Thread.currentThread().interrupt();
}
};

private Callable<Integer> callableSleeper = () -> {
sleeper.run();
return SOME_VALUE;
};


private class LatchedSleeper implements Runnable {

private final CountDownLatch latch;

LatchedSleeper(CountDownLatch latch) {
Expand Down
Loading