Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve(filesystem-hadoop): support path without scheme for gvfs api #2779

Merged
merged 6 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
Expand All @@ -51,6 +52,13 @@ public class GravitinoVirtualFileSystem extends FileSystem {
private Cache<NameIdentifier, Pair<Fileset, FileSystem>> filesetCache;
private ScheduledThreadPoolExecutor scheduler;

// The pattern is used to match gvfs path. The scheme prefix (gvfs://) is optional.
Copy link
Contributor

@jerryshao jerryshao Apr 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scheme prefix should be "gvfs://fileset", not "gvfs://", right? If so, can you please clarify it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jerryshao You are right. I have fixed it. Please take a look. Thanks.

// The following path can be match:
// gvfs://fileset/fileset_catalog/fileset_schema/fileset1/file.txt
// /fileset_catalog/fileset_schema/fileset1/sub_dir/
private static final Pattern IDENTIFIER_PATTERN =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a description for this pattern which string will be matched?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Pattern.compile("^(?:gvfs://fileset)?/([^/]+)/([^/]+)/([^/]+)(?:[/[^/]+]*)$");

@Override
public void initialize(URI name, Configuration configuration) throws IOException {
if (!name.toString().startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX)) {
Expand Down Expand Up @@ -214,39 +222,33 @@ private void checkAuthConfig(String authType, String configKey, String configVal
authType);
}

private String concatVirtualPrefix(NameIdentifier identifier) {
return GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX
+ identifier.namespace().level(1)
+ "/"
+ identifier.namespace().level(2)
+ "/"
+ identifier.name();
private String getVirtualLocation(NameIdentifier identifier, boolean withScheme) {
return String.format(
"%s/%s/%s/%s",
withScheme ? GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX : "",
identifier.namespace().level(1),
identifier.namespace().level(2),
identifier.name());
}

private Path getActualPathByIdentifier(
NameIdentifier identifier, Pair<Fileset, FileSystem> filesetPair, Path path) {
String virtualPath = path.toString();
if (!virtualPath.startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX)) {
throw new InvalidPathException(
String.format(
"Path %s doesn't start with the scheme \"%s\".",
virtualPath, GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX));
}
boolean withScheme =
virtualPath.startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX);
String virtualLocation = getVirtualLocation(identifier, withScheme);
String storageLocation = filesetPair.getLeft().storageLocation();
try {
if (checkMountsSingleFile(filesetPair)) {
String virtualPrefix = concatVirtualPrefix(identifier);
Preconditions.checkArgument(
virtualPath.equals(virtualPrefix),
virtualPath.equals(virtualLocation),
"Path: %s should be same with the virtual prefix: %s, because the fileset only mounts a single file.",
virtualPath,
virtualPrefix);
virtualLocation);

return new Path(filesetPair.getLeft().storageLocation());
return new Path(storageLocation);
} else {
return new Path(
virtualPath.replaceFirst(
concatVirtualPrefix(identifier),
new Path(filesetPair.getLeft().storageLocation()).toString()));
return new Path(virtualPath.replaceFirst(virtualLocation, storageLocation));
}
} catch (Exception e) {
throw new RuntimeException(
Expand Down Expand Up @@ -275,37 +277,32 @@ private boolean checkMountsSingleFile(Pair<Fileset, FileSystem> filesetPair) {
private FileStatus convertFileStatusPathPrefix(
FileStatus fileStatus, String actualPrefix, String virtualPrefix) {
String filePath = fileStatus.getPath().toString();
if (!filePath.startsWith(actualPrefix)) {
throw new InvalidPathException(
String.format("Path %s doesn't start with prefix \"%s\".", filePath, actualPrefix));
}
Preconditions.checkArgument(
filePath.startsWith(actualPrefix),
"Path %s doesn't start with prefix \"%s\".",
filePath,
actualPrefix);
Path path = new Path(filePath.replaceFirst(actualPrefix, virtualPrefix));
fileStatus.setPath(path);

return fileStatus;
}

private NameIdentifier extractIdentifier(URI virtualUri) {
@VisibleForTesting
NameIdentifier extractIdentifier(URI virtualUri) {
String virtualPath = virtualUri.toString();
Preconditions.checkArgument(
virtualUri
.toString()
.startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX),
"Path %s doesn't start with scheme prefix \"%s\".",
virtualUri,
GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX);

if (StringUtils.isBlank(virtualUri.toString())) {
throw new InvalidPathException("Uri which need be extracted cannot be null or empty.");
}
StringUtils.isNotBlank(virtualPath),
"Uri which need be extracted cannot be null or empty.");

// remove first '/' symbol with empty string
String[] reservedDirs =
Arrays.stream(virtualUri.getPath().replaceFirst("/", "").split("/")).toArray(String[]::new);
Matcher matcher = IDENTIFIER_PATTERN.matcher(virtualPath);
Preconditions.checkArgument(
reservedDirs.length >= 3, "URI %s doesn't contains valid identifier", virtualUri);
matcher.matches() && matcher.groupCount() == 3,
"URI %s doesn't contains valid identifier",
virtualPath);

return NameIdentifier.ofFileset(
metalakeName, reservedDirs[0], reservedDirs[1], reservedDirs[2]);
metalakeName, matcher.group(1), matcher.group(2), matcher.group(3));
}

private FilesetContext getFilesetContext(Path virtualPath) {
Expand Down Expand Up @@ -449,7 +446,7 @@ public FileStatus getFileStatus(Path path) throws IOException {
return convertFileStatusPathPrefix(
fileStatus,
context.getFileset().storageLocation(),
concatVirtualPrefix(context.getIdentifier()));
getVirtualLocation(context.getIdentifier(), true));
}

@Override
Expand All @@ -462,7 +459,7 @@ public FileStatus[] listStatus(Path path) throws IOException {
convertFileStatusPathPrefix(
fileStatus,
new Path(context.getFileset().storageLocation()).toString(),
concatVirtualPrefix(context.getIdentifier())))
getVirtualLocation(context.getIdentifier(), true)))
.toArray(FileStatus[]::new);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

/** Configuration class for Gravitino Virtual File System. */
class GravitinoVirtualFileSystemConfiguration {
public static final String GVFS_FILESET_PREFIX = "gvfs://fileset/";
public static final String GVFS_FILESET_PREFIX = "gvfs://fileset";
public static final String GVFS_SCHEME = "gvfs";

/** The configuration key for the Gravitino server URI. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

public class FileSystemTestUtils {
private static final String LOCAL_FS_PREFIX =
"file:/tmp/gravitino_test_fs_" + UUID.randomUUID().toString().replace("-", "") + "/";
"file:/tmp/gravitino_test_fs_" + UUID.randomUUID().toString().replace("-", "");

private static final int BUFFER_SIZE = 3;
private static final short REPLICATION = 1;
Expand All @@ -30,23 +30,24 @@ public static String localRootPrefix() {
return LOCAL_FS_PREFIX;
}

public static Path createFilesetPath(String filesetCatalog, String schema, String fileset) {
return new Path(
GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX
+ "/"
+ filesetCatalog
+ "/"
+ schema
+ "/"
+ fileset);
public static Path createFilesetPath(
String filesetCatalog, String schema, String fileset, boolean withScheme) {
String filesetPath =
String.format(
"%s/%s/%s/%s",
withScheme ? GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX : "",
filesetCatalog,
schema,
fileset);
return new Path(filesetPath);
}

public static Path createLocalRootDir(String filesetCatalog) {
return new Path(LOCAL_FS_PREFIX + filesetCatalog);
return new Path(String.format("%s/%s", LOCAL_FS_PREFIX, filesetCatalog));
}

public static Path createLocalDirPrefix(String filesetCatalog, String schema, String fileset) {
return new Path(LOCAL_FS_PREFIX + filesetCatalog + "/" + schema + "/" + fileset);
return new Path(String.format("%s/%s/%s/%s", LOCAL_FS_PREFIX, filesetCatalog, schema, fileset));
}

public static void create(Path path, FileSystem fileSystem) throws IOException {
Expand Down
Loading
Loading