refactor gvfs get fileset context logic
xiaojiebao committed Aug 5, 2024
1 parent 0f18c38 commit 2608f5a
Showing 10 changed files with 846 additions and 528 deletions.
@@ -23,6 +23,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
@@ -57,6 +58,7 @@
import org.apache.gravitino.file.FilesetCatalog;
import org.apache.gravitino.file.FilesetChange;
import org.apache.gravitino.file.FilesetContext;
+import org.apache.gravitino.file.FilesetDataOperation;
import org.apache.gravitino.file.FilesetDataOperationCtx;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.FilesetEntity;
@@ -72,6 +74,7 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem

private static final String SCHEMA_DOES_NOT_EXIST_MSG = "Schema %s does not exist";
private static final String FILESET_DOES_NOT_EXIST_MSG = "Fileset %s does not exist";
+  private static final String SLASH = "/";

private static final Logger LOG = LoggerFactory.getLogger(HadoopCatalogOperations.class);

@@ -360,23 +363,37 @@ public boolean dropFileset(NameIdentifier ident) {
@Override
public FilesetContext getFilesetContext(NameIdentifier ident, FilesetDataOperationCtx ctx)
throws NoSuchFilesetException {
-    // TODO we need move some check logics in the Hadoop / Python GVFS to here.
-    String subPath = ctx.subPath();
-    Preconditions.checkArgument(subPath != null, "subPath must not be null");
+    Preconditions.checkArgument(ctx.subPath() != null, "subPath must not be null");
+    // fill the sub path with a leading slash if it does not have one
+    String subPath;
+    if (!ctx.subPath().trim().startsWith(SLASH)) {
+      subPath = SLASH + ctx.subPath().trim();
+    } else {
+      subPath = ctx.subPath().trim();
+    }

Fileset fileset = loadFileset(ident);

-    String storageLocation = fileset.storageLocation();
+    boolean isMountFile = checkMountsSingleFile(fileset);
+    Preconditions.checkArgument(ctx.operation() != null, "operation must not be null.");
+    if (ctx.operation() == FilesetDataOperation.RENAME) {
+      Preconditions.checkArgument(
+          subPath.startsWith(SLASH) && subPath.length() > 1,
+          "subPath cannot be blank when need to rename a file or a directory.");
+      Preconditions.checkArgument(
+          !isMountFile,
+          String.format(
+              "Cannot rename the fileset: %s which only mounts to a single file.", ident));
+    }

String actualPath;
-    // subPath cannot be null, so we only need check if it is blank
-    if (StringUtils.isBlank(subPath)) {
-      actualPath = storageLocation;
+    // if the normalized subPath is only the root slash, the actual path is the storage location
+    if (subPath.startsWith(SLASH) && subPath.length() == 1) {
+      actualPath = fileset.storageLocation();
     } else {
-      actualPath =
-          subPath.startsWith("/")
-              ? String.format("%s%s", storageLocation, subPath)
-              : String.format("%s/%s", storageLocation, subPath);
+      actualPath = fileset.storageLocation() + subPath;
     }

return HadoopFilesetContext.builder()
.withFileset(
EntityCombinedFileset.of(fileset)
@@ -677,4 +694,20 @@ static Path formalizePath(Path path, Configuration configuration) throws IOExcep
FileSystem defaultFs = FileSystem.get(configuration);
return path.makeQualified(defaultFs.getUri(), defaultFs.getWorkingDirectory());
}

+  private boolean checkMountsSingleFile(Fileset fileset) {
+    try {
+      Path locationPath = new Path(fileset.storageLocation());
+      return locationPath.getFileSystem(hadoopConf).getFileStatus(locationPath).isFile();
+    } catch (FileNotFoundException e) {
+      // We should always return false here, same as the logic in `FileSystem.isFile(Path f)`.
+      return false;
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format(
+              "Cannot check whether the fileset: %s mounts a single file, exception: %s",
+              fileset.name(), e.getMessage()),
+          e);
+    }
+  }
}
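Taken together, the refactored getFilesetContext follows three rules: the sub path is trimmed and normalized to carry a leading slash, a RENAME is rejected when the normalized sub path is blank or the fileset mounts a single file, and the actual path is the storage location with the normalized sub path appended (a bare "/" maps to the storage location itself). Below is a minimal standalone sketch of those rules; the class, the resolveActualPath helper, its parameters, and the sample location are illustrative assumptions, not Gravitino API.

// Hypothetical, self-contained restatement of the resolution rules above.
public class PathResolutionSketch {
  private static final String SLASH = "/";

  static String resolveActualPath(
      String storageLocation, String rawSubPath, boolean isRename, boolean mountsSingleFile) {
    if (rawSubPath == null) {
      throw new IllegalArgumentException("subPath must not be null");
    }
    // Normalize: trim, then ensure a leading slash.
    String trimmed = rawSubPath.trim();
    String subPath = trimmed.startsWith(SLASH) ? trimmed : SLASH + trimmed;

    if (isRename) {
      // A rename needs a non-blank sub path and cannot target a single-file mount.
      if (subPath.length() <= 1) {
        throw new IllegalArgumentException("subPath cannot be blank for a rename");
      }
      if (mountsSingleFile) {
        throw new IllegalArgumentException("cannot rename a fileset that mounts a single file");
      }
    }

    // A bare "/" resolves to the storage location itself; otherwise append.
    return subPath.length() == 1 ? storageLocation : storageLocation + subPath;
  }

  public static void main(String[] args) {
    String location = "file:/tmp/catalog/schema/fileset1"; // placeholder location
    // "test/test.parquet" and "/test/test.parquet" resolve to the same actual path:
    System.out.println(resolveActualPath(location, "test/test.parquet", false, false));
    System.out.println(resolveActualPath(location, "/test/test.parquet", false, false));
    System.out.println(resolveActualPath(location, " / ", false, false)); // -> location itself
  }
}

Note that the normalization makes the blank check a simple length test: after trimming and slash-filling, an empty or whitespace-only input always becomes exactly "/".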
@@ -774,46 +774,118 @@ public void testGetFilesetContext() throws IOException {
createSchema(schemaName, comment, null, schemaPath);

String catalogName = "c1";
-    String name = "fileset1024";
-    String storageLocation = TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + name;
-    Fileset fileset =
-        createFileset(name, schemaName, comment, Fileset.Type.MANAGED, null, storageLocation);
+    String filesetName1 = "test_get_fileset_context_1";
+    String filesetLocation1 =
+        TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + filesetName1;
+    Fileset fileset1 =
+        createFileset(
+            filesetName1, schemaName, comment, Fileset.Type.MANAGED, null, filesetLocation1);

try (SecureHadoopCatalogOperations ops = new SecureHadoopCatalogOperations(store)) {
ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA);
-      NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name);
+      NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, filesetName1);
// test sub path start with "/"
BaseFilesetDataOperationCtx dataOperationCtx1 =
BaseFilesetDataOperationCtx.builder()
.withSubPath("/test/test.parquet")
.withOperation(FilesetDataOperation.OPEN)
.withClientType(ClientType.HADOOP_GVFS)
.build();
FilesetContext context1 = ops.getFilesetContext(filesetIdent, dataOperationCtx1);
-      Assertions.assertEquals(name, context1.fileset().name());
+      Assertions.assertEquals(filesetName1, context1.fileset().name());
Assertions.assertEquals(Fileset.Type.MANAGED, context1.fileset().type());
Assertions.assertEquals("comment1024", context1.fileset().comment());
-      Assertions.assertEquals(fileset.storageLocation(), context1.fileset().storageLocation());
+      Assertions.assertEquals(fileset1.storageLocation(), context1.fileset().storageLocation());
Assertions.assertEquals(
String.format("%s%s", context1.fileset().storageLocation(), dataOperationCtx1.subPath()),
context1.actualPath());
Assertions.assertFalse(context1.fileset().properties().containsKey(StringIdentifier.ID_KEY));

// test sub path start without "/"
BaseFilesetDataOperationCtx dataOperationCtx2 =
BaseFilesetDataOperationCtx.builder()
.withSubPath("test/test.parquet")
.withOperation(FilesetDataOperation.OPEN)
.withClientType(ClientType.HADOOP_GVFS)
.build();
FilesetContext context2 = ops.getFilesetContext(filesetIdent, dataOperationCtx2);
-      Assertions.assertEquals(name, context2.fileset().name());
+      Assertions.assertEquals(filesetName1, context2.fileset().name());
Assertions.assertEquals(Fileset.Type.MANAGED, context2.fileset().type());
Assertions.assertEquals("comment1024", context2.fileset().comment());
-      Assertions.assertEquals(fileset.storageLocation(), context2.fileset().storageLocation());
+      Assertions.assertEquals(fileset1.storageLocation(), context2.fileset().storageLocation());
Assertions.assertFalse(context2.fileset().properties().containsKey(StringIdentifier.ID_KEY));
Assertions.assertEquals(
String.format("%s/%s", context2.fileset().storageLocation(), dataOperationCtx2.subPath()),
context2.actualPath());
}

+    // test sub path is null
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            BaseFilesetDataOperationCtx.builder()
+                .withSubPath(null)
+                .withOperation(FilesetDataOperation.RENAME)
+                .withClientType(ClientType.HADOOP_GVFS)
+                .build());
+
+    // test mount a single file
+    String filesetName2 = "test_get_fileset_context_2";
+    String filesetLocation2 =
+        TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + filesetName2;
+    Path filesetLocationPath2 = new Path(filesetLocation2);
+    createFileset(filesetName2, schemaName, comment, Fileset.Type.MANAGED, null, filesetLocation2);
+    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store);
+        FileSystem localFileSystem = filesetLocationPath2.getFileSystem(new Configuration())) {
+      ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA);
+      NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, filesetName2);
+      // replace the fileset location with a single file
+      Assertions.assertTrue(localFileSystem.exists(filesetLocationPath2));
+      Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath2).isDirectory());
+      localFileSystem.delete(filesetLocationPath2, true);
+      localFileSystem.create(filesetLocationPath2);
+      Assertions.assertTrue(localFileSystem.exists(filesetLocationPath2));
+      Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath2).isFile());
+
+      BaseFilesetDataOperationCtx dataOperationCtx1 =
+          BaseFilesetDataOperationCtx.builder()
+              .withSubPath("/year=2024/month=07/day=22/test.parquet")
+              .withOperation(FilesetDataOperation.RENAME)
+              .withClientType(ClientType.HADOOP_GVFS)
+              .build();
+      Assertions.assertThrows(
+          IllegalArgumentException.class,
+          () -> ops.getFilesetContext(filesetIdent, dataOperationCtx1));
+    }
+
+    String filesetName3 = "test_get_fileset_context_3";
+    String filesetLocation3 =
+        TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + filesetName3;
+    Path filesetLocationPath3 = new Path(filesetLocation3);
+    createFileset(filesetName3, schemaName, comment, Fileset.Type.MANAGED, null, filesetLocation3);
+    try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store);
+        FileSystem localFileSystem = filesetLocationPath3.getFileSystem(new Configuration())) {
+      ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA);
+      NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, filesetName3);
+      // replace the fileset location with a single file
+      Assertions.assertTrue(localFileSystem.exists(filesetLocationPath3));
+      Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath3).isDirectory());
+      localFileSystem.delete(filesetLocationPath3, true);
+      localFileSystem.create(filesetLocationPath3);
+      Assertions.assertTrue(localFileSystem.exists(filesetLocationPath3));
+      Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath3).isFile());
+
+      // test rename with an empty subPath
+      BaseFilesetDataOperationCtx dataOperationCtx1 =
+          BaseFilesetDataOperationCtx.builder()
+              .withSubPath("")
+              .withOperation(FilesetDataOperation.RENAME)
+              .withClientType(ClientType.HADOOP_GVFS)
+              .build();
+      Assertions.assertThrows(
+          IllegalArgumentException.class,
+          () -> ops.getFilesetContext(filesetIdent, dataOperationCtx1));
+    }
}
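The checkMountsSingleFile helper these tests exercise probes the storage location with Hadoop's FileSystem#getFileStatus and, per its comment, treats a missing path as "not a single file", matching the contract of the deprecated FileSystem.isFile(Path). Below is a self-contained sketch of the same probe; the class name, method name, and sample path are assumptions for illustration.

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class SingleFileProbe {
  // True only when the location exists and is a regular file; a missing
  // path yields false, matching the old FileSystem.isFile(Path) contract.
  static boolean mountsSingleFile(String location, Configuration conf) throws IOException {
    Path path = new Path(location);
    try {
      return path.getFileSystem(conf).getFileStatus(path).isFile();
    } catch (FileNotFoundException e) {
      return false;
    }
  }

  public static void main(String[] args) throws IOException {
    // Placeholder local path, not taken from the commit.
    System.out.println(mountsSingleFile("file:/tmp/example.txt", new Configuration()));
  }
}

Catching FileNotFoundException instead of calling exists() first saves an extra round trip to the file system, the same pattern the commit's helper uses.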

private static Stream<Arguments> locationArguments() {

This file was deleted.
