Skip to content

Commit 63018dc

Browse files
authored
HADOOP-17998. Allow get command to run with multi threads. (#3645)
1 parent c88640c commit 63018dc

File tree

8 files changed

+545
-239
lines changed

8 files changed

+545
-239
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -397,11 +397,11 @@ private boolean checkPathsForReservedRaw(Path src, Path target)
397397

398398
/**
399399
* If direct write is disabled, copies the stream contents to a temporary
400-
* file "<target>._COPYING_". If the copy is
401-
* successful, the temporary file will be renamed to the real path,
402-
* else the temporary file will be deleted.
400+
* file "target._COPYING_". If the copy is successful, the temporary file
401+
* will be renamed to the real path, else the temporary file will be deleted.
403402
* If direct write is enabled, then creation of the temporary file is skipped.
404-
* @param in the input stream for the copy
403+
*
404+
* @param in the input stream for the copy
405405
* @param target where to store the contents of the stream
406406
* @throws IOException if copy fails
407407
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.shell;
20+
21+
import java.io.IOException;
22+
import java.util.LinkedList;
23+
import java.util.concurrent.ArrayBlockingQueue;
24+
import java.util.concurrent.ThreadPoolExecutor;
25+
import java.util.concurrent.TimeUnit;
26+
27+
import org.apache.hadoop.classification.VisibleForTesting;
28+
29+
/**
30+
* Abstract command to enable sub copy commands run with multi-thread.
31+
*/
32+
public abstract class CopyCommandWithMultiThread
33+
extends CommandWithDestination {
34+
35+
private int threadCount = 1;
36+
private ThreadPoolExecutor executor = null;
37+
private int threadPoolQueueSize = DEFAULT_QUEUE_SIZE;
38+
39+
public static final int DEFAULT_QUEUE_SIZE = 1024;
40+
41+
/**
42+
* set thread count by option value, if the value less than 1,
43+
* use 1 instead.
44+
*
45+
* @param optValue option value
46+
*/
47+
protected void setThreadCount(String optValue) {
48+
if (optValue != null) {
49+
threadCount = Math.max(Integer.parseInt(optValue), 1);
50+
}
51+
}
52+
53+
/**
54+
* set thread pool queue size by option value, if the value less than 1,
55+
* use DEFAULT_QUEUE_SIZE instead.
56+
*
57+
* @param optValue option value
58+
*/
59+
protected void setThreadPoolQueueSize(String optValue) {
60+
if (optValue != null) {
61+
int size = Integer.parseInt(optValue);
62+
threadPoolQueueSize = size < 1 ? DEFAULT_QUEUE_SIZE : size;
63+
}
64+
}
65+
66+
@VisibleForTesting
67+
protected int getThreadCount() {
68+
return this.threadCount;
69+
}
70+
71+
@VisibleForTesting
72+
protected int getThreadPoolQueueSize() {
73+
return this.threadPoolQueueSize;
74+
}
75+
76+
@VisibleForTesting
77+
protected ThreadPoolExecutor getExecutor() {
78+
return this.executor;
79+
}
80+
81+
@Override
82+
protected void processArguments(LinkedList<PathData> args)
83+
throws IOException {
84+
85+
if (isMultiThreadNecessary(args)) {
86+
initThreadPoolExecutor();
87+
}
88+
89+
super.processArguments(args);
90+
91+
if (executor != null) {
92+
waitForCompletion();
93+
}
94+
}
95+
96+
// if thread count is 1 or the source is only one single file,
97+
// don't init executor to avoid threading overhead.
98+
@VisibleForTesting
99+
protected boolean isMultiThreadNecessary(LinkedList<PathData> args)
100+
throws IOException {
101+
return this.threadCount > 1 && hasMoreThanOneSourcePaths(args);
102+
}
103+
104+
// check if source is only one single file.
105+
private boolean hasMoreThanOneSourcePaths(LinkedList<PathData> args)
106+
throws IOException {
107+
if (args.size() > 1) {
108+
return true;
109+
}
110+
if (args.size() == 1) {
111+
PathData src = args.get(0);
112+
if (src.stat == null) {
113+
src.refreshStatus();
114+
}
115+
return isPathRecursable(src);
116+
}
117+
return false;
118+
}
119+
120+
private void initThreadPoolExecutor() {
121+
executor =
122+
new ThreadPoolExecutor(threadCount, threadCount, 1, TimeUnit.SECONDS,
123+
new ArrayBlockingQueue<>(threadPoolQueueSize),
124+
new ThreadPoolExecutor.CallerRunsPolicy());
125+
}
126+
127+
private void waitForCompletion() {
128+
if (executor != null) {
129+
executor.shutdown();
130+
try {
131+
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
132+
} catch (InterruptedException e) {
133+
executor.shutdownNow();
134+
displayError(e);
135+
Thread.currentThread().interrupt();
136+
}
137+
}
138+
}
139+
140+
@Override
141+
protected void copyFileToTarget(PathData src, PathData target)
142+
throws IOException {
143+
if (executor == null) {
144+
super.copyFileToTarget(src, target);
145+
} else {
146+
executor.submit(() -> {
147+
try {
148+
super.copyFileToTarget(src, target);
149+
} catch (IOException e) {
150+
displayError(e);
151+
}
152+
});
153+
}
154+
}
155+
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java

Lines changed: 29 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,14 @@
2626
import java.util.Iterator;
2727
import java.util.LinkedList;
2828
import java.util.List;
29-
import java.util.concurrent.ThreadPoolExecutor;
30-
import java.util.concurrent.ArrayBlockingQueue;
31-
import java.util.concurrent.TimeUnit;
3229

33-
import org.apache.hadoop.classification.VisibleForTesting;
3430
import org.apache.hadoop.classification.InterfaceAudience;
3531
import org.apache.hadoop.classification.InterfaceStability;
3632
import org.apache.hadoop.fs.FSDataInputStream;
3733
import org.apache.hadoop.fs.FSDataOutputStream;
3834
import org.apache.hadoop.fs.Path;
3935
import org.apache.hadoop.fs.PathIsDirectoryException;
4036
import org.apache.hadoop.io.IOUtils;
41-
import org.slf4j.Logger;
42-
import org.slf4j.LoggerFactory;
4337

4438
/** Various commands for copy files */
4539
@InterfaceAudience.Private
@@ -210,28 +204,37 @@ private void popPreserveOption(List<String> args) {
210204
/**
211205
* Copy files from a remote filesystem to the local filesystem
212206
*/
213-
public static class Get extends CommandWithDestination {
207+
public static class Get extends CopyCommandWithMultiThread {
214208
public static final String NAME = "get";
215209
public static final String USAGE =
216-
"[-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>";
210+
"[-f] [-p] [-crc] [-ignoreCrc] [-t <thread count>]"
211+
+ " [-q <thread pool queue size>] <src> ... <localdst>";
217212
public static final String DESCRIPTION =
218-
"Copy files that match the file pattern <src> " +
219-
"to the local name. <src> is kept. When copying multiple " +
220-
"files, the destination must be a directory. Passing " +
221-
"-f overwrites the destination if it already exists and " +
222-
"-p preserves access and modification times, " +
223-
"ownership and the mode.\n";
213+
"Copy files that match the file pattern <src> to the local name. "
214+
+ "<src> is kept.\nWhen copying multiple files, the destination"
215+
+ " must be a directory.\nFlags:\n"
216+
+ " -p : Preserves timestamps, ownership and the mode.\n"
217+
+ " -f : Overwrites the destination if it already exists.\n"
218+
+ " -crc : write CRC checksums for the files downloaded.\n"
219+
+ " -ignoreCrc : Skip CRC checks on the file(s) downloaded.\n"
220+
+ " -t <thread count> : Number of threads to be used,"
221+
+ " default is 1.\n"
222+
+ " -q <thread pool queue size> : Thread pool queue size to be"
223+
+ " used, default is 1024.\n";
224224

225225
@Override
226-
protected void processOptions(LinkedList<String> args)
227-
throws IOException {
228-
CommandFormat cf = new CommandFormat(
229-
1, Integer.MAX_VALUE, "crc", "ignoreCrc", "p", "f");
226+
protected void processOptions(LinkedList<String> args) throws IOException {
227+
CommandFormat cf =
228+
new CommandFormat(1, Integer.MAX_VALUE, "crc", "ignoreCrc", "p", "f");
229+
cf.addOptionWithValue("t");
230+
cf.addOptionWithValue("q");
230231
cf.parse(args);
231232
setWriteChecksum(cf.getOpt("crc"));
232233
setVerifyChecksum(!cf.getOpt("ignoreCrc"));
233234
setPreserve(cf.getOpt("p"));
234235
setOverwrite(cf.getOpt("f"));
236+
setThreadCount(cf.getOptValue("t"));
237+
setThreadPoolQueueSize(cf.getOptValue("q"));
235238
setRecursive(true);
236239
getLocalDestination(args);
237240
}
@@ -240,21 +243,12 @@ protected void processOptions(LinkedList<String> args)
240243
/**
241244
* Copy local files to a remote filesystem
242245
*/
243-
public static class Put extends CommandWithDestination {
244-
245-
public static final Logger LOG = LoggerFactory.getLogger(Put.class);
246-
247-
private ThreadPoolExecutor executor = null;
248-
private int threadPoolQueueSize = 1024;
249-
private int numThreads = 1;
250-
251-
private static final int MAX_THREADS =
252-
Runtime.getRuntime().availableProcessors() * 2;
246+
public static class Put extends CopyCommandWithMultiThread {
253247

254248
public static final String NAME = "put";
255249
public static final String USAGE =
256-
"[-f] [-p] [-l] [-d] [-t <thread count>] [-q <threadPool queue size>] " +
257-
"<localsrc> ... <dst>";
250+
"[-f] [-p] [-l] [-d] [-t <thread count>] [-q <thread pool queue size>]"
251+
+ " <localsrc> ... <dst>";
258252
public static final String DESCRIPTION =
259253
"Copy files from the local file system " +
260254
"into fs. Copying fails if the file already " +
@@ -263,11 +257,11 @@ public static class Put extends CommandWithDestination {
263257
" -p : Preserves timestamps, ownership and the mode.\n" +
264258
" -f : Overwrites the destination if it already exists.\n" +
265259
" -t <thread count> : Number of threads to be used, default is 1.\n" +
266-
" -q <threadPool size> : ThreadPool queue size to be used, " +
260+
" -q <thread pool queue size> : Thread pool queue size to be used, " +
267261
"default is 1024.\n" +
268-
" -l : Allow DataNode to lazily persist the file to disk. Forces" +
269-
" replication factor of 1. This flag will result in reduced" +
270-
" durability. Use with care.\n" +
262+
" -l : Allow DataNode to lazily persist the file to disk. Forces " +
263+
"replication factor of 1. This flag will result in reduced " +
264+
"durability. Use with care.\n" +
271265
" -d : Skip creation of temporary file(<dst>._COPYING_).\n";
272266

273267
@Override
@@ -277,7 +271,7 @@ protected void processOptions(LinkedList<String> args) throws IOException {
277271
cf.addOptionWithValue("t");
278272
cf.addOptionWithValue("q");
279273
cf.parse(args);
280-
setNumberThreads(cf.getOptValue("t"));
274+
setThreadCount(cf.getOptValue("t"));
281275
setThreadPoolQueueSize(cf.getOptValue("q"));
282276
setOverwrite(cf.getOpt("f"));
283277
setPreserve(cf.getOpt("p"));
@@ -308,92 +302,9 @@ protected void processArguments(LinkedList<PathData> args)
308302
copyStreamToTarget(System.in, getTargetPath(args.get(0)));
309303
return;
310304
}
311-
312-
executor = new ThreadPoolExecutor(numThreads, numThreads, 1,
313-
TimeUnit.SECONDS, new ArrayBlockingQueue<>(threadPoolQueueSize),
314-
new ThreadPoolExecutor.CallerRunsPolicy());
315305
super.processArguments(args);
316-
317-
// issue the command and then wait for it to finish
318-
executor.shutdown();
319-
try {
320-
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
321-
} catch (InterruptedException e) {
322-
executor.shutdownNow();
323-
displayError(e);
324-
Thread.currentThread().interrupt();
325-
}
326-
}
327-
328-
private void setNumberThreads(String numberThreadsString) {
329-
if (numberThreadsString == null) {
330-
numThreads = 1;
331-
} else {
332-
int parsedValue = Integer.parseInt(numberThreadsString);
333-
if (parsedValue <= 1) {
334-
numThreads = 1;
335-
} else if (parsedValue > MAX_THREADS) {
336-
numThreads = MAX_THREADS;
337-
} else {
338-
numThreads = parsedValue;
339-
}
340-
}
341-
}
342-
343-
private void setThreadPoolQueueSize(String numThreadPoolQueueSize) {
344-
if (numThreadPoolQueueSize != null) {
345-
int parsedValue = Integer.parseInt(numThreadPoolQueueSize);
346-
if (parsedValue < 1) {
347-
LOG.warn("The value of the thread pool queue size cannot be " +
348-
"less than 1, and the default value is used here. " +
349-
"The default size is 1024.");
350-
threadPoolQueueSize = 1024;
351-
} else {
352-
threadPoolQueueSize = parsedValue;
353-
}
354-
}
355-
}
356-
357-
@VisibleForTesting
358-
protected int getThreadPoolQueueSize() {
359-
return threadPoolQueueSize;
360-
}
361-
362-
private void copyFile(PathData src, PathData target) throws IOException {
363-
if (isPathRecursable(src)) {
364-
throw new PathIsDirectoryException(src.toString());
365-
}
366-
super.copyFileToTarget(src, target);
367-
}
368-
369-
@Override
370-
protected void copyFileToTarget(PathData src, PathData target)
371-
throws IOException {
372-
// if number of thread is 1, mimic put and avoid threading overhead
373-
if (numThreads == 1) {
374-
copyFile(src, target);
375-
return;
376-
}
377-
378-
Runnable task = () -> {
379-
try {
380-
copyFile(src, target);
381-
} catch (IOException e) {
382-
displayError(e);
383-
}
384-
};
385-
executor.submit(task);
386306
}
387307

388-
@VisibleForTesting
389-
public int getNumThreads() {
390-
return numThreads;
391-
}
392-
393-
@VisibleForTesting
394-
public ThreadPoolExecutor getExecutor() {
395-
return executor;
396-
}
397308
}
398309

399310
public static class CopyFromLocal extends Put {

0 commit comments

Comments
 (0)