Skip to content

HADOOP-17531. DistCp: Reduce memory usage on copying huge directories. #2732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util.functional;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.util.DurationInfo;

import static org.apache.hadoop.fs.impl.FutureIOSupport.raiseInnerCause;

/**
 * A bridge from {@link Callable} to {@link Supplier}; catching exceptions
 * raised by the callable and wrapping them as appropriate.
 * Also hosts static helpers to submit callables to an executor as
 * {@link CompletableFuture} instances and to wait for their completion.
 * @param <T> return type.
 */
public final class CommonCallableSupplier<T> implements Supplier<T> {

  private static final Logger LOG =
      LoggerFactory.getLogger(CommonCallableSupplier.class);

  /** The wrapped callable; invoked once per {@link #get()} call. */
  private final Callable<T> call;

  /**
   * Create.
   * @param call call to invoke.
   */
  public CommonCallableSupplier(final Callable<T> call) {
    this.call = call;
  }

  /**
   * Invoke the callable and return its result.
   * RuntimeExceptions are rethrown unchanged; an IOException is wrapped
   * in an {@link UncheckedIOException}; any other checked exception is
   * wrapped in an IOException which is then wrapped the same way.
   * @return the value returned by the callable.
   * @throws UncheckedIOException if the callable raised a checked exception.
   */
  @Override
  public T get() {
    try {
      return call.call();
    } catch (RuntimeException e) {
      throw e;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (Exception e) {
      throw new UncheckedIOException(new IOException(e));
    }
  }

  /**
   * Submit a callable into a completable future.
   * RTEs are rethrown.
   * Non RTEs are caught and wrapped; IOExceptions to
   * {@link UncheckedIOException} instances.
   * @param executor executor.
   * @param call call to invoke
   * @param <T> type
   * @return the future to wait for
   */
  public static <T> CompletableFuture<T> submit(final Executor executor,
      final Callable<T> call) {
    return CompletableFuture
        .supplyAsync(new CommonCallableSupplier<>(call), executor);
  }

  /**
   * Wait for a list of futures to complete. If the list is empty,
   * return immediately.
   * @param futures list of futures.
   * @param <T> type of the futures' results
   * @throws IOException if one of the called futures raised an IOE.
   * @throws RuntimeException if one of the futures raised one.
   */
  public static <T> void waitForCompletion(
      final List<CompletableFuture<T>> futures) throws IOException {
    if (futures.isEmpty()) {
      return;
    }
    // combine into one future and await its completion
    waitForCompletion(
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])));
  }

  /**
   * Wait for a single future to complete, extracting IOEs afterwards.
   * A cancellation is reported as an IOException wrapping the
   * {@link CancellationException}; a {@link CompletionException} is handed
   * to {@code raiseInnerCause()}.
   * @param future future to wait for.
   * @param <T> type of the future's result
   * @throws IOException if one of the called futures raised an IOE.
   * @throws RuntimeException if one of the futures raised one.
   */
  public static <T> void waitForCompletion(final CompletableFuture<T> future)
      throws IOException {
    try (DurationInfo ignore = new DurationInfo(LOG, false,
        "Waiting for task completion")) {
      future.join();
    } catch (CancellationException e) {
      throw new IOException(e);
    } catch (CompletionException e) {
      raiseInnerCause(e);
    }
  }

  /**
   * Wait for a single future to complete, ignoring exceptions raised.
   * Any exception is logged at debug level, with its stack trace,
   * and then discarded.
   * @param future future to wait for; may be null, in which case this
   * is a no-op.
   * @param <T> type of the future's result
   */
  public static <T> void waitForCompletionIgnoringExceptions(
      @Nullable final CompletableFuture<T> future) {
    if (future != null) {
      try (DurationInfo ignore = new DurationInfo(LOG, false,
          "Waiting for task completion")) {
        future.join();
      } catch (Exception e) {
        // deliberately best-effort: record the failure but do not propagate
        LOG.debug("Ignoring exception raised in task completion: ", e);
      }
    }
  }

  /**
   * Block awaiting completion for any non-null future passed in;
   * No-op if a null arg was supplied.
   * @param future future
   * @throws IOException if one of the called futures raised an IOE.
   * @throws RuntimeException if one of the futures raised one.
   */
  public static void maybeAwaitCompletion(
      @Nullable final CompletableFuture<Void> future) throws IOException {
    if (future != null) {
      waitForCompletion(future);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.Enumeration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
Expand All @@ -46,8 +50,11 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Appender;
Expand All @@ -61,15 +68,28 @@
import org.junit.Assume;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;

import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.util.functional.CommonCallableSupplier.submit;
import static org.apache.hadoop.util.functional.CommonCallableSupplier.waitForCompletion;

/**
* Provides generic helper methods that can be shared across tests.
*/
public abstract class GenericTestUtils {

/**
 * Thread count used to size the shared {@code EXECUTOR} declared in this
 * class; that executor is also given twice this value as its second
 * argument.
 */
public static final int EXECUTOR_THREAD_COUNT = 64;

/**
 * SLF4J logger for this class. The type is fully qualified, presumably
 * because other logging types are imported by this file.
 */
private static final org.slf4j.Logger LOG =
LoggerFactory.getLogger(GenericTestUtils.class);

/** Prefix of the filenames generated by {@code filenameOfIndex(int)}. */
public static final String PREFIX = "file-";

/**
 * Shared counter.
 * NOTE(review): its use is outside the visible part of this file; confirm
 * what it sequences before relying on it.
 */
private static final AtomicInteger sequence = new AtomicInteger();

/**
Expand Down Expand Up @@ -896,5 +916,132 @@ public static int getTestsThreadCount() {
}
return threadCount;
}
/**
 * Asynchronously write text to a file, encoded as UTF-8.
 * The write is submitted to the shared executor; the operation duration
 * is logged.
 * @param fs filesystem
 * @param path path of the file to create
 * @param text text to write into the file
 * @return future yielding the path once the file has been created.
 */
private static CompletableFuture<Path> put(FileSystem fs,
    Path path, String text) {
  return submit(EXECUTOR, () -> {
    try (DurationInfo ignored =
        new DurationInfo(LOG, false, "Creating %s", path)) {
      // encode inside the task so all of the work happens off-thread
      final byte[] contents = text.getBytes(Charsets.UTF_8);
      createFile(fs, path, true, contents);
      return path;
    }
  });
}

/**
 * Create a directory tree populated with files.
 * Allocates the two result lists and delegates to
 * {@code createDirsAndFiles()}; the list of directories is discarded.
 * @param fs filesystem
 * @param destDir destination
 * @param depth file depth
 * @param fileCount number of files to create.
 * @param dirCount number of dirs to create at each level
 * @return the list of files created.
 * @throws IOException if the delegated creation fails.
 */
public static List<Path> createFiles(final FileSystem fs,
    final Path destDir,
    final int depth,
    final int fileCount,
    final int dirCount) throws IOException {
  // the counts are only used as initial capacities of the lists
  final List<Path> files = new ArrayList<>(fileCount);
  final List<Path> directories = new ArrayList<>(dirCount);
  return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
      files, directories);
}

/**
 * Create a directory tree populated with files, reporting both the
 * files and the directories which were created.
 * The paths are first calculated by {@code buildPaths()}; the directories
 * are then created in parallel on the shared executor, and once they have
 * all completed, so are the files. Each file's content is its own name.
 * @param fs filesystem
 * @param destDir destination
 * @param depth file depth
 * @param fileCount number of files to create.
 * @param dirCount number of dirs to create at each level
 * @param paths [out] list of file paths created
 * @param dirs [out] list of directory paths created.
 * @return the list of files created; this is the {@code paths} argument.
 * @throws IOException if waiting for the creation tasks raised one.
 */
public static List<Path> createDirsAndFiles(final FileSystem fs,
    final Path destDir,
    final int depth,
    final int fileCount,
    final int dirCount,
    final List<Path> paths,
    final List<Path> dirs) throws IOException {
  buildPaths(paths, dirs, destDir, depth, fileCount, dirCount);
  final int taskCount = paths.size() + dirs.size();
  final List<CompletableFuture<Path>> pending = new ArrayList<>(taskCount);

  // phase 1: directories. With dir marker retention, that adds more entries
  // to cause deletion issues
  try (DurationInfo ignored =
      new DurationInfo(LOG, "Creating %d directories", dirs.size())) {
    dirs.forEach(dir -> pending.add(submit(EXECUTOR, () -> {
      fs.mkdirs(dir);
      return dir;
    })));
    waitForCompletion(pending);
  }

  // phase 2: files. The list still holds the already-completed directory
  // futures, so the second wait covers those as well as the new file tasks.
  try (DurationInfo ignored =
      new DurationInfo(LOG, "Creating %d files", paths.size())) {
    paths.forEach(file -> pending.add(put(fs, file, file.getName())));
    waitForCompletion(pending);
  }
  return paths;
}

}
/**
 * Recursive method to build up lists of files and directories.
 * At each level {@code fileCount} file paths and {@code dirCount}
 * directory paths are added under {@code destDir}; the method then
 * recurses into each directory with the depth reduced by one.
 * Nothing is created in any filesystem: only the lists are updated.
 * @param filePaths list of file paths to add entries to.
 * @param dirPaths list of directory paths to add entries to.
 * @param destDir destination directory.
 * @param depth depth of directories; a value of zero or less adds nothing.
 * @param fileCount number of files.
 * @param dirCount number of directories.
 */
public static void buildPaths(final List<Path> filePaths,
    final List<Path> dirPaths, final Path destDir, final int depth,
    final int fileCount, final int dirCount) {
  if (depth <= 0) {
    return;
  }
  // create the file paths
  for (int i = 0; i < fileCount; i++) {
    filePaths.add(new Path(destDir, filenameOfIndex(i)));
  }
  // create the directory paths, descending into each one
  for (int i = 0; i < dirCount; i++) {
    // Locale.ROOT keeps the digits ASCII whatever the JVM default locale
    String name = String.format(Locale.ROOT, "dir-%03d", i);
    Path p = new Path(destDir, name);
    dirPaths.add(p);
    buildPaths(filePaths, dirPaths, p, depth - 1, fileCount, dirCount);
  }
}

/**
 * Given an index, return a string to use as the filename.
 * The name is {@code PREFIX} followed by the index, zero-padded to at
 * least three digits. Formatting uses {@link Locale#ROOT} so that the
 * digits are ASCII whatever the JVM default locale is.
 * @param i index
 * @return name
 */
public static String filenameOfIndex(final int i) {
  return String.format(Locale.ROOT, "%s%03d", PREFIX, i);
}

/**
 * For submitting work: the shared executor on which {@code put()} and
 * {@code createDirsAndFiles()} run their filesystem operations.
 * NOTE(review): the arguments are presumably the active task limit, the
 * waiting task limit, the keepalive time with its unit, and the thread
 * name prefix; confirm against
 * {@code BlockingThreadPoolExecutorService.newInstance()}.
 */
private static final BlockingThreadPoolExecutorService EXECUTOR =
BlockingThreadPoolExecutorService.newInstance(
EXECUTOR_THREAD_COUNT,
EXECUTOR_THREAD_COUNT * 2,
30, TimeUnit.SECONDS,
"test-operations");
}
Loading