Skip to content

Commit

Permalink
[#4278] feat(filesystem): Refactor the getFileLocation logic in ha…
Browse files Browse the repository at this point in the history
…doop GVFS (#4320)

### What changes were proposed in this pull request?

Refactor the `getFileLocation` logic in Hadoop GVFS so that when sending
a request to the server to obtain the file location, the current data
operation and operation path information are reported.

### Why are the changes needed?

Fix: #4278 

### How was this patch tested?

Added and refactored some UTs; existing ITs continue to pass normally.
  • Loading branch information
xloya authored Sep 23, 2024
1 parent a258a46 commit be7a5e6
Show file tree
Hide file tree
Showing 16 changed files with 838 additions and 593 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
Expand All @@ -40,12 +41,16 @@
import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.CatalogOperations;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.SupportsSchemas;
import org.apache.gravitino.exceptions.AlreadyExistsException;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
Expand Down Expand Up @@ -358,7 +363,6 @@ public boolean dropFileset(NameIdentifier ident) {
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
throws NoSuchFilesetException {
// TODO we need move some check logics in the Hadoop / Python GVFS to here.
Preconditions.checkArgument(subPath != null, "subPath must not be null");
String processedSubPath;
if (!subPath.trim().isEmpty() && !subPath.trim().startsWith(SLASH)) {
Expand All @@ -369,11 +373,56 @@ public String getFileLocation(NameIdentifier ident, String subPath)

Fileset fileset = loadFileset(ident);

boolean isSingleFile = checkSingleFile(fileset);
// if the storage location is a single file, it cannot have sub path to access.
if (isSingleFile && StringUtils.isBlank(processedSubPath)) {
throw new GravitinoRuntimeException(
"Sub path should always be blank, because the fileset only mounts a single file.");
}

// do checks for some data operations.
if (hasCallerContext()) {
Map<String, String> contextMap = CallerContext.CallerContextHolder.get().context();
String operation =
contextMap.getOrDefault(
FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
FilesetDataOperation.UNKNOWN.name());
if (!FilesetDataOperation.checkValid(operation)) {
LOG.warn(
"The data operation: {} is not valid, we cannot do some checks for this operation.",
operation);
} else {
FilesetDataOperation dataOperation = FilesetDataOperation.valueOf(operation);
switch (dataOperation) {
case RENAME:
// Fileset only mounts a single file, the storage location of the fileset cannot be
// renamed; Otherwise the metadata in the Gravitino server may be inconsistent.
if (isSingleFile) {
throw new GravitinoRuntimeException(
"Cannot rename the fileset: %s which only mounts to a single file.", ident);
}
// if the sub path is blank, it cannot be renamed,
// otherwise the metadata in the Gravitino server may be inconsistent.
if (StringUtils.isBlank(processedSubPath)
|| (processedSubPath.startsWith(SLASH) && processedSubPath.length() == 1)) {
throw new GravitinoRuntimeException(
"subPath cannot be blank when need to rename a file or a directory.");
}
break;
default:
break;
}
}
}

String fileLocation;
// subPath cannot be null, so we only need check if it is blank
if (StringUtils.isBlank(processedSubPath)) {
// 1. if the storage location is a single file, we pass the storage location directly
// 2. if the processed sub path is blank, we pass the storage location directly
if (isSingleFile || StringUtils.isBlank(processedSubPath)) {
fileLocation = fileset.storageLocation();
} else {
// the processed sub path always starts with "/" if it is not blank,
// so we can safely remove the tailing slash if storage location ends with "/".
String storageLocation =
fileset.storageLocation().endsWith(SLASH)
? fileset.storageLocation().substring(0, fileset.storageLocation().length() - 1)
Expand Down Expand Up @@ -672,4 +721,25 @@ static Path formalizePath(Path path, Configuration configuration) throws IOExcep
FileSystem defaultFs = FileSystem.get(configuration);
return path.makeQualified(defaultFs.getUri(), defaultFs.getWorkingDirectory());
}

/**
 * Checks whether the current thread carries a non-empty {@link CallerContext}.
 *
 * @return true if the thread-local holder contains a caller context whose context map is
 *     non-null and has at least one entry, false otherwise
 */
private boolean hasCallerContext() {
  // Fetch the thread-local context once instead of performing three separate lookups.
  CallerContext callerContext = CallerContext.CallerContextHolder.get();
  return callerContext != null
      && callerContext.context() != null
      && !callerContext.context().isEmpty();
}

/**
 * Determines whether the fileset's storage location refers to a single regular file rather
 * than a directory.
 *
 * @param fileset the fileset whose storage location is inspected
 * @return true if the storage location exists and is a regular file; false if the location
 *     does not exist
 * @throws GravitinoRuntimeException if the file system lookup fails with an I/O error
 */
private boolean checkSingleFile(Fileset fileset) {
  Path storagePath = new Path(fileset.storageLocation());
  try {
    return storagePath.getFileSystem(hadoopConf).getFileStatus(storagePath).isFile();
  } catch (FileNotFoundException e) {
    // Mirror the behavior of `FileSystem.isFile(Path f)`: a missing location is not a file.
    return false;
  } catch (IOException ioe) {
    throw new GravitinoRuntimeException(
        ioe,
        "Exception occurs when checking whether fileset: %s mounts a single file",
        fileset.name());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,13 @@
import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.PropertiesMetadata;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NonEmptySchemaException;
Expand Down Expand Up @@ -800,6 +804,85 @@ public void testGetFileLocation() throws IOException {
String fileLocation3 = ops.getFileLocation(filesetIdent, subPath4);
Assertions.assertEquals(fileset.storageLocation(), fileLocation3);
}

// test mount a single file
String filesetName2 = "test_get_file_location_2";
String filesetLocation2 =
TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + filesetName2;
Path filesetLocationPath2 = new Path(filesetLocation2);
createFileset(filesetName2, schemaName, comment, Fileset.Type.MANAGED, null, filesetLocation2);
try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store);
FileSystem localFileSystem = filesetLocationPath2.getFileSystem(new Configuration())) {
ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA);
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, filesetName2);
// replace fileset location to a single file
Assertions.assertTrue(localFileSystem.exists(filesetLocationPath2));
Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath2).isDirectory());
localFileSystem.delete(filesetLocationPath2, true);
localFileSystem.create(filesetLocationPath2);
Assertions.assertTrue(localFileSystem.exists(filesetLocationPath2));
Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath2).isFile());

String subPath = "/year=2024/month=07/day=22/test.parquet";
Map<String, String> contextMap = Maps.newHashMap();
contextMap.put(
FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
FilesetDataOperation.RENAME.name());
CallerContext callerContext = CallerContext.builder().withContext(contextMap).build();
CallerContext.CallerContextHolder.set(callerContext);

Assertions.assertThrows(
GravitinoRuntimeException.class, () -> ops.getFileLocation(filesetIdent, subPath));
} finally {
CallerContext.CallerContextHolder.remove();
}

// test rename with an empty subPath
String filesetName3 = "test_get_file_location_3";
String filesetLocation3 =
TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + filesetName3;
Path filesetLocationPath3 = new Path(filesetLocation3);
createFileset(filesetName3, schemaName, comment, Fileset.Type.MANAGED, null, filesetLocation3);
try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store);
FileSystem localFileSystem = filesetLocationPath3.getFileSystem(new Configuration())) {
ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA);
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, filesetName3);
// replace fileset location to a single file
Assertions.assertTrue(localFileSystem.exists(filesetLocationPath3));
Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath3).isDirectory());
localFileSystem.delete(filesetLocationPath3, true);
localFileSystem.create(filesetLocationPath3);
Assertions.assertTrue(localFileSystem.exists(filesetLocationPath3));
Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath3).isFile());

Map<String, String> contextMap = Maps.newHashMap();
contextMap.put(
FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
FilesetDataOperation.RENAME.name());
CallerContext callerContext = CallerContext.builder().withContext(contextMap).build();
CallerContext.CallerContextHolder.set(callerContext);
Assertions.assertThrows(
GravitinoRuntimeException.class, () -> ops.getFileLocation(filesetIdent, ""));
}

// test storage location end with "/"
String filesetName4 = "test_get_file_location_4";
String filesetLocation4 =
TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + filesetName4 + "/";
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name);
Fileset mockFileset = Mockito.mock(Fileset.class);
when(mockFileset.name()).thenReturn(filesetName4);
when(mockFileset.storageLocation()).thenReturn(filesetLocation4);

try (HadoopCatalogOperations mockOps = Mockito.mock(HadoopCatalogOperations.class)) {
mockOps.hadoopConf = new Configuration();
when(mockOps.loadFileset(filesetIdent)).thenReturn(mockFileset);
String subPath = "/test/test.parquet";
when(mockOps.getFileLocation(filesetIdent, subPath)).thenCallRealMethod();
String fileLocation = mockOps.getFileLocation(filesetIdent, subPath);
Assertions.assertEquals(
String.format("%s%s", mockFileset.storageLocation(), subPath.substring(1)), fileLocation);
}
}

private static Stream<Arguments> locationArguments() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,16 +664,25 @@ public void testGetFileLocationWithInvalidAuditHeaders() {
try {
String filesetName = GravitinoITUtils.genRandomName("fileset");
NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
Assertions.assertFalse(catalog.asFilesetCatalog().filesetExists(filesetIdent));
Fileset expectedFileset =
catalog
.asFilesetCatalog()
.createFileset(
filesetIdent,
"fileset comment",
Fileset.Type.MANAGED,
generateLocation(filesetName),
Maps.newHashMap());

Map<String, String> context = new HashMap<>();
// this is an invalid internal client type.
// this is an invalid internal client type, but the server will return normally
context.put(FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE, "test");
CallerContext callerContext = CallerContext.builder().withContext(context).build();
CallerContext.CallerContextHolder.set(callerContext);

Assertions.assertThrows(
IllegalArgumentException.class,
() -> catalog.asFilesetCatalog().getFileLocation(filesetIdent, "/test.par"));
String fileLocation = catalog.asFilesetCatalog().getFileLocation(filesetIdent, "/test.par");
Assertions.assertEquals(expectedFileset.storageLocation() + "/test.par", fileLocation);
} finally {
CallerContext.CallerContextHolder.remove();
}
Expand Down

This file was deleted.

Loading

0 comments on commit be7a5e6

Please sign in to comment.