Skip to content

HADOOP-17998. Allow get command to run with multi threads. #3645

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -397,11 +397,11 @@ private boolean checkPathsForReservedRaw(Path src, Path target)

/**
* If direct write is disabled ,copies the stream contents to a temporary
* file "<target>._COPYING_". If the copy is
* successful, the temporary file will be renamed to the real path,
* else the temporary file will be deleted.
* file "target._COPYING_". If the copy is successful, the temporary file
* will be renamed to the real path, else the temporary file will be deleted.
* if direct write is enabled , then creation temporary file is skipped.
* @param in the input stream for the copy
*
* @param in the input stream for the copy
* @param target where to store the contents of the stream
* @throws IOException if copy fails
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.shell;

import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.VisibleForTesting;

/**
* Abstract command to enable sub copy commands run with multi-thread.
*/
public abstract class CopyCommandWithMultiThread
extends CommandWithDestination {

private int threadCount = 1;
private ThreadPoolExecutor executor = null;
private int threadPoolQueueSize = DEFAULT_QUEUE_SIZE;

public static final int DEFAULT_QUEUE_SIZE = 1024;

/**
* set thread count by option value, if the value less than 1,
* use 1 instead.
*
* @param optValue option value
*/
protected void setThreadCount(String optValue) {
if (optValue != null) {
threadCount = Math.max(Integer.parseInt(optValue), 1);
}
}

/**
* set thread pool queue size by option value, if the value less than 1,
* use DEFAULT_QUEUE_SIZE instead.
*
* @param optValue option value
*/
protected void setThreadPoolQueueSize(String optValue) {
if (optValue != null) {
int size = Integer.parseInt(optValue);
threadPoolQueueSize = size < 1 ? DEFAULT_QUEUE_SIZE : size;
}
}

@VisibleForTesting
protected int getThreadCount() {
return this.threadCount;
}

@VisibleForTesting
protected int getThreadPoolQueueSize() {
return this.threadPoolQueueSize;
}

@VisibleForTesting
protected ThreadPoolExecutor getExecutor() {
return this.executor;
}

@Override
protected void processArguments(LinkedList<PathData> args)
throws IOException {

if (isMultiThreadNecessary(args)) {
initThreadPoolExecutor();
}

super.processArguments(args);

if (executor != null) {
waitForCompletion();
}
}

// if thread count is 1 or the source is only one single file,
// don't init executor to avoid threading overhead.
@VisibleForTesting
protected boolean isMultiThreadNecessary(LinkedList<PathData> args)
throws IOException {
return this.threadCount > 1 && hasMoreThanOneSourcePaths(args);
}

// check if source is only one single file.
private boolean hasMoreThanOneSourcePaths(LinkedList<PathData> args)
throws IOException {
if (args.size() > 1) {
return true;
}
if (args.size() == 1) {
PathData src = args.get(0);
if (src.stat == null) {
src.refreshStatus();
}
return isPathRecursable(src);
}
return false;
}

private void initThreadPoolExecutor() {
executor =
new ThreadPoolExecutor(threadCount, threadCount, 1, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(threadPoolQueueSize),
new ThreadPoolExecutor.CallerRunsPolicy());
}

private void waitForCompletion() {
if (executor != null) {
executor.shutdown();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
} catch (InterruptedException e) {
executor.shutdownNow();
displayError(e);
Thread.currentThread().interrupt();
}
}
}

@Override
protected void copyFileToTarget(PathData src, PathData target)
throws IOException {
if (executor == null) {
super.copyFileToTarget(src, target);
} else {
executor.submit(() -> {
try {
super.copyFileToTarget(src, target);
} catch (IOException e) {
displayError(e);
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,14 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Various commands for copy files */
@InterfaceAudience.Private
Expand Down Expand Up @@ -210,28 +204,37 @@ private void popPreserveOption(List<String> args) {
/**
* Copy local files to a remote filesystem
*/
public static class Get extends CommandWithDestination {
public static class Get extends CopyCommandWithMultiThread {
public static final String NAME = "get";
public static final String USAGE =
"[-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>";
"[-f] [-p] [-crc] [-ignoreCrc] [-t <thread count>]"
+ " [-q <thread pool queue size>] <src> ... <localdst>";
public static final String DESCRIPTION =
"Copy files that match the file pattern <src> " +
"to the local name. <src> is kept. When copying multiple " +
"files, the destination must be a directory. Passing " +
"-f overwrites the destination if it already exists and " +
"-p preserves access and modification times, " +
"ownership and the mode.\n";
"Copy files that match the file pattern <src> to the local name. "
+ "<src> is kept.\nWhen copying multiple files, the destination"
+ " must be a directory.\nFlags:\n"
+ " -p : Preserves timestamps, ownership and the mode.\n"
+ " -f : Overwrites the destination if it already exists.\n"
+ " -crc : write CRC checksums for the files downloaded.\n"
+ " -ignoreCrc : Skip CRC checks on the file(s) downloaded.\n"
+ " -t <thread count> : Number of threads to be used,"
+ " default is 1.\n"
+ " -q <thread pool queue size> : Thread pool queue size to be"
+ " used, default is 1024.\n";

@Override
protected void processOptions(LinkedList<String> args)
throws IOException {
CommandFormat cf = new CommandFormat(
1, Integer.MAX_VALUE, "crc", "ignoreCrc", "p", "f");
protected void processOptions(LinkedList<String> args) throws IOException {
CommandFormat cf =
new CommandFormat(1, Integer.MAX_VALUE, "crc", "ignoreCrc", "p", "f");
cf.addOptionWithValue("t");
cf.addOptionWithValue("q");
cf.parse(args);
setWriteChecksum(cf.getOpt("crc"));
setVerifyChecksum(!cf.getOpt("ignoreCrc"));
setPreserve(cf.getOpt("p"));
setOverwrite(cf.getOpt("f"));
setThreadCount(cf.getOptValue("t"));
setThreadPoolQueueSize(cf.getOptValue("q"));
setRecursive(true);
getLocalDestination(args);
}
Expand All @@ -240,21 +243,12 @@ protected void processOptions(LinkedList<String> args)
/**
* Copy local files to a remote filesystem
*/
public static class Put extends CommandWithDestination {

public static final Logger LOG = LoggerFactory.getLogger(Put.class);

private ThreadPoolExecutor executor = null;
private int threadPoolQueueSize = 1024;
private int numThreads = 1;

private static final int MAX_THREADS =
Runtime.getRuntime().availableProcessors() * 2;
public static class Put extends CopyCommandWithMultiThread {

public static final String NAME = "put";
public static final String USAGE =
"[-f] [-p] [-l] [-d] [-t <thread count>] [-q <threadPool queue size>] " +
"<localsrc> ... <dst>";
"[-f] [-p] [-l] [-d] [-t <thread count>] [-q <thread pool queue size>]"
+ " <localsrc> ... <dst>";
public static final String DESCRIPTION =
"Copy files from the local file system " +
"into fs. Copying fails if the file already " +
Expand All @@ -263,11 +257,11 @@ public static class Put extends CommandWithDestination {
" -p : Preserves timestamps, ownership and the mode.\n" +
" -f : Overwrites the destination if it already exists.\n" +
" -t <thread count> : Number of threads to be used, default is 1.\n" +
" -q <threadPool size> : ThreadPool queue size to be used, " +
" -q <thread pool queue size> : Thread pool queue size to be used, " +
"default is 1024.\n" +
" -l : Allow DataNode to lazily persist the file to disk. Forces" +
" replication factor of 1. This flag will result in reduced" +
" durability. Use with care.\n" +
" -l : Allow DataNode to lazily persist the file to disk. Forces " +
"replication factor of 1. This flag will result in reduced " +
"durability. Use with care.\n" +
" -d : Skip creation of temporary file(<dst>._COPYING_).\n";

@Override
Expand All @@ -277,7 +271,7 @@ protected void processOptions(LinkedList<String> args) throws IOException {
cf.addOptionWithValue("t");
cf.addOptionWithValue("q");
cf.parse(args);
setNumberThreads(cf.getOptValue("t"));
setThreadCount(cf.getOptValue("t"));
setThreadPoolQueueSize(cf.getOptValue("q"));
setOverwrite(cf.getOpt("f"));
setPreserve(cf.getOpt("p"));
Expand Down Expand Up @@ -308,92 +302,9 @@ protected void processArguments(LinkedList<PathData> args)
copyStreamToTarget(System.in, getTargetPath(args.get(0)));
return;
}

executor = new ThreadPoolExecutor(numThreads, numThreads, 1,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(threadPoolQueueSize),
new ThreadPoolExecutor.CallerRunsPolicy());
super.processArguments(args);

// issue the command and then wait for it to finish
executor.shutdown();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
} catch (InterruptedException e) {
executor.shutdownNow();
displayError(e);
Thread.currentThread().interrupt();
}
}

private void setNumberThreads(String numberThreadsString) {
if (numberThreadsString == null) {
numThreads = 1;
} else {
int parsedValue = Integer.parseInt(numberThreadsString);
if (parsedValue <= 1) {
numThreads = 1;
} else if (parsedValue > MAX_THREADS) {
numThreads = MAX_THREADS;
} else {
numThreads = parsedValue;
}
}
}

private void setThreadPoolQueueSize(String numThreadPoolQueueSize) {
if (numThreadPoolQueueSize != null) {
int parsedValue = Integer.parseInt(numThreadPoolQueueSize);
if (parsedValue < 1) {
LOG.warn("The value of the thread pool queue size cannot be " +
"less than 1, and the default value is used here. " +
"The default size is 1024.");
threadPoolQueueSize = 1024;
} else {
threadPoolQueueSize = parsedValue;
}
}
}

@VisibleForTesting
protected int getThreadPoolQueueSize() {
return threadPoolQueueSize;
}

private void copyFile(PathData src, PathData target) throws IOException {
if (isPathRecursable(src)) {
throw new PathIsDirectoryException(src.toString());
}
super.copyFileToTarget(src, target);
}

@Override
protected void copyFileToTarget(PathData src, PathData target)
throws IOException {
// if number of thread is 1, mimic put and avoid threading overhead
if (numThreads == 1) {
copyFile(src, target);
return;
}

Runnable task = () -> {
try {
copyFile(src, target);
} catch (IOException e) {
displayError(e);
}
};
executor.submit(task);
}

@VisibleForTesting
public int getNumThreads() {
return numThreads;
}

@VisibleForTesting
public ThreadPoolExecutor getExecutor() {
return executor;
}
}

public static class CopyFromLocal extends Put {
Expand Down
Loading