Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.Stack;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -2034,15 +2033,55 @@ protected DirectoryEntries listStatusBatch(Path f, byte[] token) throws
}

/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
*
* @param f
* given path
* @param recursive
* whether to return recursive paths from directories in the given
* path f. if false, only immediate children (files/directories) to f
* are returned
* @return the statuses of the files/directories in the given patch
* @throws FileNotFoundException
* when the path does not exist; IOException see specific
* implementation
*/
public FileStatus[] listStatus(Path f, boolean recursive) throws FileNotFoundException,
IOException {
FileStatus[] fileStatuses = listStatus(f);
if (fileStatuses == null || !recursive) {
return fileStatuses;
}
List<FileStatus> result = new ArrayList<FileStatus>();
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
FileStatus[] childStatuses = listStatus(fileStatus.getPath(), true);
if (childStatuses != null) {
for (FileStatus child : childStatuses) {
result.add(child);
}
}
} else {
result.add(fileStatus);
}
}
return result.toArray(new FileStatus[result.size()]);
}

/*
* Filter files/directories in the given path using the user-supplied path
* filter. Results are added to the given array <code>results</code>.
* @throws FileNotFoundException when the path does not exist
* @throws IOException see specific implementation
*/
private void listStatus(ArrayList<FileStatus> results, Path f,
PathFilter filter) throws FileNotFoundException, IOException {
FileStatus listing[] = listStatus(f);
Preconditions.checkNotNull(listing, "listStatus should not return NULL");
PathFilter filter, boolean recursive) throws FileNotFoundException, IOException {
FileStatus[] listing = listStatus(f, recursive);
if (listing == null) {
throw new IOException("Error accessing " + f);
}

for (int i = 0; i < listing.length; i++) {
if (filter.accept(listing[i].getPath())) {
results.add(listing[i]);
Expand Down Expand Up @@ -2084,8 +2123,13 @@ public RemoteIterator<Path> listCorruptFileBlocks(Path path)
*/
public FileStatus[] listStatus(Path f, PathFilter filter)
throws FileNotFoundException, IOException {
return listStatus(f, filter, false);
}

public FileStatus[] listStatus(Path f, PathFilter filter, boolean recursive)
throws FileNotFoundException, IOException {
ArrayList<FileStatus> results = new ArrayList<>();
listStatus(results, f, filter);
listStatus(results, f, filter, recursive);
return results.toArray(new FileStatus[results.size()]);
}

Expand All @@ -2108,6 +2152,11 @@ public FileStatus[] listStatus(Path[] files)
return listStatus(files, DEFAULT_FILTER);
}

public FileStatus[] listStatus(Path[] files, boolean recursive)
throws FileNotFoundException, IOException {
return listStatus(files, DEFAULT_FILTER, recursive);
}

/**
* Filter files/directories in the given list of paths using user-supplied
* path filter.
Expand All @@ -2126,9 +2175,14 @@ public FileStatus[] listStatus(Path[] files)
*/
public FileStatus[] listStatus(Path[] files, PathFilter filter)
throws FileNotFoundException, IOException {
return listStatus(files, filter, false);
}

public FileStatus[] listStatus(Path[] files, PathFilter filter, boolean recursive)
throws FileNotFoundException, IOException {
ArrayList<FileStatus> results = new ArrayList<FileStatus>();
for (int i = 0; i < files.length; i++) {
listStatus(results, files[i], filter);
listStatus(results, files[i], filter, recursive);
}
return results.toArray(new FileStatus[results.size()]);
}
Expand Down Expand Up @@ -2229,6 +2283,11 @@ public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
return listLocatedStatus(f, DEFAULT_FILTER);
}

public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
boolean recursive) throws FileNotFoundException, IOException {
return listLocatedStatus(f, DEFAULT_FILTER, recursive);
}

/**
* List a directory.
* The returned results include its block location if it is a file
Expand All @@ -2241,10 +2300,15 @@ public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
* @throws IOException if any I/O error occurred
*/
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
final PathFilter filter)
final PathFilter filter) throws FileNotFoundException, IOException {
return listLocatedStatus(f, filter, false);
}

protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
final PathFilter filter, final boolean recursive)
throws FileNotFoundException, IOException {
return new RemoteIterator<LocatedFileStatus>() {
private final FileStatus[] stats = listStatus(f, filter);
private final FileStatus[] stats = listStatus(f, filter, recursive);
private int i = 0;

@Override
Expand Down Expand Up @@ -2350,42 +2414,24 @@ public RemoteIterator<LocatedFileStatus> listFiles(
final Path f, final boolean recursive)
throws FileNotFoundException, IOException {
return new RemoteIterator<LocatedFileStatus>() {
private Stack<RemoteIterator<LocatedFileStatus>> itors = new Stack<>();
private RemoteIterator<LocatedFileStatus> curItor =
listLocatedStatus(f);
private RemoteIterator<LocatedFileStatus> curItor = listLocatedStatus(f, recursive);
private LocatedFileStatus curFile;

@Override
public boolean hasNext() throws IOException {
while (curFile == null) {
if (curItor.hasNext()) {
handleFileStat(curItor.next());
} else if (!itors.empty()) {
curItor = itors.pop();
LocatedFileStatus next = curItor.next();
if (next.isFile()) {
curFile = next;
}
} else {
return false;
}
}
return true;
}

/**
* Process the input stat.
* If it is a file, return the file stat.
* If it is a directory, traverse the directory if recursive is true;
* ignore it if recursive is false.
* @param stat input status
* @throws IOException if any IO error occurs
*/
private void handleFileStat(LocatedFileStatus stat) throws IOException {
if (stat.isFile()) { // file
curFile = stat;
} else if (recursive) { // directory
itors.push(curItor);
curItor = listLocatedStatus(stat.getPath());
}
}

@Override
public LocatedFileStatus next() throws IOException {
if (hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ private FileStatus[] doGlob() throws IOException {
* not all Hadoop filesystems have this property. So we sort here in order
* to get consistent results. See HADOOP-10798 for details.
*/
FileStatus ret[] = results.toArray(new FileStatus[0]);
FileStatus[] ret = results.toArray(new FileStatus[results.size()]);
Arrays.sort(ret);
return ret;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,55 @@ public void testGlobStatusFilterWithMultiplePathWildcardsAndNonTrivialFilter()
filteredPaths));
}

@Test
public void testListStatusWithRecursiveFlag() throws Exception {
Path testDir = getTestRootPath(fSys, "test");
Path inDir1 = new Path(testDir, "dir1");
Path inFile1 = new Path(testDir, "file1");

Path dir1File1 = new Path(inDir1, "file1");
Path dir1File2 = new Path(inDir1, "file2");
Path inDir1Dir2 = new Path(inDir1, "dir2");

Path dir1Dir2File1 = new Path(inDir1Dir2, "file1");
Path dir1Dir2File2 = new Path(inDir1Dir2, "file2");
Path inDir1Dir2Dir3 = new Path(inDir1Dir2, "dir3");

fSys.mkdirs(inDir1);
fSys.mkdirs(inDir1Dir2);
fSys.mkdirs(inDir1Dir2Dir3);

fSys.createNewFile(inFile1);
fSys.createNewFile(dir1File1);
fSys.createNewFile(dir1File2);
fSys.createNewFile(dir1Dir2File1);
fSys.createNewFile(dir1Dir2File2);

// test listStatus that returns an array
FileStatus[] paths = fSys.listStatus(testDir, false);
Assert.assertEquals(2, paths.length);
Assert.assertTrue(containsTestRootPath(inFile1, paths));
Assert.assertTrue(containsTestRootPath(inDir1, paths));

paths = fSys.listStatus(inDir1, false);
Assert.assertEquals(3, paths.length);
Assert.assertTrue(containsTestRootPath(dir1File1, paths));
Assert.assertTrue(containsTestRootPath(dir1File2, paths));
Assert.assertTrue(containsTestRootPath(inDir1Dir2, paths));

paths = fSys.listStatus(testDir, true);
Assert.assertEquals(5, paths.length);
Assert.assertTrue(containsTestRootPath(inFile1, paths));
Assert.assertTrue(containsTestRootPath(dir1File1, paths));
Assert.assertTrue(containsTestRootPath(dir1File2, paths));
Assert.assertTrue(containsTestRootPath(dir1Dir2File1, paths));
Assert.assertTrue(containsTestRootPath(dir1Dir2File2, paths));

paths = fSys.listStatus(inDir1Dir2Dir3, true);
Assert.assertEquals(0, paths.length);

}

@Test
public void testGlobStatusThrowsExceptionForUnreadableDir()
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,14 @@ public FSDataOutputStream create(Path f, FsPermission permission,
public short getReplication(Path src);
public void processDeleteOnExit();
public FsStatus getStatus();
public FileStatus[] listStatus(Path f, PathFilter filter);
FileStatus[] listStatus(Path f, boolean recursive);
FileStatus[] listStatus(Path f, PathFilter filter);
public FileStatus[] listStatusBatch(Path f, byte[] token);
FileStatus[] listStatus(Path f, PathFilter filter, boolean recursive);
public FileStatus[] listStatus(Path[] files);
FileStatus[] listStatus(Path[] files, boolean recursive);
public FileStatus[] listStatus(Path[] files, PathFilter filter);
FileStatus[] listStatus(Path[] files, PathFilter filter, boolean recursive);
public FileStatus[] globStatus(Path pathPattern);
public FileStatus[] globStatus(Path pathPattern, PathFilter filter);
public Iterator<LocatedFileStatus> listFiles(Path path,
Expand Down Expand Up @@ -143,6 +147,10 @@ public Token<?>[] addDelegationTokens(String renewer, Credentials creds)
of the filter such as checksums.
*/
MultipartUploaderBuilder createMultipartUploader(Path basePath);

Iterator<LocatedFileStatus> listLocatedStatus(Path f, boolean recursive);
Iterator<LocatedFileStatus> listLocatedStatus(Path f,
PathFilter filter, boolean recursive);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,25 @@ public FSDataOutputStream create(Path f, FsPermission permission,
void setQuota(Path f, long namespaceQuota, long storagespaceQuota);
void setQuotaByStorageType(Path f, StorageType type, long quota);
public FsStatus getStatus();
public FileStatus[] listStatus(Path f, PathFilter filter);
FileStatus[] listStatus(Path f, boolean recursive);
FileStatus[] listStatus(Path f, PathFilter filter);
public FileStatus[] listStatusBatch(Path f, byte[] token);
FileStatus[] listStatus(Path f, PathFilter filter, boolean recursive);
public FileStatus[] listStatus(Path[] files);
FileStatus[] listStatus(Path[] files, boolean recursive);
public FileStatus[] listStatus(Path[] files, PathFilter filter);
FileStatus[] listStatus(Path[] files, PathFilter filter, boolean recursive);
public FileStatus[] globStatus(Path pathPattern);
public FileStatus[] globStatus(Path pathPattern, PathFilter filter);

public Iterator<LocatedFileStatus> listFiles(Path path,
boolean isRecursive);

public Iterator<LocatedFileStatus> listLocatedStatus(Path f);
public Iterator<LocatedFileStatus> listLocatedStatus(Path f,
PathFilter filter);
Iterator<LocatedFileStatus> listLocatedStatus(Path f, boolean recursive);
Iterator<LocatedFileStatus> listLocatedStatus(Path f, PathFilter filter);
Iterator<LocatedFileStatus> listLocatedStatus(Path f, PathFilter filter,
boolean recursive);
public Iterator<FileStatus> listStatusIterator(Path f);
public void copyFromLocalFile(Path src, Path dst);
public void moveFromLocalFile(Path[] srcs, Path dst);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,35 +171,6 @@ public static PathFilter getInputPathFilter(JobConf conf) {
ReflectionUtils.newInstance(filterClass, conf) : null;
}

/**
* Add files in the input path recursively into the results.
* @param result
* The List to store all files.
* @param fs
* The FileSystem.
* @param path
* The input path.
* @param inputFilter
* The input filter that can be used to filter files/dirs.
* @throws IOException
*/
protected void addInputPathRecursively(List<FileStatus> result,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can't remove this as it breaks methods external classes may use

FileSystem fs, Path path, PathFilter inputFilter)
throws IOException {
RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
while (iter.hasNext()) {
LocatedFileStatus stat = iter.next();
if (inputFilter.accept(stat.getPath())) {
if (stat.isDirectory()) {
addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
} else {
result.add(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.shrinkStatus(stat));
}
}
}
}

/**
* List input directories.
* Subclasses may override to, e.g., select only files matching a regular
Expand Down Expand Up @@ -283,17 +254,11 @@ private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
for (FileStatus globStat: matches) {
if (globStat.isDirectory()) {
RemoteIterator<LocatedFileStatus> iter =
fs.listLocatedStatus(globStat.getPath());
fs.listLocatedStatus(globStat.getPath(), recursive);
while (iter.hasNext()) {
LocatedFileStatus stat = iter.next();
if (inputFilter.accept(stat.getPath())) {
if (recursive && stat.isDirectory()) {
addInputPathRecursively(result, fs, stat.getPath(),
inputFilter);
} else {
result.add(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.shrinkStatus(stat));
}
result.add(stat);
}
}
} else {
Expand Down
Loading