Skip to content

HADOOP-16458 LocatedFileStatusFetcher.getFileStatuses failing intermittently with s3 #1160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2064,7 +2064,12 @@ public FileStatus[] listStatus(Path[] files, PathFilter filter)
* @throws IOException IO failure
*/
public FileStatus[] globStatus(Path pathPattern) throws IOException {
return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
return Globber.createGlobber(this)
.withPathPattern(pathPattern)
.withPathFiltern(DEFAULT_FILTER)
.withResolveSymlinks(true)
.build()
.glob();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,24 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.DurationInfo;

import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* Implementation of {@link FileSystem#globStatus(Path, PathFilter)}.
* This has historically been package-private; it has been opened
* up for object stores within the {@code hadoop-*} codebase ONLY.
* It could be expanded for external store implementations in future.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
class Globber {
public class Globber {
public static final Logger LOG =
LoggerFactory.getLogger(Globber.class.getName());

Expand All @@ -42,21 +51,62 @@ class Globber {
private final Path pathPattern;
private final PathFilter filter;
private final Tracer tracer;

public Globber(FileSystem fs, Path pathPattern, PathFilter filter) {
private final boolean resolveSymlinks;

Globber(FileSystem fs, Path pathPattern, PathFilter filter) {
this.fs = fs;
this.fc = null;
this.pathPattern = pathPattern;
this.filter = filter;
this.tracer = FsTracer.get(fs.getConf());
this.resolveSymlinks = true;
}

Globber(FileContext fc, Path pathPattern, PathFilter filter) {
this.fs = null;
this.fc = fc;
this.pathPattern = pathPattern;
this.filter = filter;
this.tracer = fc.getTracer();
this.resolveSymlinks = true;
}

/**
* Filesystem constructor for use by {@link GlobBuilder}.
* @param fs filesystem
* @param pathPattern path pattern
* @param filter optional filter
* @param resolveSymlinks should symlinks be resolved.
*/
private Globber(FileSystem fs, Path pathPattern, PathFilter filter,
boolean resolveSymlinks) {
this.fs = fs;
this.fc = null;
this.pathPattern = pathPattern;
this.filter = filter;
this.resolveSymlinks = resolveSymlinks;
this.tracer = FsTracer.get(fs.getConf());
LOG.debug("Created Globber for path={}, symlinks={}",
pathPattern, resolveSymlinks);
}

public Globber(FileContext fc, Path pathPattern, PathFilter filter) {
/**
* File Context constructor for use by {@link GlobBuilder}.
* @param fc file context
* @param pathPattern path pattern
* @param filter optional filter
* @param resolveSymlinks should symlinks be resolved.
*/
private Globber(FileContext fc, Path pathPattern, PathFilter filter,
boolean resolveSymlinks) {
this.fs = null;
this.fc = fc;
this.pathPattern = pathPattern;
this.filter = filter;
this.resolveSymlinks = resolveSymlinks;
this.tracer = fc.getTracer();
LOG.debug("Created Globber path={}, symlinks={}",
pathPattern, resolveSymlinks);
}

private FileStatus getFileStatus(Path path) throws IOException {
Expand All @@ -67,6 +117,7 @@ private FileStatus getFileStatus(Path path) throws IOException {
return fc.getFileStatus(path);
}
} catch (FileNotFoundException e) {
LOG.debug("getFileStatus({}) failed; returning null", path, e);
return null;
}
}
Expand All @@ -79,6 +130,7 @@ private FileStatus[] listStatus(Path path) throws IOException {
return fc.util().listStatus(path);
}
} catch (FileNotFoundException e) {
LOG.debug("listStatus({}) failed; returning empty array", path, e);
return new FileStatus[0];
}
}
Expand Down Expand Up @@ -107,7 +159,7 @@ private static String unescapePathComponent(String name) {
*/
private static List<String> getPathComponents(String path)
throws IOException {
ArrayList<String> ret = new ArrayList<String>();
ArrayList<String> ret = new ArrayList<>();
for (String component : path.split(Path.SEPARATOR)) {
if (!component.isEmpty()) {
ret.add(component);
Expand Down Expand Up @@ -145,7 +197,8 @@ private String authorityFromPath(Path path) throws IOException {
public FileStatus[] glob() throws IOException {
TraceScope scope = tracer.newScope("Globber#glob");
scope.addKVAnnotation("pattern", pathPattern.toUri().getPath());
try {
try (DurationInfo ignored = new DurationInfo(LOG, false,
"glob %s", pathPattern)) {
return doGlob();
} finally {
scope.close();
Expand All @@ -164,24 +217,26 @@ private FileStatus[] doGlob() throws IOException {
String pathPatternString = pathPattern.toUri().getPath();
List<String> flattenedPatterns = GlobExpander.expand(pathPatternString);

LOG.debug("Filesystem glob {}", pathPatternString);
// Now loop over all flattened patterns. In every case, we'll be trying to
// match them to entries in the filesystem.
ArrayList<FileStatus> results =
new ArrayList<FileStatus>(flattenedPatterns.size());
new ArrayList<>(flattenedPatterns.size());
boolean sawWildcard = false;
for (String flatPattern : flattenedPatterns) {
// Get the absolute path for this flattened pattern. We couldn't do
// this prior to flattening because of patterns like {/,a}, where which
// path you go down influences how the path must be made absolute.
Path absPattern = fixRelativePart(new Path(
flatPattern.isEmpty() ? Path.CUR_DIR : flatPattern));
LOG.debug("Pattern: {}", absPattern);
// Now we break the flattened, absolute pattern into path components.
// For example, /a/*/c would be broken into the list [a, *, c]
List<String> components =
getPathComponents(absPattern.toUri().getPath());
// Starting out at the root of the filesystem, we try to match
// filesystem entries against pattern components.
ArrayList<FileStatus> candidates = new ArrayList<FileStatus>(1);
ArrayList<FileStatus> candidates = new ArrayList<>(1);
// To get the "real" FileStatus of root, we'd have to do an expensive
// RPC to the NameNode. So we create a placeholder FileStatus which has
// the correct path, but defaults for the rest of the information.
Expand All @@ -206,12 +261,13 @@ private FileStatus[] doGlob() throws IOException {
for (int componentIdx = 0; componentIdx < components.size();
componentIdx++) {
ArrayList<FileStatus> newCandidates =
new ArrayList<FileStatus>(candidates.size());
new ArrayList<>(candidates.size());
GlobFilter globFilter = new GlobFilter(components.get(componentIdx));
String component = unescapePathComponent(components.get(componentIdx));
if (globFilter.hasPattern()) {
sawWildcard = true;
}
LOG.debug("Component {}, patterned={}", component, sawWildcard);
if (candidates.isEmpty() && sawWildcard) {
// Optimization: if there are no more candidates left, stop examining
// the path components. We can only do this if we've already seen
Expand Down Expand Up @@ -245,19 +301,31 @@ private FileStatus[] doGlob() throws IOException {
// incorrectly conclude that /a/b was a file and should not match
// /a/*/*. So we use getFileStatus of the path we just listed to
// disambiguate.
Path path = candidate.getPath();
FileStatus status = getFileStatus(path);
if (status == null) {
// null means the file was not found
LOG.warn("File/directory {} not found:"
+ " it may have been deleted."
+ " If this is an object store, this can be a sign of"
+ " eventual consistency problems.",
path);
continue;
}
if (!status.isDirectory()) {
continue;
if (resolveSymlinks) {
LOG.debug("listStatus found one entry; disambiguating {}",
children[0]);
Path path = candidate.getPath();
FileStatus status = getFileStatus(path);
if (status == null) {
// null means the file was not found
LOG.warn("File/directory {} not found:"
+ " it may have been deleted."
+ " If this is an object store, this can be a sign of"
+ " eventual consistency problems.",
path);
continue;
}
if (!status.isDirectory()) {
LOG.debug("Resolved entry is a file; skipping: {}", status);
continue;
}
} else {
// there's no symlinks in this store, so no need to issue
// another call, just see if the result is a directory or a file
if (children[0].getPath().equals(candidate.getPath())) {
// the listing status is of a file
continue;
}
}
}
for (FileStatus child : children) {
Expand Down Expand Up @@ -312,6 +380,8 @@ private FileStatus[] doGlob() throws IOException {
*/
if ((!sawWildcard) && results.isEmpty() &&
(flattenedPatterns.size() <= 1)) {
LOG.debug("No matches found and there was no wildcard in the path {}",
pathPattern);
return null;
}
/*
Expand All @@ -324,4 +394,98 @@ private FileStatus[] doGlob() throws IOException {
Arrays.sort(ret);
return ret;
}

/**
* Create a builder for a Globber, bonded to the specific filesystem.
* @param filesystem filesystem
* @return the builder to finish configuring.
*/
public static GlobBuilder createGlobber(FileSystem filesystem) {
return new GlobBuilder(filesystem);
}

/**
* Create a builder for a Globber, bonded to the specific file
* context.
* @param fileContext file context.
* @return the builder to finish configuring.
*/
public static GlobBuilder createGlobber(FileContext fileContext) {
return new GlobBuilder(fileContext);
}

/**
* Builder for Globber instances.
*/
@InterfaceAudience.Private
public static class GlobBuilder {

private final FileSystem fs;

private final FileContext fc;

private Path pathPattern;

private PathFilter filter;

private boolean resolveSymlinks = true;

/**
* Construct bonded to a file context.
* @param fc file context.
*/
public GlobBuilder(final FileContext fc) {
this.fs = null;
this.fc = checkNotNull(fc);
}

/**
* Construct bonded to a filesystem.
* @param fs file system.
*/
public GlobBuilder(final FileSystem fs) {
this.fs = checkNotNull(fs);
this.fc = null;
}

/**
* Set the path pattern.
* @param pattern pattern to use.
* @return the builder
*/
public GlobBuilder withPathPattern(Path pattern) {
pathPattern = pattern;
return this;
}

/**
* Set the path filter.
* @param pathFilter filter
* @return the builder
*/
public GlobBuilder withPathFiltern(PathFilter pathFilter) {
filter = pathFilter;
return this;
}

/**
* Set the symlink resolution policy.
* @param resolve resolution flag.
* @return the builder
*/
public GlobBuilder withResolveSymlinks(boolean resolve) {
resolveSymlinks = resolve;
return this;
}

/**
* Build the Globber.
* @return a new instance.
*/
public Globber build() {
return fs != null
? new Globber(fs, pathPattern, filter, resolveSymlinks)
: new Globber(fc, pathPattern, filter, resolveSymlinks);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,9 @@ private static String robustToString(Object o) {
if (o == null) {
return NULL_RESULT;
} else {
if (o instanceof String) {
return '"' + (String)o + '"';
}
try {
return o.toString();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -250,7 +251,9 @@ protected FileStatus[] listStatus(JobConf job) throws IOException {
job, dirs, recursive, inputFilter, false);
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
throw new IOException("Interrupted while getting file statuses");
throw (IOException)
new InterruptedIOException("Interrupted while getting file statuses")
.initCause(e);
}
result = Iterables.toArray(locatedFiles, FileStatus.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@ public class InvalidInputException extends IOException {

/**
* Create the exception with the given list.
* The first element of the list is used as the init cause value.
* @param probs the list of problems to report. this list is not copied.
*/
public InvalidInputException(List<IOException> probs) {
problems = probs;
if (!probs.isEmpty()) {
initCause(probs.get(0));
}
}

/**
Expand Down
Loading