From 7c663043b2c1e207dd8c05e09e17811d68badfac Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Fri, 27 Aug 2021 11:41:44 +0800 Subject: [PATCH] HDFS-16173.Improve CopyCommands#Put#executor queue configurability. (#3302) Co-authored-by: zhujianghua Reviewed-by: Hui Fei Reviewed-by: Viraj Jasani (cherry picked from commit 4c94831364e9258247029c22a222a665771ab4c0) --- .../apache/hadoop/fs/shell/CopyCommands.java | 34 +++++++++++++++++-- .../src/site/markdown/FileSystemShell.md | 4 ++- .../hadoop/fs/shell/TestCopyPreserveFlag.java | 16 +++++++++ .../src/test/resources/testConf.xml | 18 ++++++---- 4 files changed, 62 insertions(+), 10 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java index 77f63170593ab..06809ec68d663 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java @@ -38,6 +38,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathIsDirectoryException; import org.apache.hadoop.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Various commands for copy files */ @InterfaceAudience.Private @@ -239,7 +241,11 @@ protected void processOptions(LinkedList args) * Copy local files to a remote filesystem */ public static class Put extends CommandWithDestination { + + public static final Logger LOG = LoggerFactory.getLogger(Put.class); + private ThreadPoolExecutor executor = null; + private int threadPoolQueueSize = 1024; private int numThreads = 1; private static final int MAX_THREADS = @@ -247,7 +253,8 @@ public static class Put extends CommandWithDestination { public static final String NAME = "put"; public static final String USAGE = - "[-f] [-p] [-l] [-d] [-t ] ... "; + "[-f] [-p] [-l] [-d] [-t ] [-q ] " + + " ... "; public static final String DESCRIPTION = "Copy files from the local file system " + "into fs. Copying fails if the file already " + @@ -256,6 +263,8 @@ public static class Put extends CommandWithDestination { " -p : Preserves timestamps, ownership and the mode.\n" + " -f : Overwrites the destination if it already exists.\n" + " -t : Number of threads to be used, default is 1.\n" + + " -q : ThreadPool queue size to be used, " + + "default is 1024.\n" + " -l : Allow DataNode to lazily persist the file to disk. Forces" + " replication factor of 1. This flag will result in reduced" + " durability. Use with care.\n" + @@ -266,8 +275,10 @@ protected void processOptions(LinkedList args) throws IOException { CommandFormat cf = new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d"); cf.addOptionWithValue("t"); + cf.addOptionWithValue("q"); cf.parse(args); setNumberThreads(cf.getOptValue("t")); + setThreadPoolQueueSize(cf.getOptValue("q")); setOverwrite(cf.getOpt("f")); setPreserve(cf.getOpt("p")); setLazyPersist(cf.getOpt("l")); @@ -299,7 +310,7 @@ protected void processArguments(LinkedList args) } executor = new ThreadPoolExecutor(numThreads, numThreads, 1, - TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), + TimeUnit.SECONDS, new ArrayBlockingQueue<>(threadPoolQueueSize), new ThreadPoolExecutor.CallerRunsPolicy()); super.processArguments(args); @@ -329,6 +340,25 @@ private void setNumberThreads(String numberThreadsString) { } } + private void setThreadPoolQueueSize(String numThreadPoolQueueSize) { + if (numThreadPoolQueueSize != null) { + int parsedValue = Integer.parseInt(numThreadPoolQueueSize); + if (parsedValue < 1) { + LOG.warn("The value of the thread pool queue size cannot be " + + "less than 1, and the default value is used here. " + + "The default size is 1024."); + threadPoolQueueSize = 1024; + } else { + threadPoolQueueSize = parsedValue; + } + } + } + + @VisibleForTesting + protected int getThreadPoolQueueSize() { + return threadPoolQueueSize; + } + private void copyFile(PathData src, PathData target) throws IOException { if (isPathRecursable(src)) { throw new PathIsDirectoryException(src.toString()); diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md b/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md index 33a6bf12df8ea..feed0a49053fc 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md @@ -525,7 +525,7 @@ Returns 0 on success and -1 on error. put --- -Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [-t ] [ - | .. ]. ` +Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [-t ] [-q ] [ - | .. ]. ` Copy single src, or multiple srcs from local file system to the destination file system. Also reads input from stdin and writes to destination file system if the source is set to "-" @@ -542,6 +542,7 @@ Options: * `-l` : Allow DataNode to lazily persist the file to disk, Forces a replication factor of 1. This flag will result in reduced durability. Use with care. * `-d` : Skip creation of temporary file with the suffix `._COPYING_`. +* `-q ` : ThreadPool queue size to be used, default is 1024. Examples: @@ -550,6 +551,7 @@ Examples: * `hadoop fs -put -f localfile1 localfile2 /user/hadoop/hadoopdir` * `hadoop fs -put -d localfile hdfs://nn.example.com/hadoop/hadoopfile` * `hadoop fs -put - hdfs://nn.example.com/hadoop/hadoopfile` Reads the input from stdin. +* `hadoop fs -put -q 500 localfile3 hdfs://nn.example.com/hadoop/hadoopfile3` Exit Code: diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java index 8d2e1608723d1..0f0ddcc4ee498 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java @@ -122,6 +122,22 @@ public void testPutWithoutP() throws Exception { assertAttributesChanged(TO); } + @Test(timeout = 10000) + public void testPutWithPQ() throws Exception { + Put put = new Put(); + run(put, "-p", "-q", "100", FROM.toString(), TO.toString()); + assertEquals(put.getThreadPoolQueueSize(), 100); + assertAttributesPreserved(TO); + } + + @Test(timeout = 10000) + public void testPutWithQ() throws Exception { + Put put = new Put(); + run(put, "-q", "100", FROM.toString(), TO.toString()); + assertEquals(put.getThreadPoolQueueSize(), 100); + assertAttributesChanged(TO); + } + @Test(timeout = 10000) public void testPutWithSplCharacter() throws Exception { fs.mkdirs(DIR_FROM_SPL); diff --git a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml index edf598b68df4f..fd6ee110c722f 100644 --- a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml +++ b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml @@ -498,7 +498,7 @@ RegexpComparator RegexpComparator - ^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] <localsrc> \.\.\. <dst> :\s* + ^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] \[-q <threadPool queue size>\] <localsrc> \.\.\. <dst> :\s* @@ -515,19 +515,23 @@ RegexpComparator - ^\s*-p Preserves timestamps, ownership and the mode.( )* + ^\s*-p Preserves timestamps, ownership and the mode.( )* RegexpComparator - ^\s*-f Overwrites the destination if it already exists.( )* + ^\s*-f Overwrites the destination if it already exists.( )* RegexpComparator - ^\s*-t <thread count> Number of threads to be used, default is 1.( )* + ^\s*-t <thread count> Number of threads to be used, default is 1.( )* RegexpComparator - ^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )* + ^\s*-q <threadPool size> ThreadPool queue size to be used, default is 1024.( )* + + + RegexpComparator + ^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )* RegexpComparator @@ -539,7 +543,7 @@ RegexpComparator - ^\s*-d Skip creation of temporary file\(<dst>\._COPYING_\).( )* + ^\s*-d Skip creation of temporary file\(<dst>\._COPYING_\).( )* @@ -554,7 +558,7 @@ RegexpComparator - ^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] <localsrc> \.\.\. <dst> :\s* + ^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] \[-q <threadPool queue size>\] <localsrc> \.\.\. <dst> :\s* RegexpComparator