Skip to content

Commit

Permalink
[#4890] improvement(IT): Add the Integration Tests for `getFileLocati…
Browse files Browse the repository at this point in the history
…on` interface (#4920)

### What changes were proposed in this pull request?

Add integration tests for the `getFileLocation` interface and fix some
issues.

### Why are the changes needed?

Fix: #4890 

### How was this patch tested?

Add some ITs.
  • Loading branch information
xloya authored Sep 13, 2024
1 parent ff84056 commit 9f416c7
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.CatalogChange;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.Schema;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.audit.InternalClientType;
import org.apache.gravitino.client.GravitinoMetalake;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.IllegalNameIdentifierException;
Expand Down Expand Up @@ -71,7 +76,6 @@ public class HadoopCatalogIT extends AbstractIT {
@BeforeAll
public static void setup() throws IOException {
containerSuite.startHiveContainer();

Configuration conf = new Configuration();
conf.set("fs.defaultFS", defaultBaseLocation());
hdfs = FileSystem.get(conf);
Expand Down Expand Up @@ -609,6 +613,82 @@ public void testDropCatalogWithEmptySchema() {
Assertions.assertFalse(metalake.catalogExists(catalogName), "catalog should not be exists");
}

/**
 * Creates a managed fileset and checks that {@code getFileLocation} returns the fileset's storage
 * location with the requested sub path appended, both when no caller context is set on the
 * current thread and when one carrying audit headers is set.
 */
@Test
public void testGetFileLocation() {
  String name = GravitinoITUtils.genRandomName("fileset");
  NameIdentifier ident = NameIdentifier.of(schemaName, name);

  // The randomly named fileset must not exist yet; create it and confirm that it does afterwards.
  Assertions.assertFalse(catalog.asFilesetCatalog().filesetExists(ident));
  Fileset created =
      catalog
          .asFilesetCatalog()
          .createFileset(
              ident,
              "fileset comment",
              Fileset.Type.MANAGED,
              generateLocation(name),
              Maps.newHashMap());
  Assertions.assertTrue(catalog.asFilesetCatalog().filesetExists(ident));

  // Case 1: resolve a location without setting any caller context.
  try {
    String resolved = catalog.asFilesetCatalog().getFileLocation(ident, "/test1.par");
    String expected = created.storageLocation() + "/test1.par";

    Assertions.assertEquals(expected, resolved);
  } finally {
    CallerContext.CallerContextHolder.remove();
  }

  // Case 2: resolve a location with audit headers placed in the thread-local caller context.
  try {
    Map<String, String> auditHeaders = new HashMap<>();
    auditHeaders.put(
        FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
        InternalClientType.HADOOP_GVFS.name());
    auditHeaders.put(
        FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
        FilesetDataOperation.CREATE.name());
    CallerContext.CallerContextHolder.set(
        CallerContext.builder().withContext(auditHeaders).build());

    String resolved = catalog.asFilesetCatalog().getFileLocation(ident, "/test2.par");
    String expected = created.storageLocation() + "/test2.par";

    Assertions.assertEquals(expected, resolved);
  } finally {
    // Always clear the thread-local so the context cannot leak into other tests.
    CallerContext.CallerContextHolder.remove();
  }
}

/**
 * Sets a caller context whose internal client type header holds an invalid value and expects
 * {@code getFileLocation} to fail with an {@link IllegalArgumentException}.
 */
@Test
public void testGetFileLocationWithInvalidAuditHeaders() {
  try {
    NameIdentifier ident =
        NameIdentifier.of(schemaName, GravitinoITUtils.genRandomName("fileset"));

    // "test" is an invalid internal client type.
    Map<String, String> auditHeaders = new HashMap<>();
    auditHeaders.put(FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE, "test");
    CallerContext.CallerContextHolder.set(
        CallerContext.builder().withContext(auditHeaders).build());

    Assertions.assertThrows(
        IllegalArgumentException.class,
        () -> catalog.asFilesetCatalog().getFileLocation(ident, "/test.par"));
  } finally {
    // Always clear the thread-local so the invalid context cannot leak into other tests.
    CallerContext.CallerContextHolder.remove();
  }
}

/**
 * Builds the HDFS location used for a fileset in these tests.
 *
 * @param filesetName the fileset name, used as the last path segment
 * @return a URI of the form {@code hdfs://<host>:<port>/user/hadoop/<catalog>/<schema>/<fileset>}
 *     where host comes from the Hive container and port is {@code HDFS_DEFAULTFS_PORT}
 */
private static String generateLocation(String filesetName) {
  String host = containerSuite.getHiveContainer().getContainerIpAddress();
  String path = String.join("/", "user", "hadoop", catalogName, schemaName, filesetName);
  return String.format("hdfs://%s:%d/%s", host, HiveContainer.HDFS_DEFAULTFS_PORT, path);
}

private Fileset createFileset(
String filesetName,
String comment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,11 @@ public String getFileLocation(NameIdentifier ident, String subPath)
try {
String actualFileLocation = dispatcher.getFileLocation(ident, subPath);
// get the audit info from the thread local context
CallerContext callerContext = CallerContext.CallerContextHolder.get();
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
builder.putAll(callerContext.context());
CallerContext callerContext = CallerContext.CallerContextHolder.get();
if (callerContext != null && callerContext.context() != null) {
builder.putAll(callerContext.context());
}
eventBus.dispatchEvent(
new GetFileLocationEvent(
PrincipalUtils.getCurrentUserName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.gravitino.lock.LockType;
import org.apache.gravitino.lock.TreeLockUtils;
import org.apache.gravitino.metrics.MetricNames;
import org.apache.gravitino.rest.RESTUtils;
import org.apache.gravitino.server.web.Utils;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.NamespaceUtil;
Expand Down Expand Up @@ -268,7 +269,7 @@ public Response getFileLocation(
catalog,
schema,
fileset,
subPath);
RESTUtils.decodeString(subPath));
try {
return Utils.doAs(
httpRequest,
Expand All @@ -283,7 +284,9 @@ public Response getFileLocation(
}
String actualFileLocation =
TreeLockUtils.doWithTreeLock(
ident, LockType.READ, () -> dispatcher.getFileLocation(ident, subPath));
ident,
LockType.READ,
() -> dispatcher.getFileLocation(ident, RESTUtils.decodeString(subPath)));
return Utils.ok(new FileLocationResponse(actualFileLocation));
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
Expand All @@ -41,6 +42,10 @@
import org.apache.gravitino.Config;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.audit.InternalClientType;
import org.apache.gravitino.catalog.FilesetDispatcher;
import org.apache.gravitino.catalog.FilesetOperationDispatcher;
import org.apache.gravitino.dto.file.FilesetDTO;
Expand Down Expand Up @@ -68,6 +73,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

public class TestFilesetOperations extends JerseyTest {
private static class MockServletRequestFactory extends ServletRequestFactoryBase {
Expand Down Expand Up @@ -435,12 +441,13 @@ public void testDropFileset() {

@Test
public void testGetFileLocation() {
String actualPath = "mock location/path1";

when(dispatcher.getFileLocation(any(), any())).thenReturn(actualPath);
// Test encoded subPath
NameIdentifier fullIdentifier = NameIdentifier.of(metalake, catalog, schema, "fileset1");
String subPath = "/test/1";
when(dispatcher.getFileLocation(fullIdentifier, subPath)).thenReturn(subPath);
Response resp =
target(filesetPath(metalake, catalog, schema) + "fileset1/location")
.queryParam("sub_path", RESTUtils.encodeString("test/1"))
.queryParam("sub_path", RESTUtils.encodeString(subPath))
.request(MediaType.APPLICATION_JSON_TYPE)
.accept("application/vnd.gravitino.v1+json")
.get();
Expand All @@ -449,13 +456,15 @@ public void testGetFileLocation() {
FileLocationResponse contextResponse = resp.readEntity(FileLocationResponse.class);
Assertions.assertEquals(0, contextResponse.getCode());

Assertions.assertEquals(actualPath, contextResponse.getFileLocation());
Assertions.assertEquals(subPath, contextResponse.getFileLocation());

// Test throw NoSuchFilesetException
doThrow(new NoSuchFilesetException("no found")).when(dispatcher).getFileLocation(any(), any());
doThrow(new NoSuchFilesetException("no found"))
.when(dispatcher)
.getFileLocation(fullIdentifier, subPath);
Response resp1 =
target(filesetPath(metalake, catalog, schema) + "fileset1/location")
.queryParam("sub_path", RESTUtils.encodeString("test/1"))
.queryParam("sub_path", RESTUtils.encodeString(subPath))
.request(MediaType.APPLICATION_JSON_TYPE)
.accept("application/vnd.gravitino.v1+json")
.get();
Expand All @@ -466,10 +475,12 @@ public void testGetFileLocation() {
Assertions.assertEquals(NoSuchFilesetException.class.getSimpleName(), errorResp.getType());

// Test throw RuntimeException
doThrow(new RuntimeException("internal error")).when(dispatcher).getFileLocation(any(), any());
doThrow(new RuntimeException("internal error"))
.when(dispatcher)
.getFileLocation(fullIdentifier, subPath);
Response resp2 =
target(filesetPath(metalake, catalog, schema) + "fileset1/location")
.queryParam("sub_path", RESTUtils.encodeString("test/1"))
.queryParam("sub_path", RESTUtils.encodeString(subPath))
.request(MediaType.APPLICATION_JSON_TYPE)
.accept("application/vnd.gravitino.v1+json")
.get();
Expand All @@ -479,6 +490,70 @@ public void testGetFileLocation() {
ErrorResponse errorResp2 = resp2.readEntity(ErrorResponse.class);
Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, errorResp2.getCode());
Assertions.assertEquals(RuntimeException.class.getSimpleName(), errorResp2.getType());

// Test not encoded subPath
NameIdentifier fullIdentifier1 = NameIdentifier.of(metalake, catalog, schema, "fileset2");
String subPath1 = "/test/2";
when(dispatcher.getFileLocation(fullIdentifier1, subPath1)).thenReturn(subPath1);
Response resp3 =
target(filesetPath(metalake, catalog, schema) + "fileset2/location")
.queryParam("sub_path", subPath1)
.request(MediaType.APPLICATION_JSON_TYPE)
.accept("application/vnd.gravitino.v1+json")
.get();
Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp3.getStatus());

FileLocationResponse contextResponse1 = resp3.readEntity(FileLocationResponse.class);
Assertions.assertEquals(0, contextResponse1.getCode());

Assertions.assertEquals(subPath1, contextResponse1.getFileLocation());

// Test header to caller context
try {
  // Filled in by the stubbed dispatcher with whatever caller context is visible when it is
  // invoked, so the assertions below can check that the HTTP headers were propagated.
  Map<String, String> callerContextMap = Maps.newHashMap();
  NameIdentifier fullIdentifier2 = NameIdentifier.of(metalake, catalog, schema, "fileset3");
  String subPath2 = "/test/3";
  when(dispatcher.getFileLocation(fullIdentifier2, subPath2))
      .thenAnswer(
          (Answer<String>)
              invocation -> {
                try {
                  CallerContext context = CallerContext.CallerContextHolder.get();
                  callerContextMap.putAll(context.context());
                  return subPath2;
                } finally {
                  // Clear the thread-local on the thread that ran the stub.
                  CallerContext.CallerContextHolder.remove();
                }
              });
  Response resp4 =
      target(filesetPath(metalake, catalog, schema) + "fileset3/location")
          .queryParam("sub_path", RESTUtils.encodeString(subPath2))
          .request(MediaType.APPLICATION_JSON_TYPE)
          .header(
              FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
              InternalClientType.HADOOP_GVFS.name())
          .header(
              FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
              FilesetDataOperation.CREATE.name())
          .accept("application/vnd.gravitino.v1+json")
          .get();
  // Check the status of THIS request (resp4); the original asserted on resp3, the response of
  // the previous scenario, so a failure of the header-carrying request went unnoticed here.
  Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp4.getStatus());

  FileLocationResponse contextResponse2 = resp4.readEntity(FileLocationResponse.class);
  Assertions.assertEquals(0, contextResponse2.getCode());

  Assertions.assertEquals(subPath2, contextResponse2.getFileLocation());

  // Both audit headers sent with the request must have been present in the caller context.
  Assertions.assertFalse(callerContextMap.isEmpty());
  Assertions.assertEquals(
      InternalClientType.HADOOP_GVFS.name(),
      callerContextMap.get(FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE));
  Assertions.assertEquals(
      FilesetDataOperation.CREATE.name(),
      callerContextMap.get(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION));
} finally {
  CallerContext.CallerContextHolder.remove();
}
}

private void assertUpdateFileset(FilesetUpdatesRequest req, Fileset updatedFileset) {
Expand Down

0 comments on commit 9f416c7

Please sign in to comment.