Skip to content

MAPREDUCE-7315. LocatedFileStatusFetcher to collect/publish IOStatistics. #2579

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
Expand All @@ -37,6 +38,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
Expand All @@ -52,6 +56,9 @@

import org.apache.hadoop.util.concurrent.HadoopExecutors;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;

/**
* Utility class to fetch block locations for specified Input paths using a
* configured number of threads.
Expand All @@ -60,7 +67,7 @@
* configuration.
*/
@Private
public class LocatedFileStatusFetcher {
public class LocatedFileStatusFetcher implements IOStatisticsSource {

public static final Logger LOG =
LoggerFactory.getLogger(LocatedFileStatusFetcher.class.getName());
Expand All @@ -87,6 +94,12 @@ public class LocatedFileStatusFetcher {

private volatile Throwable unknownError;

/**
* Demand created IO Statistics: only if the filesystem
* returns statistics does this fetch collect them.
*/
private IOStatisticsSnapshot iostats;

/**
* Instantiate.
* The newApi switch is only used to configure what exception is raised
Expand Down Expand Up @@ -226,7 +239,46 @@ private void decrementRunningAndCheckCompletion() {
lock.unlock();
}
}


/**
* Return any IOStatistics collected during listing.
* @return IO stats accrued.
*/
@Override
public synchronized IOStatistics getIOStatistics() {
return iostats;
}

/**
* Add the statistics of an individual thread's scan.
* @param stats possibly null statistics.
*/
private void addResultStatistics(IOStatistics stats) {
if (stats != null) {
// demand creation of IO statistics.
synchronized (this) {
LOG.debug("Adding IOStatistics: {}", stats);
if (iostats == null) {
// demand create the statistics
iostats = snapshotIOStatistics(stats);
} else {
iostats.aggregate(stats);
}
}
}
}

@Override
public String toString() {
final IOStatistics ioStatistics = getIOStatistics();
StringJoiner stringJoiner = new StringJoiner(", ",
LocatedFileStatusFetcher.class.getSimpleName() + "[", "]");
if (ioStatistics != null) {
stringJoiner.add("IOStatistics=" + ioStatistics);
}
return stringJoiner.toString();
}

/**
* Retrieves block locations for the given @link {@link FileStatus}, and adds
* additional paths to the process queue if required.
Expand Down Expand Up @@ -266,6 +318,8 @@ public Result call() throws Exception {
}
}
}
// aggregate any stats
result.stats = retrieveIOStatistics(iter);
} else {
result.locatedFileStatuses.add(fileStatus);
}
Expand All @@ -276,6 +330,7 @@ private static class Result {
private List<FileStatus> locatedFileStatuses = new LinkedList<>();
private List<FileStatus> dirsNeedingRecursiveCalls = new LinkedList<>();
private FileSystem fs;
private IOStatistics stats;
}
}

Expand All @@ -290,6 +345,7 @@ private class ProcessInputDirCallback implements
@Override
public void onSuccess(ProcessInputDirCallable.Result result) {
try {
addResultStatistics(result.stats);
if (!result.locatedFileStatuses.isEmpty()) {
resultQueue.add(result.locatedFileStatuses);
}
Expand Down