Skip to content

Commit

Permalink
HDFS-16173.Improve CopyCommands#Put#executor queue configurability. (a…
Browse files Browse the repository at this point in the history
…pache#3302)

Co-authored-by: zhujianghua <zhujianghua@zhujianghuadeMacBook-Pro.local>
Reviewed-by: Hui Fei <ferhui@apache.org>
Reviewed-by: Viraj Jasani <vjasani@apache.org>
  • Loading branch information
2 people authored and Kiran Kumar Maturi committed Nov 24, 2021
1 parent ea2cf8c commit a9448b2
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Various commands for copying files. */
@InterfaceAudience.Private
Expand Down Expand Up @@ -239,15 +241,20 @@ protected void processOptions(LinkedList<String> args)
* Copy local files to a remote filesystem
*/
public static class Put extends CommandWithDestination {

public static final Logger LOG = LoggerFactory.getLogger(Put.class);

private ThreadPoolExecutor executor = null;
private int threadPoolQueueSize = 1024;
private int numThreads = 1;

private static final int MAX_THREADS =
Runtime.getRuntime().availableProcessors() * 2;

public static final String NAME = "put";
public static final String USAGE =
"[-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>";
"[-f] [-p] [-l] [-d] [-t <thread count>] [-q <threadPool queue size>] " +
"<localsrc> ... <dst>";
public static final String DESCRIPTION =
"Copy files from the local file system " +
"into fs. Copying fails if the file already " +
Expand All @@ -256,6 +263,8 @@ public static class Put extends CommandWithDestination {
" -p : Preserves timestamps, ownership and the mode.\n" +
" -f : Overwrites the destination if it already exists.\n" +
" -t <thread count> : Number of threads to be used, default is 1.\n" +
" -q <threadPool size> : ThreadPool queue size to be used, " +
"default is 1024.\n" +
" -l : Allow DataNode to lazily persist the file to disk. Forces" +
" replication factor of 1. This flag will result in reduced" +
" durability. Use with care.\n" +
Expand All @@ -266,8 +275,10 @@ protected void processOptions(LinkedList<String> args) throws IOException {
CommandFormat cf =
new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
cf.addOptionWithValue("t");
cf.addOptionWithValue("q");
cf.parse(args);
setNumberThreads(cf.getOptValue("t"));
setThreadPoolQueueSize(cf.getOptValue("q"));
setOverwrite(cf.getOpt("f"));
setPreserve(cf.getOpt("p"));
setLazyPersist(cf.getOpt("l"));
Expand Down Expand Up @@ -299,7 +310,7 @@ protected void processArguments(LinkedList<PathData> args)
}

executor = new ThreadPoolExecutor(numThreads, numThreads, 1,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
TimeUnit.SECONDS, new ArrayBlockingQueue<>(threadPoolQueueSize),
new ThreadPoolExecutor.CallerRunsPolicy());
super.processArguments(args);

Expand Down Expand Up @@ -329,6 +340,25 @@ private void setNumberThreads(String numberThreadsString) {
}
}

/**
 * Applies the value of the {@code -q} option to the executor queue size.
 * A {@code null} argument leaves the current size untouched; a value
 * below 1 is rejected with a warning and replaced by 1024.
 *
 * @param numThreadPoolQueueSize raw option value, may be {@code null}
 * @throws NumberFormatException if the value is not a parsable integer
 */
private void setThreadPoolQueueSize(String numThreadPoolQueueSize) {
  // Option not supplied: keep whatever size is already configured.
  if (numThreadPoolQueueSize == null) {
    return;
  }

  final int requestedSize = Integer.parseInt(numThreadPoolQueueSize);
  if (requestedSize >= 1) {
    threadPoolQueueSize = requestedSize;
    return;
  }

  // Non-positive sizes are not usable; warn and fall back to 1024.
  LOG.warn("The value of the thread pool queue size cannot be " +
      "less than 1, and the default value is used here. " +
      "The default size is 1024.");
  threadPoolQueueSize = 1024;
}

/**
 * Exposes the currently configured executor queue size so that tests can
 * verify how the {@code -q} option was parsed.
 *
 * @return the thread pool queue size held by this command
 */
@VisibleForTesting
protected int getThreadPoolQueueSize() {
  return this.threadPoolQueueSize;
}

private void copyFile(PathData src, PathData target) throws IOException {
if (isPathRecursable(src)) {
throw new PathIsDirectoryException(src.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ Returns 0 on success and -1 on error.
put
---

Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [-t <thread count>] [ - | <localsrc1> .. ]. <dst>`
Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [-t <thread count>] [-q <threadPool queue size>] [ - | <localsrc1> ... ] <dst>`

Copy single src, or multiple srcs from local file system to the destination file system.
Also reads input from stdin and writes to destination file system if the source is set to "-".
Expand All @@ -542,6 +542,7 @@ Options:
* `-l` : Allow DataNode to lazily persist the file to disk. Forces a replication
factor of 1. This flag will result in reduced durability. Use with care.
* `-d` : Skip creation of temporary file with the suffix `._COPYING_`.
* `-q <threadPool queue size>` : ThreadPool queue size to be used, default is 1024.


Examples:
Expand All @@ -550,6 +551,7 @@ Examples:
* `hadoop fs -put -f localfile1 localfile2 /user/hadoop/hadoopdir`
* `hadoop fs -put -d localfile hdfs://nn.example.com/hadoop/hadoopfile`
* `hadoop fs -put - hdfs://nn.example.com/hadoop/hadoopfile` Reads the input from stdin.
* `hadoop fs -put -q 500 localfile3 hdfs://nn.example.com/hadoop/hadoopfile3`

Exit Code:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,22 @@ public void testPutWithoutP() throws Exception {
assertAttributesChanged(TO);
}

/**
 * Runs put with both {@code -p} and {@code -q 100}, then checks that the
 * queue size was parsed as 100 and that the attribute-preservation
 * assertion holds for the destination.
 */
@Test(timeout = 10000)
public void testPutWithPQ() throws Exception {
  Put put = new Put();
  run(put, "-p", "-q", "100", FROM.toString(), TO.toString());
  // assertEquals takes (expected, actual); keeping that order makes a
  // failure report "expected:<100>" rather than the reverse.
  assertEquals(100, put.getThreadPoolQueueSize());
  assertAttributesPreserved(TO);
}

/**
 * Runs put with only {@code -q 100} (no {@code -p}), then checks that the
 * queue size was parsed as 100 and that the attributes-changed assertion
 * holds for the destination.
 */
@Test(timeout = 10000)
public void testPutWithQ() throws Exception {
  Put put = new Put();
  run(put, "-q", "100", FROM.toString(), TO.toString());
  // assertEquals takes (expected, actual); keeping that order makes a
  // failure report "expected:<100>" rather than the reverse.
  assertEquals(100, put.getThreadPoolQueueSize());
  assertAttributesChanged(TO);
}

@Test(timeout = 10000)
public void testPutWithSplCharacter() throws Exception {
fs.mkdirs(DIR_FROM_SPL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@
<type>RegexpComparator</type>
<comparator>
<type>RegexpComparator</type>
<expected-output>^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
<expected-output>^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] \[-q &lt;threadPool queue size&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
</comparator>
</comparator>
<comparator>
Expand All @@ -515,19 +515,23 @@
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*-p Preserves timestamps, ownership and the mode.( )*</expected-output>
<expected-output>^\s*-p Preserves timestamps, ownership and the mode.( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*-f Overwrites the destination if it already exists.( )*</expected-output>
<expected-output>^\s*-f Overwrites the destination if it already exists.( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*-t &lt;thread count&gt; Number of threads to be used, default is 1.( )*</expected-output>
<expected-output>^\s*-t &lt;thread count&gt; Number of threads to be used, default is 1.( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output>
<expected-output>^\s*-q &lt;threadPool size&gt; ThreadPool queue size to be used, default is 1024.( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
Expand All @@ -539,7 +543,7 @@
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*-d Skip creation of temporary file\(&lt;dst&gt;\._COPYING_\).( )*</expected-output>
<expected-output>^\s*-d Skip creation of temporary file\(&lt;dst&gt;\._COPYING_\).( )*</expected-output>
</comparator>
</comparators>
</test>
Expand All @@ -554,7 +558,7 @@
<comparators>
<comparator>
<type>RegexpComparator</type>
<expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
<expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] \[-q &lt;threadPool queue size&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
Expand Down

0 comments on commit a9448b2

Please sign in to comment.