Skip to content

HADOOP-16798. S3A Committer thread pool shutdown problems. #1963

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -472,7 +473,7 @@ protected void commitPendingUploads(
Tasks.foreach(pending.getSourceFiles())
.stopOnFailure()
.suppressExceptions(false)
.executeWith(buildThreadPool(context))
.executeWith(buildSubmitter(context))
.abortWith(path ->
loadAndAbort(commitContext, pending, path, true, false))
.revertWith(path ->
Expand Down Expand Up @@ -502,7 +503,7 @@ protected void precommitCheckPendingFiles(
Tasks.foreach(pending.getSourceFiles())
.stopOnFailure()
.suppressExceptions(false)
.executeWith(buildThreadPool(context))
.executeWith(buildSubmitter(context))
.run(path -> PendingSet.load(sourceFS, path));
}
}
Expand All @@ -525,7 +526,7 @@ private void loadAndCommit(
Tasks.foreach(pendingSet.getCommits())
.stopOnFailure()
.suppressExceptions(false)
.executeWith(singleCommitThreadPool())
.executeWith(singleThreadSubmitter())
.onFailure((commit, exception) ->
commitContext.abortSingleCommit(commit))
.abortWith(commitContext::abortSingleCommit)
Expand Down Expand Up @@ -580,7 +581,7 @@ private void loadAndAbort(
path);
FileSystem fs = getDestFS();
Tasks.foreach(pendingSet.getCommits())
.executeWith(singleCommitThreadPool())
.executeWith(singleThreadSubmitter())
.suppressExceptions(suppressExceptions)
.run(commit -> {
try {
Expand Down Expand Up @@ -674,7 +675,7 @@ protected void abortPendingUploadsInCleanup(
return;
}
Tasks.foreach(pending)
.executeWith(buildThreadPool(getJobContext()))
.executeWith(buildSubmitter(getJobContext()))
.suppressExceptions(suppressExceptions)
.run(u -> commitContext.abortMultipartCommit(
u.getKey(), u.getUploadId()));
Expand Down Expand Up @@ -838,44 +839,116 @@ protected String getRole() {
}

/**
* Returns an {@link ExecutorService} for parallel tasks. The number of
* Returns an {@link Tasks.Submitter} for parallel tasks. The number of
* threads in the thread-pool is set by fs.s3a.committer.threads.
* If num-threads is 0, this will return null;
* this is used in Tasks as a cue
* to switch to single-threaded execution.
*
* @param context the JobContext for this commit
* @return an {@link ExecutorService} or null for the number of threads
* @return a submitter or null
*/
protected final synchronized ExecutorService buildThreadPool(
protected Tasks.Submitter buildSubmitter(
JobContext context) {
if (getThreadCount(context) > 0) {
return new PoolSubmitter(context);
} else {
return null;
}
}

/**
* Returns an {@link ExecutorService} for parallel tasks. The number of
* threads in the thread-pool is set by fs.s3a.committer.threads.
* If num-threads is 0, this will raise an exception.
*
* @param context the JobContext for this commit
* @param numThreads threads
* @return an {@link ExecutorService} for the number of threads
*/
private synchronized ExecutorService buildThreadPool(
JobContext context, int numThreads) {
Preconditions.checkArgument(numThreads > 0,
"Cannot create a thread pool with no threads");
if (threadPool == null) {
int numThreads = context.getConfiguration().getInt(
FS_S3A_COMMITTER_THREADS,
DEFAULT_COMMITTER_THREADS);
LOG.debug("{}: creating thread pool of size {}", getRole(), numThreads);
if (numThreads > 0) {
threadPool = HadoopExecutors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d")
.build());
} else {
return null;
}
threadPool = HadoopExecutors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d")
.build());
}
return threadPool;
}

/**
* Get the thread count for this job's commit operations.
* @param context the JobContext for this commit
* @return a possibly zero thread count.
*/
private int getThreadCount(final JobContext context) {
return context.getConfiguration().getInt(
FS_S3A_COMMITTER_THREADS,
DEFAULT_COMMITTER_THREADS);
}

/**
* Submit a runnable.
* This will demand-create the thread pool if needed.
* <p></p>
* This is synchronized to ensure the thread pool is always valid when
* work is synchronized. See HADOOP-16798.
* @param context the JobContext for this commit
* @param task task to execute
* @return the future of the submitted task.
*/
private synchronized Future<?> submitRunnable(
final JobContext context,
final Runnable task) {
return buildThreadPool(context, getThreadCount(context)).submit(task);
}

/**
* The real task submitter, which hands off the work to
* the current thread pool.
*/
private final class PoolSubmitter implements Tasks.Submitter {

private final JobContext context;

private final int numThreads;

private PoolSubmitter(final JobContext context) {
this.numThreads = getThreadCount(context);
Preconditions.checkArgument(numThreads > 0,
"Cannot create a thread pool with no threads");
this.context = context;
}

@Override
public Future<?> submit(final Runnable task) {
return submitRunnable(context, task);
}

}

/**
* Destroy any thread pools; wait for that to finish,
* but don't overreact if it doesn't finish in time.
*/
protected synchronized void destroyThreadPool() {
if (threadPool != null) {
protected void destroyThreadPool() {
ExecutorService pool;
// reset the thread pool in a sync block, then shut it down
// afterwards. This allows for other threads to create a
// new thread pool on demand.
synchronized(this) {
pool = this.threadPool;
threadPool = null;
}
if (pool != null) {
LOG.debug("Destroying thread pool");
HadoopExecutors.shutdown(threadPool, LOG,
HadoopExecutors.shutdown(pool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
threadPool = null;
}
}

Expand All @@ -884,11 +957,9 @@ protected synchronized void destroyThreadPool() {
* within the commit of all uploads of a single task.
* This is currently null; it is here to allow the Tasks class to
* provide the logic for execute/revert.
* Why not use the existing thread pool? Too much fear of deadlocking,
* and tasks are being committed in parallel anyway.
* @return null. always.
*/
protected final synchronized ExecutorService singleCommitThreadPool() {
protected final synchronized Tasks.Submitter singleThreadSubmitter() {
return null;
}

Expand Down Expand Up @@ -932,7 +1003,7 @@ protected void abortPendingUploads(JobContext context,
CommitOperations.CommitContext commitContext
= initiateCommitOperation()) {
Tasks.foreach(pending)
.executeWith(buildThreadPool(context))
.executeWith(buildSubmitter(context))
.suppressExceptions(suppressExceptions)
.run(commitContext::abortSingleCommit);
}
Expand Down Expand Up @@ -961,7 +1032,7 @@ protected void abortPendingUploads(
CommitOperations.CommitContext commitContext
= initiateCommitOperation()) {
Tasks.foreach(pending.getSourceFiles())
.executeWith(buildThreadPool(context))
.executeWith(buildSubmitter(context))
.suppressExceptions(suppressExceptions)
.run(path ->
loadAndAbort(commitContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -76,7 +75,7 @@ public interface FailureTask<I, E extends Exception> {
*/
public static class Builder<I> {
private final Iterable<I> items;
private ExecutorService service = null;
private Submitter service = null;
private FailureTask<I, ?> onFailure = null;
private boolean stopOnFailure = false;
private boolean suppressExceptions = false;
Expand All @@ -96,11 +95,11 @@ public static class Builder<I> {
/**
* Declare executor service: if null, the tasks are executed in a single
* thread.
* @param executorService service to schedule tasks with.
* @param submitter service to schedule tasks with.
* @return this builder.
*/
public Builder<I> executeWith(ExecutorService executorService) {
this.service = executorService;
public Builder<I> executeWith(Submitter submitter) {
this.service = submitter;
return this;
}

Expand Down Expand Up @@ -407,4 +406,18 @@ private static <E extends Exception> void castAndThrow(Exception e) throws E {
}
throw (E) e;
}

/**
* Interface to whatever lets us submit tasks.
*/
public interface Submitter {

/**
* Submit work.
* @param task task to execute
* @return the future of the submitted task.
*/
Future<?> submit(Runnable task);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -187,7 +186,7 @@ private void replacePartitions(

Map<Path, String> partitions = new ConcurrentHashMap<>();
FileSystem sourceFS = pending.getSourceFS();
ExecutorService pool = buildThreadPool(context);
Tasks.Submitter submitter = buildSubmitter(context);
try (DurationInfo ignored =
new DurationInfo(LOG, "Replacing partitions")) {

Expand All @@ -198,7 +197,7 @@ private void replacePartitions(
Tasks.foreach(pending.getSourceFiles())
.stopOnFailure()
.suppressExceptions(false)
.executeWith(pool)
.executeWith(submitter)
.run(path -> {
PendingSet pendingSet = PendingSet.load(sourceFS, path);
Path lastParent = null;
Expand All @@ -216,7 +215,7 @@ private void replacePartitions(
Tasks.foreach(partitions.keySet())
.stopOnFailure()
.suppressExceptions(false)
.executeWith(pool)
.executeWith(submitter)
.run(partitionPath -> {
LOG.debug("{}: removing partition path to be replaced: " +
getRole(), partitionPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ protected int commitTaskInternal(final TaskAttemptContext context,
Tasks.foreach(taskOutput)
.stopOnFailure()
.suppressExceptions(false)
.executeWith(buildThreadPool(context))
.executeWith(buildSubmitter(context))
.run(stat -> {
Path path = stat.getPath();
File localFile = new File(path.toUri().getPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -57,6 +58,12 @@ public class TestTasks extends HadoopTestBase {
* Thread pool for task execution.
*/
private ExecutorService threadPool;

/**
* Task submitter bonded to the thread pool, or
* null for the 0-thread case.
*/
Tasks.Submitter submitter;
private final CounterTask failingTask
= new CounterTask("failing committer", FAILPOINT, Item::commit);

Expand Down Expand Up @@ -117,6 +124,9 @@ public void setup() {
.setDaemon(true)
.setNameFormat(getMethodName() + "-pool-%d")
.build());
submitter = new PoolSubmitter();
} else {
submitter = null;
}

}
Expand All @@ -129,12 +139,21 @@ public void teardown() {
}
}

private class PoolSubmitter implements Tasks.Submitter {

@Override
public Future<?> submit(final Runnable task) {
return threadPool.submit(task);
}

}

/**
* create the builder.
* @return pre-inited builder
*/
private Tasks.Builder<Item> builder() {
return Tasks.foreach(items).executeWith(threadPool);
return Tasks.foreach(items).executeWith(submitter);
}

private void assertRun(Tasks.Builder<Item> builder,
Expand Down