Skip to content

Commit 1038637

Browse files
committed
HADOOP-19569. put operations from store to StoreWriter
All upload operations are in MultipartIO service, which has been renamed and move to package org.apache.hadoop.fs.s3a.impl.write to match. For completeness deletion should also go into this class or an adjacent one on deletion. Pulled out multipart IO such that there are no back references from it to S3AStore -the final change is to define a store statistics class which it and other things can use to update stats. Executors in hadoop-common to - pick up shutdown of inner executor and shut themselves down. - semaphore executor to decrement counters in this process so that queue state is updated - semaphored delegating executor unit test in common This stops callers being able to submit work when the inner executor has shut down. WriteOperationHelper * make all calls through its callback interface, rather than given a ref to S3AFS. * Move WriteOperationHelper callbacks to S3Store layer, Multipart IO operations * move nearly all Multpart IO operationss out of s3afs and into a new mulitpart service interface and impl * Multipart service retrieved and invoked as appropriate * StoreImpl stores a map of ServiceName -> service. with a lookupService() method in S3AStore interface, it's possible to retrieve services through the API just by knowing their name and type * registering all current services this way StoreImpl to IllegalStateException on method invocation whene the service isn't running. Some methods are kept open as they do seem needed.
1 parent 0f18821 commit 1038637

40 files changed

+1959
-1082
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -130,21 +130,20 @@ public static BlockingThreadPoolExecutorService newInstance(
130130
slower than enqueueing. */
131131
final BlockingQueue<Runnable> workQueue =
132132
new LinkedBlockingQueue<>(waitingTasks + activeTasks);
133+
final InnerExecutorRejection rejection = new InnerExecutorRejection();
133134
ThreadPoolExecutor eventProcessingExecutor =
134135
new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
135136
workQueue, newDaemonThreadFactory(prefixName),
136-
new RejectedExecutionHandler() {
137-
@Override
138-
public void rejectedExecution(Runnable r,
139-
ThreadPoolExecutor executor) {
140-
// This is not expected to happen.
141-
LOG.error("Could not submit task to executor {}",
142-
executor.toString());
143-
}
144-
});
137+
rejection);
145138
eventProcessingExecutor.allowCoreThreadTimeOut(true);
146-
return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
147-
eventProcessingExecutor);
139+
final BlockingThreadPoolExecutorService service =
140+
new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
141+
eventProcessingExecutor);
142+
rejection.setDelegate((r, executor) -> {
143+
service.shutdown();
144+
});
145+
146+
return service;
148147
}
149148

150149
/**
@@ -164,4 +163,28 @@ public String toString() {
164163
.append('}');
165164
return sb.toString();
166165
}
166+
167+
private static class InnerExecutorRejection implements RejectedExecutionHandler {
168+
169+
private RejectedExecutionHandler delegate;
170+
171+
private RejectedExecutionHandler getDelegate() {
172+
return delegate;
173+
}
174+
175+
private void setDelegate(final RejectedExecutionHandler delegate) {
176+
this.delegate = delegate;
177+
}
178+
179+
@Override
180+
public void rejectedExecution(Runnable r,
181+
ThreadPoolExecutor executor) {
182+
// This is not expected to happen.
183+
LOG.error("Could not submit task to executor {}",
184+
executor.toString());
185+
if (getDelegate() != null) {
186+
delegate.rejectedExecution(r, executor);
187+
}
188+
}
189+
}
167190
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.ExecutionException;
3232
import java.util.concurrent.ExecutorService;
3333
import java.util.concurrent.Future;
34+
import java.util.concurrent.RejectedExecutionException;
3435
import java.util.concurrent.Semaphore;
3536
import java.util.concurrent.TimeUnit;
3637
import java.util.concurrent.TimeoutException;
@@ -127,6 +128,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
127128

128129
@Override
129130
public <T> Future<T> submit(Callable<T> task) {
131+
rejectWhenShutdown();
130132
try (DurationTracker ignored =
131133
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
132134
queueingPermits.acquire();
@@ -139,6 +141,7 @@ public <T> Future<T> submit(Callable<T> task) {
139141

140142
@Override
141143
public <T> Future<T> submit(Runnable task, T result) {
144+
rejectWhenShutdown();
142145
try (DurationTracker ignored =
143146
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
144147
queueingPermits.acquire();
@@ -151,6 +154,7 @@ public <T> Future<T> submit(Runnable task, T result) {
151154

152155
@Override
153156
public Future<?> submit(Runnable task) {
157+
rejectWhenShutdown();
154158
try (DurationTracker ignored =
155159
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
156160
queueingPermits.acquire();
@@ -163,6 +167,7 @@ public Future<?> submit(Runnable task) {
163167

164168
@Override
165169
public void execute(Runnable command) {
170+
rejectWhenShutdown();
166171
try (DurationTracker ignored =
167172
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
168173
queueingPermits.acquire();
@@ -208,6 +213,16 @@ public String toString() {
208213
return sb.toString();
209214
}
210215

216+
/**
217+
* Raise an exception if invoked when the executor is shut down.
218+
* @throws RejectedExecutionException if the executor is shut down.
219+
*/
220+
private void rejectWhenShutdown() throws RejectedExecutionException{
221+
if (isShutdown()) {
222+
throw new RejectedExecutionException("ExecutorService is shutdown");
223+
}
224+
}
225+
211226
/**
212227
* Releases a permit after the task is executed.
213228
*/
@@ -222,6 +237,7 @@ class RunnableWithPermitRelease implements Runnable {
222237
@Override
223238
public void run() {
224239
try {
240+
rejectWhenShutdown();
225241
delegatee.run();
226242
} finally {
227243
queueingPermits.release();
@@ -244,6 +260,7 @@ class CallableWithPermitRelease<T> implements Callable<T> {
244260
@Override
245261
public T call() throws Exception {
246262
try {
263+
rejectWhenShutdown();
247264
return delegatee.call();
248265
} finally {
249266
queueingPermits.release();
Lines changed: 61 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,41 +16,46 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.hadoop.fs.s3a;
19+
package org.apache.hadoop.util;
2020

21-
import org.apache.hadoop.test.AbstractHadoopTestBase;
22-
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
23-
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
24-
import org.apache.hadoop.util.StopWatch;
21+
import java.util.concurrent.Callable;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Future;
25+
import java.util.concurrent.RejectedExecutionException;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.stream.IntStream;
2528

29+
import org.assertj.core.api.Assertions;
2630
import org.junit.jupiter.api.AfterAll;
2731
import org.junit.jupiter.api.Test;
2832
import org.junit.jupiter.api.Timeout;
2933
import org.slf4j.Logger;
3034
import org.slf4j.LoggerFactory;
3135

32-
import java.util.concurrent.Callable;
33-
import java.util.concurrent.CountDownLatch;
34-
import java.util.concurrent.ExecutorService;
35-
import java.util.concurrent.Future;
36-
import java.util.concurrent.TimeUnit;
36+
import org.apache.hadoop.test.AbstractHadoopTestBase;
3737

38-
import static org.junit.jupiter.api.Assertions.assertEquals;
38+
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
3939

4040
/**
41-
* Basic test for S3A's blocking executor service.
41+
* Test for the blocking executor service.
4242
*/
4343
@Timeout(60)
44-
public class ITestBlockingThreadPoolExecutorService extends AbstractHadoopTestBase {
44+
public class TestBlockingThreadPoolExecutorService extends AbstractHadoopTestBase {
4545

4646
private static final Logger LOG = LoggerFactory.getLogger(
47-
ITestBlockingThreadPoolExecutorService.class);
47+
TestBlockingThreadPoolExecutorService.class);
4848

4949
private static final int NUM_ACTIVE_TASKS = 4;
50+
5051
private static final int NUM_WAITING_TASKS = 2;
52+
5153
private static final int TASK_SLEEP_MSEC = 100;
54+
5255
private static final int SHUTDOWN_WAIT_MSEC = 200;
56+
5357
private static final int SHUTDOWN_WAIT_TRIES = 5;
58+
5459
private static final int BLOCKING_THRESHOLD_MSEC = 50;
5560

5661
private static final Integer SOME_VALUE = 1337;
@@ -62,15 +67,15 @@ public static void afterClass() throws Exception {
6267
ensureDestroyed();
6368
}
6469

70+
6571
/**
6672
* Basic test of running one trivial task.
6773
*/
6874
@Test
6975
public void testSubmitCallable() throws Exception {
7076
ensureCreated();
7177
Future<Integer> f = tpe.submit(callableSleeper);
72-
Integer v = f.get();
73-
assertEquals(SOME_VALUE, v);
78+
Assertions.assertThat(f.get()).isEqualTo(SOME_VALUE);
7479
}
7580

7681
/**
@@ -91,9 +96,9 @@ public void testSubmitRunnable() throws Exception {
9196
protected void verifyQueueSize(ExecutorService executorService,
9297
int expectedQueueSize) {
9398
CountDownLatch latch = new CountDownLatch(1);
94-
for (int i = 0; i < expectedQueueSize; i++) {
95-
executorService.submit(new LatchedSleeper(latch));
96-
}
99+
IntStream.range(0, expectedQueueSize)
100+
.mapToObj(i -> new LatchedSleeper(latch))
101+
.forEach(executorService::submit);
97102
StopWatch stopWatch = new StopWatch().start();
98103
latch.countDown();
99104
executorService.submit(sleeper);
@@ -121,6 +126,27 @@ public void testChainedQueue() throws Throwable {
121126
verifyQueueSize(wrapper, size);
122127
}
123128

129+
@Test
130+
public void testShutdownQueueRejectsOperations() throws Throwable {
131+
ensureCreated();
132+
tpe.shutdown();
133+
try {
134+
Assertions.assertThat(tpe.isShutdown())
135+
.describedAs("%s should be shutdown", tpe)
136+
.isTrue();
137+
// runnable
138+
intercept(RejectedExecutionException.class, () ->
139+
tpe.submit(failToRun));
140+
// callable
141+
intercept(RejectedExecutionException.class, () ->
142+
tpe.submit(() -> 0));
143+
intercept(RejectedExecutionException.class, () ->
144+
tpe.execute(failToRun));
145+
} finally {
146+
tpe = null;
147+
}
148+
}
149+
124150
// Helper functions, etc.
125151

126152
private void assertDidBlock(StopWatch sw) {
@@ -133,28 +159,28 @@ private void assertDidBlock(StopWatch sw) {
133159
}
134160
}
135161

136-
private Runnable sleeper = new Runnable() {
137-
@Override
138-
public void run() {
139-
String name = Thread.currentThread().getName();
140-
try {
141-
Thread.sleep(TASK_SLEEP_MSEC);
142-
} catch (InterruptedException e) {
143-
LOG.info("Thread {} interrupted.", name);
144-
Thread.currentThread().interrupt();
145-
}
146-
}
162+
private Runnable failToRun = () -> {
163+
throw new RuntimeException("Failed to Run");
147164
};
148165

149-
private Callable<Integer> callableSleeper = new Callable<Integer>() {
150-
@Override
151-
public Integer call() throws Exception {
152-
sleeper.run();
153-
return SOME_VALUE;
166+
private Runnable sleeper = () -> {
167+
String name = Thread.currentThread().getName();
168+
try {
169+
Thread.sleep(TASK_SLEEP_MSEC);
170+
} catch (InterruptedException e) {
171+
LOG.info("Thread {} interrupted.", name);
172+
Thread.currentThread().interrupt();
154173
}
155174
};
156175

176+
private Callable<Integer> callableSleeper = () -> {
177+
sleeper.run();
178+
return SOME_VALUE;
179+
};
180+
181+
157182
private class LatchedSleeper implements Runnable {
183+
158184
private final CountDownLatch latch;
159185

160186
LatchedSleeper(CountDownLatch latch) {

0 commit comments

Comments
 (0)