
Commit 7de1ac0

HADOOP-16798. S3A Committer thread pool shutdown problems. (#1963)
Contributed by Steve Loughran.

Fixes a condition which can cause job commit to fail if a task was aborted < 60s before the job commit commenced: the task abort shuts down the thread pool with a hard exit after 60s; the job commit POST requests would be scheduled through the same pool, and so would be interrupted and fail. At present the access is synchronized, but presumably the executor shutdown code is calling wait() and releasing locks.

Task abort is triggered from the AM when task attempts succeed but there are still active speculative task attempts running. Thus it only surfaces when speculation is enabled and the final tasks are speculating, which, given they are the stragglers, is not unheard of.

Note: this problem has never been seen in production; it has surfaced in the hadoop-aws tests on a heavily overloaded desktop.

Change-Id: I3b433356d01fcc50d88b4353dbca018484984bc8
1 parent 8b48274 commit 7de1ac0

5 files changed: 142 additions, 40 deletions

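Before the per-file diffs, here is a minimal sketch of the shutdown pattern the fix adopts, written against a hypothetical PoolOwner class rather than the committer's actual code: the shared pool field is detached inside a synchronized block, and the potentially slow shutdown happens outside the lock, so a concurrent caller can demand-create a fresh pool instead of having its submissions interrupted.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Illustrative only: not the committer's real classes, names or defaults. */
class PoolOwner {

  /** Shared pool, created on demand; guarded by "this". */
  private ExecutorService threadPool;

  /** Demand-create the pool and submit work while holding the lock. */
  synchronized Future<?> submit(Runnable task) {
    if (threadPool == null) {
      threadPool = Executors.newFixedThreadPool(4);  // pool size is arbitrary here
    }
    return threadPool.submit(task);
  }

  /** Detach the pool under the lock; shut it down without holding the lock. */
  void destroyThreadPool() {
    ExecutorService pool;
    synchronized (this) {
      pool = threadPool;
      threadPool = null;
    }
    if (pool != null) {
      pool.shutdown();
      try {
        // bounded wait so an abort cannot stall a racing job commit
        pool.awaitTermination(60, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}

With this split, a task-attempt abort that is still draining its pool no longer holds the lock a racing job commit needs to create and use a new pool.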

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java

Lines changed: 100 additions & 29 deletions
@@ -24,6 +24,7 @@
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -472,7 +473,7 @@ protected void commitPendingUploads(
       Tasks.foreach(pending.getSourceFiles())
           .stopOnFailure()
           .suppressExceptions(false)
-          .executeWith(buildThreadPool(context))
+          .executeWith(buildSubmitter(context))
           .abortWith(path ->
               loadAndAbort(commitContext, pending, path, true, false))
           .revertWith(path ->
@@ -502,7 +503,7 @@ protected void precommitCheckPendingFiles(
       Tasks.foreach(pending.getSourceFiles())
           .stopOnFailure()
           .suppressExceptions(false)
-          .executeWith(buildThreadPool(context))
+          .executeWith(buildSubmitter(context))
           .run(path -> PendingSet.load(sourceFS, path));
     }
   }
@@ -525,7 +526,7 @@ private void loadAndCommit(
       Tasks.foreach(pendingSet.getCommits())
           .stopOnFailure()
           .suppressExceptions(false)
-          .executeWith(singleCommitThreadPool())
+          .executeWith(singleThreadSubmitter())
           .onFailure((commit, exception) ->
               commitContext.abortSingleCommit(commit))
           .abortWith(commitContext::abortSingleCommit)
@@ -580,7 +581,7 @@ private void loadAndAbort(
         path);
     FileSystem fs = getDestFS();
     Tasks.foreach(pendingSet.getCommits())
-        .executeWith(singleCommitThreadPool())
+        .executeWith(singleThreadSubmitter())
         .suppressExceptions(suppressExceptions)
         .run(commit -> {
           try {
@@ -674,7 +675,7 @@ protected void abortPendingUploadsInCleanup(
         return;
       }
       Tasks.foreach(pending)
-          .executeWith(buildThreadPool(getJobContext()))
+          .executeWith(buildSubmitter(getJobContext()))
          .suppressExceptions(suppressExceptions)
          .run(u -> commitContext.abortMultipartCommit(
              u.getKey(), u.getUploadId()));
@@ -838,44 +839,116 @@ protected String getRole() {
   }
 
   /**
-   * Returns an {@link ExecutorService} for parallel tasks. The number of
+   * Returns an {@link Tasks.Submitter} for parallel tasks. The number of
    * threads in the thread-pool is set by fs.s3a.committer.threads.
    * If num-threads is 0, this will return null;
+   * this is used in Tasks as a cue
+   * to switch to single-threaded execution.
    *
    * @param context the JobContext for this commit
-   * @return an {@link ExecutorService} or null for the number of threads
+   * @return a submitter or null
    */
-  protected final synchronized ExecutorService buildThreadPool(
+  protected Tasks.Submitter buildSubmitter(
       JobContext context) {
+    if (getThreadCount(context) > 0) {
+      return new PoolSubmitter(context);
+    } else {
+      return null;
+    }
+  }
 
+  /**
+   * Returns an {@link ExecutorService} for parallel tasks. The number of
+   * threads in the thread-pool is set by fs.s3a.committer.threads.
+   * If num-threads is 0, this will raise an exception.
+   *
+   * @param context the JobContext for this commit
+   * @param numThreads threads
+   * @return an {@link ExecutorService} for the number of threads
+   */
+  private synchronized ExecutorService buildThreadPool(
+      JobContext context, int numThreads) {
+    Preconditions.checkArgument(numThreads > 0,
+        "Cannot create a thread pool with no threads");
     if (threadPool == null) {
-      int numThreads = context.getConfiguration().getInt(
-          FS_S3A_COMMITTER_THREADS,
-          DEFAULT_COMMITTER_THREADS);
       LOG.debug("{}: creating thread pool of size {}", getRole(), numThreads);
-      if (numThreads > 0) {
-        threadPool = HadoopExecutors.newFixedThreadPool(numThreads,
-            new ThreadFactoryBuilder()
-                .setDaemon(true)
-                .setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d")
-                .build());
-      } else {
-        return null;
-      }
+      threadPool = HadoopExecutors.newFixedThreadPool(numThreads,
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d")
+              .build());
     }
     return threadPool;
   }
 
+  /**
+   * Get the thread count for this job's commit operations.
+   * @param context the JobContext for this commit
+   * @return a possibly zero thread count.
+   */
+  private int getThreadCount(final JobContext context) {
+    return context.getConfiguration().getInt(
+        FS_S3A_COMMITTER_THREADS,
+        DEFAULT_COMMITTER_THREADS);
+  }
+
+  /**
+   * Submit a runnable.
+   * This will demand-create the thread pool if needed.
+   * <p></p>
+   * This is synchronized to ensure the thread pool is always valid when
+   * work is synchronized. See HADOOP-16798.
+   * @param context the JobContext for this commit
+   * @param task task to execute
+   * @return the future of the submitted task.
+   */
+  private synchronized Future<?> submitRunnable(
+      final JobContext context,
+      final Runnable task) {
+    return buildThreadPool(context, getThreadCount(context)).submit(task);
+  }
+
+  /**
+   * The real task submitter, which hands off the work to
+   * the current thread pool.
+   */
+  private final class PoolSubmitter implements Tasks.Submitter {
+
+    private final JobContext context;
+
+    private final int numThreads;
+
+    private PoolSubmitter(final JobContext context) {
+      this.numThreads = getThreadCount(context);
+      Preconditions.checkArgument(numThreads > 0,
+          "Cannot create a thread pool with no threads");
+      this.context = context;
+    }
+
+    @Override
+    public Future<?> submit(final Runnable task) {
+      return submitRunnable(context, task);
+    }
+
+  }
+
   /**
    * Destroy any thread pools; wait for that to finish,
    * but don't overreact if it doesn't finish in time.
    */
-  protected synchronized void destroyThreadPool() {
-    if (threadPool != null) {
+  protected void destroyThreadPool() {
+    ExecutorService pool;
+    // reset the thread pool in a sync block, then shut it down
+    // afterwards. This allows for other threads to create a
+    // new thread pool on demand.
+    synchronized(this) {
+      pool = this.threadPool;
+      threadPool = null;
+    }
+    if (pool != null) {
       LOG.debug("Destroying thread pool");
-      HadoopExecutors.shutdown(threadPool, LOG,
+      HadoopExecutors.shutdown(pool, LOG,
           THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
-      threadPool = null;
     }
   }
 
@@ -884,11 +957,9 @@ protected synchronized void destroyThreadPool() {
    * within the commit of all uploads of a single task.
    * This is currently null; it is here to allow the Tasks class to
    * provide the logic for execute/revert.
-   * Why not use the existing thread pool? Too much fear of deadlocking,
-   * and tasks are being committed in parallel anyway.
    * @return null. always.
    */
-  protected final synchronized ExecutorService singleCommitThreadPool() {
+  protected final synchronized Tasks.Submitter singleThreadSubmitter() {
     return null;
   }
 
@@ -932,7 +1003,7 @@ protected void abortPendingUploads(JobContext context,
         CommitOperations.CommitContext commitContext
             = initiateCommitOperation()) {
       Tasks.foreach(pending)
-          .executeWith(buildThreadPool(context))
+          .executeWith(buildSubmitter(context))
          .suppressExceptions(suppressExceptions)
          .run(commitContext::abortSingleCommit);
     }
@@ -961,7 +1032,7 @@ protected void abortPendingUploads(
         CommitOperations.CommitContext commitContext
            = initiateCommitOperation()) {
      Tasks.foreach(pending.getSourceFiles())
-          .executeWith(buildThreadPool(context))
+          .executeWith(buildSubmitter(context))
         .suppressExceptions(suppressExceptions)
         .run(path ->
             loadAndAbort(commitContext,
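As the new buildSubmitter() javadoc above notes, a thread count of zero yields a null submitter, which the Tasks framework treats as a cue to run work inline on the calling thread. A rough, illustrative sketch of what that fallback amounts to (the class and method names here are hypothetical, not the actual Tasks internals):

import java.util.Arrays;
import java.util.List;

public class SerialFallbackSketch {

  /** Minimal stand-in for the per-item task that Tasks executes. */
  interface ItemTask<I, E extends Exception> {
    void run(I item) throws E;
  }

  /**
   * What a null submitter amounts to: each item is processed inline on
   * the calling thread, so no shared pool is created or shut down at all.
   */
  static <I, E extends Exception> void runSerially(
      List<I> items, ItemTask<I, E> task) throws E {
    for (I item : items) {
      task.run(item);
    }
  }

  public static void main(String[] args) throws Exception {
    runSerially(Arrays.asList("x", "y"),
        item -> System.out.println("processed " + item));
  }
}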

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java

Lines changed: 18 additions & 5 deletions
@@ -25,7 +25,6 @@
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -76,7 +75,7 @@ public interface FailureTask<I, E extends Exception> {
    */
  public static class Builder<I> {
    private final Iterable<I> items;
-    private ExecutorService service = null;
+    private Submitter service = null;
    private FailureTask<I, ?> onFailure = null;
    private boolean stopOnFailure = false;
    private boolean suppressExceptions = false;
@@ -96,11 +95,11 @@ public static class Builder<I> {
    /**
     * Declare executor service: if null, the tasks are executed in a single
     * thread.
-     * @param executorService service to schedule tasks with.
+     * @param submitter service to schedule tasks with.
     * @return this builder.
     */
-    public Builder<I> executeWith(ExecutorService executorService) {
-      this.service = executorService;
+    public Builder<I> executeWith(Submitter submitter) {
+      this.service = submitter;
      return this;
    }
 
@@ -407,4 +406,18 @@ private static <E extends Exception> void castAndThrow(Exception e) throws E {
    }
    throw (E) e;
  }
+
+  /**
+   * Interface to whatever lets us submit tasks.
+   */
+  public interface Submitter {
+
+    /**
+     * Submit work.
+     * @param task task to execute
+     * @return the future of the submitted task.
+     */
+    Future<?> submit(Runnable task);
+  }
+
 }
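With the Submitter interface in place, any ExecutorService can be adapted to the task framework with a method reference. The following usage sketch is illustrative: the list contents and pool size are invented, and it assumes Tasks and its Builder are accessible from the calling code.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.fs.s3a.commit.Tasks;

public class SubmitterUsageSketch {

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    // ExecutorService.submit(Runnable) matches Submitter.submit(Runnable).
    Tasks.Submitter submitter = pool::submit;

    List<String> paths = Arrays.asList("a", "b", "c");
    Tasks.foreach(paths)
        .executeWith(submitter)       // null here would mean single-threaded
        .suppressExceptions(false)
        .run(p -> System.out.println("processing " + p));

    pool.shutdown();
  }
}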

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java

Lines changed: 3 additions & 4 deletions
@@ -23,7 +23,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -187,7 +186,7 @@ private void replacePartitions(
 
     Map<Path, String> partitions = new ConcurrentHashMap<>();
     FileSystem sourceFS = pending.getSourceFS();
-    ExecutorService pool = buildThreadPool(context);
+    Tasks.Submitter submitter = buildSubmitter(context);
     try (DurationInfo ignored =
         new DurationInfo(LOG, "Replacing partitions")) {
 
@@ -198,7 +197,7 @@ private void replacePartitions(
       Tasks.foreach(pending.getSourceFiles())
           .stopOnFailure()
           .suppressExceptions(false)
-          .executeWith(pool)
+          .executeWith(submitter)
          .run(path -> {
            PendingSet pendingSet = PendingSet.load(sourceFS, path);
            Path lastParent = null;
@@ -216,7 +215,7 @@ private void replacePartitions(
      Tasks.foreach(partitions.keySet())
          .stopOnFailure()
          .suppressExceptions(false)
-          .executeWith(pool)
+          .executeWith(submitter)
         .run(partitionPath -> {
           LOG.debug("{}: removing partition path to be replaced: " +
               getRole(), partitionPath);

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java

Lines changed: 1 addition & 1 deletion
@@ -699,7 +699,7 @@ protected int commitTaskInternal(final TaskAttemptContext context,
     Tasks.foreach(taskOutput)
         .stopOnFailure()
         .suppressExceptions(false)
-        .executeWith(buildThreadPool(context))
+        .executeWith(buildSubmitter(context))
        .run(stat -> {
          Path path = stat.getPath();
          File localFile = new File(path.toUri().getPath());

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java

Lines changed: 20 additions & 1 deletion
@@ -25,6 +25,7 @@
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -57,6 +58,12 @@ public class TestTasks extends HadoopTestBase {
    * Thread pool for task execution.
    */
   private ExecutorService threadPool;
+
+  /**
+   * Task submitter bonded to the thread pool, or
+   * null for the 0-thread case.
+   */
+  Tasks.Submitter submitter;
   private final CounterTask failingTask
       = new CounterTask("failing committer", FAILPOINT, Item::commit);
 
@@ -117,6 +124,9 @@ public void setup() {
           .setDaemon(true)
           .setNameFormat(getMethodName() + "-pool-%d")
           .build());
+      submitter = new PoolSubmitter();
+    } else {
+      submitter = null;
     }
 
   }
@@ -129,12 +139,21 @@ public void teardown() {
     }
   }
 
+  private class PoolSubmitter implements Tasks.Submitter {
+
+    @Override
+    public Future<?> submit(final Runnable task) {
+      return threadPool.submit(task);
+    }
+
+  }
+
   /**
    * create the builder.
    * @return pre-inited builder
    */
   private Tasks.Builder<Item> builder() {
-    return Tasks.foreach(items).executeWith(threadPool);
+    return Tasks.foreach(items).executeWith(submitter);
   }
 
   private void assertRun(Tasks.Builder<Item> builder,
